Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-40810: Implement an allocateNodes "auto" #22

Merged
merged 10 commits into from
Nov 29, 2023
6 changes: 6 additions & 0 deletions python/lsst/ctrl/execute/allocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ def isVerbose(self):
"""
return self.opts.verbose

def isAuto(self):
"""Status of the auto flag
@return True if the flag was set, False otherwise
"""
return self.opts.auto

def getUserName(self):
"""Accessor for USER_NAME
@return the value of USER_NAME
Expand Down
7 changes: 7 additions & 0 deletions python/lsst/ctrl/execute/allocatorParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ def parseArgs(self, basename):

parser = argparse.ArgumentParser(prog=basename)
parser.add_argument("platform", help="node allocation platform")
parser.add_argument(
"-u",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-u is used in a lot of commands for the username. I don't know if this will cause confusion or worse if a username is ever needed for allocateNodes. I don't know that there's a better single character. I would be ok with just --auto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I just remove the "-u" and it will be happy with only one such argument? I'll try.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be work; now just --auto.

"--auto",
action="store_true",
dest="auto",
help="use automatic detection of jobs to determine glide-ins",
)
parser.add_argument(
"-n",
"--node-count",
Expand Down
156 changes: 148 additions & 8 deletions python/lsst/ctrl/execute/slurmPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def submit(self, platform, platformPkgDir):

self.loadSlurm(configName, platformPkgDir)
verbose = self.isVerbose()
auto = self.isAuto()

# create the fully-resolved scratch directory string
scratchDirParam = self.getScratchDirectory()
Expand All @@ -60,11 +61,9 @@ def submit(self, platform, platformPkgDir):
)
self.createAllocationFile(allocationName)

nodes = self.getNodes()
cpus = self.getCPUs()
memoryPerCore = self.getMemoryPerCore()
totalMemory = cpus * memoryPerCore
print("Targeting %s glidein(s) for the computing pool/set." % nodes)

# run the sbatch command
template = Template(self.getLocalScratchDirectory())
Expand All @@ -89,14 +88,23 @@ def submit(self, platform, platformPkgDir):
print("The Slurm job name for the glidein jobs is %s " % jobname)
print("The user home directory is %s " % self.getUserHome())

batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"])
result = subprocess.check_output(batcmd, shell=True)
strResult = result.decode("UTF-8")
if auto:
numberToAdd = self.glideinsFromJobPressure()
print("The number of glidein jobs to submit now is %s" % numberToAdd)
else:
nodes = self.getNodes()
# In this case 'nodes' is the Target.
print("Targeting %s glidein(s) for the computing pool/set." % nodes)

batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"])
print("The squeue command is: %s " % batcmd)
result = subprocess.check_output(batcmd, shell=True)
strResult = result.decode("UTF-8")

print("Detected this number of preexisting glidein jobs: %s " % strResult)
print("Detected this number of preexisting glidein jobs: %s " % strResult)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if this output would be helpful in the automated case as well.


numberToAdd = nodes - int(strResult)
print("The number of glidein jobs to submit now is %s" % numberToAdd)
numberToAdd = nodes - int(strResult)
print("The number of glidein jobs to submit now is %s" % numberToAdd)

for glide in range(0, numberToAdd):
print("Submitting glidein %s " % glide)
Expand Down Expand Up @@ -162,3 +170,135 @@ def createAllocationFile(self, input):
print("Wrote new Slurm job allocation bash script to %s" % outfile)
os.chmod(outfile, 0o755)
return outfile

def glideinsFromJobPressure(self):
"""Calculate the number of glideins needed from job pressure

Returns
-------
number : `str`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think copy-paste error in the return type.

The number of glideins
"""

import math
import socket

import htcondor
from lsst.ctrl.bps.htcondor import condor_q

verbose = self.isVerbose()

maxNumberOfGlideins = self.getNodes()
coresPerGlidein = self.getCPUs()
ratioMemCore = self.getMemoryPerCore()
auser = self.getUserName()

# initialize counters
totalCores = 0

try:
schedd_name = socket.getfqdn()
coll = htcondor.Collector()
schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
scheddref = htcondor.Schedd(schedd_ad)
projection = [
"JobStatus",
"Owner",
"RequestCpus",
"JobUniverse",
"RequestMemory",
]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a comment about the pieces of the constraint? Something like "Limit query to compute jobs for this user that are idle" or put comments with JobStatus and JobUniverse (It took me a couple minutes to figure out why needed JobUniverse).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added comments on the classads to be returned, and the constraint.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and it can be emphasized that there are classads to be returned (the first list), and then the constraint (a second list.) All of the classads there do not have to be returned but they seemed useful to have available for debugging.

owner = "".join(['(Owner=="', auser, '") '])
jstat = "&& (JobStatus==1) "
juniv = "&& (JobUniverse==5)"
full_constraint = "".join([owner, jstat, juniv])
if verbose:
print(f"full_constraint {full_constraint}")
condorq_data = condor_q(
constraint=full_constraint,
schedds={schedd_name: scheddref},

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only detecting jobs on a single schedd? So one would have to run allocateNodes on multiple machines but jobs submitted on either can use nodes from either allocateNodes?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is the case for this version of the software, please add it to the documentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is detecting jobs for the single schedd on the current machine. So working on/across sdfrome001/2 simultaneously, yes each needs an allocateNodes. They do not interfere at the first level, all necessary glideins will be submitted, but then jobs could run on either. Interference does exist but it is small (all Idle jobs at a given instant are accounted for.) The addition of Tags / Requirements would be the solution.
I'll comment in the docs about the single schedd.

projection=projection,
)
if len(condorq_data) > 0:
print("glideinsFromJobPressure: Fetched")
condorq_bps = condorq_data[schedd_name]
if verbose:
print(len(condorq_bps))
print(condorq_bps)
# disassemble the dictionary of dictionaries
for jid in list(condorq_bps.keys()):
job = condorq_bps[jid]
thisCores = job["RequestCpus"]
thisMemory = job["RequestMemory"]
totalCores = totalCores + thisCores
if verbose:
print(
f"glideinsFromJobPressure: The key in the dictionary is {jid}"
)
print(f"\tRequestCpus {thisCores}")
print(f"\tCurrent value of totalCores {totalCores}")
thisRatio = thisMemory / ratioMemCore
if thisRatio > thisCores:
if verbose:
print("\t\tNeed to Add More:")
print(f"\t\tRequestMemory is {thisMemory} ")
print(f"\t\tRatio to 4 GB is {thisRatio} ")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the "4" to "{ratioMemCore}"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep.

totalCores = totalCores + (thisRatio - thisCores)
if verbose:
print(f"\t\tCurrent value of totalCores {totalCores}")

else:
print("Length Zero")
print(len(condorq_data))
except Exception:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why doing such a broad catch of exceptions? And then do nothing (not even printing the exception for later debugging).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added something like
except Exception as exc:
raise type(exc)("Problem querying condor schedd for jobs") from None

print("Exception")

print(f"glideinsFromJobPressure: The final TotalCores is {totalCores}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is TotalCores? Not in verbose so, perhaps something like "total number of cores needed for submitted jobs"?

numberOfGlideins = math.ceil(totalCores / coresPerGlidein)
print(
f"glideinsFromJobPressure: Target # Glideins for Idle Jobs is {numberOfGlideins}"
)

# Check Slurm queue Running glideins
jobname = "".join(["glide_", auser])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why join instead of an f-string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the joins are code that Steve wrote a decade ago, but I'll use f-string for the new stuff.

existingGlideinsRunning = 0
batcmd = "".join(["squeue --noheader --states=R --name=", jobname, " | wc -l"])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again question about the join vs f-string. Assume same question for all the rest of the joins where there isn't an unknown number of values in a list being turned into a string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converted to f-string.

print("The squeue command is: %s " % batcmd)
resultR = subprocess.check_output(batcmd, shell=True)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the command fails, I think this will throw an exception. Just wondering why these aren't caught like the ones in the condor section.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add

try:
subprocess.check_output(...)
except subprocess.CalledProcessError as e:
print(e.output)

existingGlideinsRunning = int(resultR.decode("UTF-8"))

# Check Slurm queue Idle Glideins
existingGlideinsIdle = 0
batcmd = "".join(["squeue --noheader --states=PD --name=", jobname, " | wc -l"])
print("The squeue command is: %s " % batcmd)
resultPD = subprocess.check_output(batcmd, shell=True)
existingGlideinsIdle = int(resultPD.decode("UTF-8"))

print(
f"glideinsFromJobPressure: existingGlideinsRunning {existingGlideinsRunning}"
)
print(f"glideinsFromJobPressure: existingGlideinsIdle {existingGlideinsIdle}")
numberOfGlideinsRed = numberOfGlideins - existingGlideinsIdle

print(
f"glideinsFromJobPressure: Target # Glideins Max to Submit {numberOfGlideinsRed}"
)

maxIdleGlideins = maxNumberOfGlideins - existingGlideinsRunning
maxSubmitGlideins = maxIdleGlideins - existingGlideinsIdle

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to read code here a couple times. maxSubmitGlideins confused me when set to that difference. I would move the non-maxSubmitGlideins print statements above here moving these two differences closer to the if statement below. Then put a comment about capping new glideins based upon a limit of total idle glideins in queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I will leave these as is right now, as this is the "working like a glidein factory" part, and it is likely worth another ticket to put the whole procedure into "memory"
and think through the best names for all of these variables, if the current ones are not totally clear. (It's fairly complicated.)


print(f"glideinsFromJobPressure: maxNumberOfGlideins {maxNumberOfGlideins}")
print(
f"glideinsFromJobPressure: existingGlideinsRunning {existingGlideinsRunning}"
)
print(f"glideinsFromJobPressure: maxIdleGlideins {maxIdleGlideins}")
print(f"glideinsFromJobPressure: existingGlideinsIdle {existingGlideinsIdle}")
print(f"glideinsFromJobPressure: maxSubmitGlideins {maxSubmitGlideins}")

if numberOfGlideinsRed > maxSubmitGlideins:
numberOfGlideinsRed = maxSubmitGlideins

print(
f"glideinsFromJobPressure: The number of Glideins to submit now is {numberOfGlideinsRed}"
)
return numberOfGlideinsRed
1 change: 1 addition & 0 deletions ups/ctrl_execute.table
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
setupRequired(pex_config)
setupRequired(utils)
setupRequired(ctrl_bps_htcondor)

envPrepend(PYTHONPATH, ${PRODUCT_DIR}/python)
envPrepend(PATH, ${PRODUCT_DIR}/bin)