Skip to content

Commit

Permalink
Merge pull request #22 from lsst/tickets/DM-40810
Browse files Browse the repository at this point in the history
DM-40810: Implement an allocateNodes "auto"
  • Loading branch information
daues authored Nov 29, 2023
2 parents 8a65b98 + 85cc365 commit e3d61e8
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 8 deletions.
6 changes: 6 additions & 0 deletions python/lsst/ctrl/execute/allocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ def isVerbose(self):
"""
return self.opts.verbose

def isAuto(self):
"""Status of the auto flag
@return True if the flag was set, False otherwise
"""
return self.opts.auto

def getUserName(self):
"""Accessor for USER_NAME
@return the value of USER_NAME
Expand Down
6 changes: 6 additions & 0 deletions python/lsst/ctrl/execute/allocatorParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ def parseArgs(self, basename):

parser = argparse.ArgumentParser(prog=basename)
parser.add_argument("platform", help="node allocation platform")
parser.add_argument(
"--auto",
action="store_true",
dest="auto",
help="use automatic detection of jobs to determine glide-ins",
)
parser.add_argument(
"-n",
"--node-count",
Expand Down
170 changes: 162 additions & 8 deletions python/lsst/ctrl/execute/slurmPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def submit(self, platform, platformPkgDir):

self.loadSlurm(configName, platformPkgDir)
verbose = self.isVerbose()
auto = self.isAuto()

# create the fully-resolved scratch directory string
scratchDirParam = self.getScratchDirectory()
Expand All @@ -60,11 +61,9 @@ def submit(self, platform, platformPkgDir):
)
self.createAllocationFile(allocationName)

nodes = self.getNodes()
cpus = self.getCPUs()
memoryPerCore = self.getMemoryPerCore()
totalMemory = cpus * memoryPerCore
print("Targeting %s glidein(s) for the computing pool/set." % nodes)

# run the sbatch command
template = Template(self.getLocalScratchDirectory())
Expand All @@ -89,14 +88,26 @@ def submit(self, platform, platformPkgDir):
print("The Slurm job name for the glidein jobs is %s " % jobname)
print("The user home directory is %s " % self.getUserHome())

batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"])
result = subprocess.check_output(batcmd, shell=True)
strResult = result.decode("UTF-8")
if auto:
numberToAdd = self.glideinsFromJobPressure()
print("The number of glidein jobs to submit now is %s" % numberToAdd)
else:
nodes = self.getNodes()
# In this case 'nodes' is the Target.
print("Targeting %s glidein(s) for the computing pool/set." % nodes)

batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"])
print("The squeue command is: %s " % batcmd)
try:
result = subprocess.check_output(batcmd, shell=True)
except subprocess.CalledProcessError as e:
print(e.output)
strResult = result.decode("UTF-8")

print("Detected this number of preexisting glidein jobs: %s " % strResult)
print("Detected this number of preexisting glidein jobs: %s " % strResult)

numberToAdd = nodes - int(strResult)
print("The number of glidein jobs to submit now is %s" % numberToAdd)
numberToAdd = nodes - int(strResult)
print("The number of glidein jobs to submit now is %s" % numberToAdd)

for glide in range(0, numberToAdd):
print("Submitting glidein %s " % glide)
Expand Down Expand Up @@ -162,3 +173,146 @@ def createAllocationFile(self, input):
print("Wrote new Slurm job allocation bash script to %s" % outfile)
os.chmod(outfile, 0o755)
return outfile

def glideinsFromJobPressure(self):
"""Calculate the number of glideins needed from job pressure
Returns
-------
number : `str`
The number of glideins
"""

import math
import socket

import htcondor
from lsst.ctrl.bps.htcondor import condor_q

verbose = self.isVerbose()

maxNumberOfGlideins = self.getNodes()
coresPerGlidein = self.getCPUs()
ratioMemCore = self.getMemoryPerCore()
auser = self.getUserName()

# initialize counters
totalCores = 0

try:
schedd_name = socket.getfqdn()
coll = htcondor.Collector()
schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
scheddref = htcondor.Schedd(schedd_ad)
# projection contains the job classads to be returned.
# These include the cpu and memory profile of each job,
# in the form of RequestCpus and RequestMemory
projection = [
"JobStatus",
"Owner",
"RequestCpus",
"JobUniverse",
"RequestMemory",
]
owner = f'(Owner=="{auser}") '
jstat = "&& (JobStatus==1) "
juniv = "&& (JobUniverse==5)"
# The constraint determines that the jobs to be returned belong to
# the current user (Owner) and are Idle vanilla universe jobs.
full_constraint = f"{owner}{jstat}{juniv}"
if verbose:
print(f"full_constraint {full_constraint}")
condorq_data = condor_q(
constraint=full_constraint,
schedds={schedd_name: scheddref},
projection=projection,
)
if len(condorq_data) > 0:
print("glideinsFromJobPressure: Fetched")
condorq_bps = condorq_data[schedd_name]
if verbose:
print(len(condorq_bps))
print(condorq_bps)
# disassemble the dictionary of dictionaries
for jid in list(condorq_bps.keys()):
job = condorq_bps[jid]
thisCores = job["RequestCpus"]
thisMemory = job["RequestMemory"]
totalCores = totalCores + thisCores
if verbose:
print(
f"glideinsFromJobPressure: The key in the dictionary is {jid}"
)
print(f"\tRequestCpus {thisCores}")
print(f"\tCurrent value of totalCores {totalCores}")
thisRatio = thisMemory / ratioMemCore
if thisRatio > thisCores:
if verbose:
print("\t\tNeed to Add More:")
print(f"\t\tRequestMemory is {thisMemory} ")
print(f"\t\tRatio to {ratioMemCore} MB is {thisRatio} ")
totalCores = totalCores + (thisRatio - thisCores)
if verbose:
print(f"\t\tCurrent value of totalCores {totalCores}")

else:
print("Length Zero")
print(len(condorq_data))
except Exception as exc:
raise type(exc)("Problem querying condor schedd for jobs") from None

print(f"glideinsFromJobPressure: The final TotalCores is {totalCores}")
numberOfGlideins = math.ceil(totalCores / coresPerGlidein)
print(
f"glideinsFromJobPressure: Target # Glideins for Idle Jobs is {numberOfGlideins}"
)

# Check Slurm queue Running glideins
jobname = f"glide_{auser}"
existingGlideinsRunning = 0
batcmd = f"squeue --noheader --states=R --name={jobname} | wc -l"
print("The squeue command is: %s " % batcmd)
try:
resultR = subprocess.check_output(batcmd, shell=True)
except subprocess.CalledProcessError as e:
print(e.output)
existingGlideinsRunning = int(resultR.decode("UTF-8"))

# Check Slurm queue Idle Glideins
existingGlideinsIdle = 0
batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l"
print("The squeue command is: %s " % batcmd)
try:
resultPD = subprocess.check_output(batcmd, shell=True)
except subprocess.CalledProcessError as e:
print(e.output)
existingGlideinsIdle = int(resultPD.decode("UTF-8"))

print(
f"glideinsFromJobPressure: existingGlideinsRunning {existingGlideinsRunning}"
)
print(f"glideinsFromJobPressure: existingGlideinsIdle {existingGlideinsIdle}")
numberOfGlideinsRed = numberOfGlideins - existingGlideinsIdle

print(
f"glideinsFromJobPressure: Target # Glideins Max to Submit {numberOfGlideinsRed}"
)

maxIdleGlideins = maxNumberOfGlideins - existingGlideinsRunning
maxSubmitGlideins = maxIdleGlideins - existingGlideinsIdle

print(f"glideinsFromJobPressure: maxNumberOfGlideins {maxNumberOfGlideins}")
print(
f"glideinsFromJobPressure: existingGlideinsRunning {existingGlideinsRunning}"
)
print(f"glideinsFromJobPressure: maxIdleGlideins {maxIdleGlideins}")
print(f"glideinsFromJobPressure: existingGlideinsIdle {existingGlideinsIdle}")
print(f"glideinsFromJobPressure: maxSubmitGlideins {maxSubmitGlideins}")

if numberOfGlideinsRed > maxSubmitGlideins:
numberOfGlideinsRed = maxSubmitGlideins

print(
f"glideinsFromJobPressure: The number of Glideins to submit now is {numberOfGlideinsRed}"
)
return numberOfGlideinsRed
1 change: 1 addition & 0 deletions ups/ctrl_execute.table
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
setupRequired(pex_config)
setupRequired(utils)
setupRequired(ctrl_bps_htcondor)

envPrepend(PYTHONPATH, ${PRODUCT_DIR}/python)
envPrepend(PATH, ${PRODUCT_DIR}/bin)

0 comments on commit e3d61e8

Please sign in to comment.