DM-40810: Implement an allocateNodes "auto" #22

Merged: 10 commits, Nov 29, 2023
6 changes: 6 additions & 0 deletions python/lsst/ctrl/execute/allocator.py
@@ -260,6 +260,12 @@ def isVerbose(self):
"""
return self.opts.verbose

def isAuto(self):
"""Status of the auto flag
@return True if the flag was set, False otherwise
"""
return self.opts.auto

def getUserName(self):
"""Accessor for USER_NAME
@return the value of USER_NAME
6 changes: 6 additions & 0 deletions python/lsst/ctrl/execute/allocatorParser.py
@@ -61,6 +61,12 @@ def parseArgs(self, basename):

parser = argparse.ArgumentParser(prog=basename)
parser.add_argument("platform", help="node allocation platform")
parser.add_argument(
"--auto",
action="store_true",
dest="auto",
help="use automatic detection of jobs to determine glide-ins",
)
parser.add_argument(
"-n",
"--node-count",
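For orientation, here is a minimal, self-contained sketch of how the new flag is parsed and then branched on. The dest/type shown for -n/--node-count and the "s3df" platform string are assumptions for illustration, not taken from this diff.

# Hypothetical sketch of the new option; not the actual allocatorParser code.
import argparse

parser = argparse.ArgumentParser(prog="allocateNodes")
parser.add_argument("platform", help="node allocation platform")
parser.add_argument(
    "--auto",
    action="store_true",
    dest="auto",
    help="use automatic detection of jobs to determine glide-ins",
)
# dest and type below are assumed for this sketch
parser.add_argument("-n", "--node-count", dest="nodes", type=int, default=1)

args = parser.parse_args(["--auto", "-n", "10", "s3df"])
if args.auto:
    # SlurmPlugin.submit() would call glideinsFromJobPressure();
    # the -n value then acts as the cap (maxNumberOfGlideins).
    print("auto mode, cap of %s glideins" % args.nodes)
else:
    print("fixed target of %s glideins" % args.nodes)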
170 changes: 162 additions & 8 deletions python/lsst/ctrl/execute/slurmPlugin.py
@@ -36,6 +36,7 @@ def submit(self, platform, platformPkgDir):

self.loadSlurm(configName, platformPkgDir)
verbose = self.isVerbose()
auto = self.isAuto()

# create the fully-resolved scratch directory string
scratchDirParam = self.getScratchDirectory()
@@ -60,11 +61,9 @@
)
self.createAllocationFile(allocationName)

cpus = self.getCPUs()
memoryPerCore = self.getMemoryPerCore()
totalMemory = cpus * memoryPerCore

# run the sbatch command
template = Template(self.getLocalScratchDirectory())
@@ -89,14 +88,26 @@ def submit(self, platform, platformPkgDir):
print("The Slurm job name for the glidein jobs is %s " % jobname)
print("The user home directory is %s " % self.getUserHome())

batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"])
result = subprocess.check_output(batcmd, shell=True)
strResult = result.decode("UTF-8")
if auto:
numberToAdd = self.glideinsFromJobPressure()
print("The number of glidein jobs to submit now is %s" % numberToAdd)
else:
nodes = self.getNodes()
# In this case 'nodes' is the Target.
print("Targeting %s glidein(s) for the computing pool/set." % nodes)

batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"])
print("The squeue command is: %s " % batcmd)
try:
result = subprocess.check_output(batcmd, shell=True)
except subprocess.CalledProcessError as e:
print(e.output)
strResult = result.decode("UTF-8")

print("Detected this number of preexisting glidein jobs: %s " % strResult)
print("Detected this number of preexisting glidein jobs: %s " % strResult)

[Review comment] Wondering if this output would be helpful in the automated case as well.


    numberToAdd = nodes - int(strResult)
    print("The number of glidein jobs to submit now is %s" % numberToAdd)

for glide in range(0, numberToAdd):
print("Submitting glidein %s " % glide)
@@ -162,3 +173,146 @@ def createAllocationFile(self, input):
print("Wrote new Slurm job allocation bash script to %s" % outfile)
os.chmod(outfile, 0o755)
return outfile

def glideinsFromJobPressure(self):
"""Calculate the number of glideins needed from job pressure

Returns
-------
number : `str`

[Review comment] I think copy-paste error in the return type.

    The number of glideins
"""

import math
import socket

import htcondor
from lsst.ctrl.bps.htcondor import condor_q

verbose = self.isVerbose()

maxNumberOfGlideins = self.getNodes()
coresPerGlidein = self.getCPUs()
ratioMemCore = self.getMemoryPerCore()
auser = self.getUserName()

# initialize counters
totalCores = 0

try:
schedd_name = socket.getfqdn()
coll = htcondor.Collector()
schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
scheddref = htcondor.Schedd(schedd_ad)
# projection contains the job classads to be returned.
# These include the cpu and memory profile of each job,
# in the form of RequestCpus and RequestMemory
projection = [
"JobStatus",
"Owner",
"RequestCpus",
"JobUniverse",
"RequestMemory",
]

[Review comment] Perhaps a comment about the pieces of the constraint? Something like "Limit query to compute jobs for this user that are idle", or put comments with JobStatus and JobUniverse (it took me a couple of minutes to figure out why JobUniverse was needed).

[Author reply] I added comments on the classads to be returned, and the constraint.

[Author reply] And it can be emphasized that there are classads to be returned (the first list), and then the constraint (a second list). Not all of those classads have to be returned, but they seemed useful to have available for debugging.

owner = f'(Owner=="{auser}") '
jstat = "&& (JobStatus==1) "
juniv = "&& (JobUniverse==5)"
# The constraint determines that the jobs to be returned belong to
# the current user (Owner) and are Idle vanilla universe jobs.
full_constraint = f"{owner}{jstat}{juniv}"
if verbose:
print(f"full_constraint {full_constraint}")
condorq_data = condor_q(
constraint=full_constraint,
schedds={schedd_name: scheddref},

[Review comment] Is this only detecting jobs on a single schedd? So one would have to run allocateNodes on multiple machines, but jobs submitted on either can use nodes from either allocateNodes?

[Review comment] If this is the case for this version of the software, please add it to the documentation.

[Author reply] Yes, this is detecting jobs for the single schedd on the current machine. So when working on/across sdfrome001/2 simultaneously, each machine needs its own allocateNodes. They do not interfere at the first level (all necessary glideins will be submitted), but then jobs could run on either. The interference that does exist is small, since all Idle jobs at a given instant are accounted for. The addition of Tags / Requirements would be the solution. I'll comment in the docs about the single schedd.

projection=projection,
)
if len(condorq_data) > 0:
print("glideinsFromJobPressure: Fetched")
condorq_bps = condorq_data[schedd_name]
if verbose:
print(len(condorq_bps))
print(condorq_bps)
# disassemble the dictionary of dictionaries
for jid in list(condorq_bps.keys()):
job = condorq_bps[jid]
thisCores = job["RequestCpus"]
thisMemory = job["RequestMemory"]
totalCores = totalCores + thisCores
if verbose:
print(
f"glideinsFromJobPressure: The key in the dictionary is {jid}"
)
print(f"\tRequestCpus {thisCores}")
print(f"\tCurrent value of totalCores {totalCores}")
thisRatio = thisMemory / ratioMemCore
if thisRatio > thisCores:
if verbose:
print("\t\tNeed to Add More:")
print(f"\t\tRequestMemory is {thisMemory} ")
print(f"\t\tRatio to {ratioMemCore} MB is {thisRatio} ")
totalCores = totalCores + (thisRatio - thisCores)
if verbose:
print(f"\t\tCurrent value of totalCores {totalCores}")

else:
print("Length Zero")
print(len(condorq_data))
except Exception as exc:
raise type(exc)("Problem querying condor schedd for jobs") from None

print(f"glideinsFromJobPressure: The final TotalCores is {totalCores}")

[Review comment] What is TotalCores? It is not printed only in verbose mode, so perhaps say something like "total number of cores needed for submitted jobs"?

numberOfGlideins = math.ceil(totalCores / coresPerGlidein)
print(
f"glideinsFromJobPressure: Target # Glideins for Idle Jobs is {numberOfGlideins}"
)

# Check Slurm queue Running glideins
jobname = f"glide_{auser}"
existingGlideinsRunning = 0
batcmd = f"squeue --noheader --states=R --name={jobname} | wc -l"
print("The squeue command is: %s " % batcmd)
try:
resultR = subprocess.check_output(batcmd, shell=True)
except subprocess.CalledProcessError as e:
print(e.output)
existingGlideinsRunning = int(resultR.decode("UTF-8"))

# Check Slurm queue Idle Glideins
existingGlideinsIdle = 0
batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l"
print("The squeue command is: %s " % batcmd)
try:
resultPD = subprocess.check_output(batcmd, shell=True)
except subprocess.CalledProcessError as e:
print(e.output)
existingGlideinsIdle = int(resultPD.decode("UTF-8"))

print(
f"glideinsFromJobPressure: existingGlideinsRunning {existingGlideinsRunning}"
)
print(f"glideinsFromJobPressure: existingGlideinsIdle {existingGlideinsIdle}")
numberOfGlideinsRed = numberOfGlideins - existingGlideinsIdle

print(
f"glideinsFromJobPressure: Target # Glideins Max to Submit {numberOfGlideinsRed}"
)

maxIdleGlideins = maxNumberOfGlideins - existingGlideinsRunning
maxSubmitGlideins = maxIdleGlideins - existingGlideinsIdle

[Review comment] Needed to read the code here a couple of times; maxSubmitGlideins confused me when set to that difference. I would move the non-maxSubmitGlideins print statements above here, bringing these two differences closer to the if statement below, and then add a comment about capping new glideins based on a limit of total idle glideins in the queue.

[Author reply] I think I will leave these as is right now, as this is the "working like a glidein factory" part, and it is likely worth another ticket to put the whole procedure into "memory" and think through the best names for all of these variables, if the current ones are not totally clear. (It's fairly complicated.)


print(f"glideinsFromJobPressure: maxNumberOfGlideins {maxNumberOfGlideins}")
print(
f"glideinsFromJobPressure: existingGlideinsRunning {existingGlideinsRunning}"
)
print(f"glideinsFromJobPressure: maxIdleGlideins {maxIdleGlideins}")
print(f"glideinsFromJobPressure: existingGlideinsIdle {existingGlideinsIdle}")
print(f"glideinsFromJobPressure: maxSubmitGlideins {maxSubmitGlideins}")

if numberOfGlideinsRed > maxSubmitGlideins:
numberOfGlideinsRed = maxSubmitGlideins

print(
f"glideinsFromJobPressure: The number of Glideins to submit now is {numberOfGlideinsRed}"
)
return numberOfGlideinsRed
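To make the sizing arithmetic in glideinsFromJobPressure concrete, here is a small self-contained sketch with toy numbers. The job list, per-glidein core count, memory-per-core ratio, and Slurm queue counts are invented for illustration and are not taken from this PR.

# Toy walk-through of the glidein sizing arithmetic (hypothetical values).
import math

coresPerGlidein = 16          # stand-in for getCPUs()
ratioMemCore = 4096           # stand-in for getMemoryPerCore(), MB per core
maxNumberOfGlideins = 10      # stand-in for getNodes()

# Two hypothetical idle jobs: one CPU-bound, one memory-heavy.
idleJobs = [
    {"RequestCpus": 8, "RequestMemory": 16384},   # 16384/4096 = 4 <= 8: no adjustment
    {"RequestCpus": 1, "RequestMemory": 32768},   # 32768/4096 = 8 > 1: counts as 8 cores
]

totalCores = 0
for job in idleJobs:
    totalCores += job["RequestCpus"]
    ratio = job["RequestMemory"] / ratioMemCore
    if ratio > job["RequestCpus"]:
        # Memory pressure dominates, so charge the extra cores for it.
        totalCores += ratio - job["RequestCpus"]

numberOfGlideins = math.ceil(totalCores / coresPerGlidein)      # ceil(16/16) = 1

# Cap by what is already running or pending in Slurm.
existingGlideinsRunning = 3
existingGlideinsIdle = 0
numberOfGlideinsRed = numberOfGlideins - existingGlideinsIdle
maxSubmitGlideins = (maxNumberOfGlideins - existingGlideinsRunning) - existingGlideinsIdle

# Same cap as the final "if" above: never submit more than maxSubmitGlideins.
numberToSubmit = min(numberOfGlideinsRed, maxSubmitGlideins)    # min(1, 7) = 1
print(numberToSubmit)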
1 change: 1 addition & 0 deletions ups/ctrl_execute.table
@@ -1,5 +1,6 @@
setupRequired(pex_config)
setupRequired(utils)
setupRequired(ctrl_bps_htcondor)

envPrepend(PYTHONPATH, ${PRODUCT_DIR}/python)
envPrepend(PATH, ${PRODUCT_DIR}/bin)