Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-42530: changes for automatic Large Glideins #25

Merged
merged 11 commits into from
Feb 17, 2024
7 changes: 7 additions & 0 deletions python/lsst/ctrl/execute/allocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ def getCPUs(self):
"""
return self.getParameter("CPUS")

def getAutoCPUs(self):
    """Size (in cores) of a standard glidein for allocateNodes auto.

    Returns
    -------
    autoCPUs : `int`
        The number of cores per standard automatic glidein.
    """
    # Fixed standard glidein width; jobs requesting more than this
    # (or the matching memory) are treated as "Large" glideins.
    return 16

def getWallClock(self):
"""Accessor for WALL_CLOCK
@return the value of WALL_CLOCK
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/ctrl/execute/allocatorParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ def parseArgs(self, basename):
"-c",
"--cpus",
action="store",
default=None,
default=16,
dest="cpus",
help="cores / cpus per glidein",
type=int,
required=True,
required=False,
)
parser.add_argument(
"-a",
Expand Down
198 changes: 189 additions & 9 deletions python/lsst/ctrl/execute/slurmPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,25 @@ def submit(self, platform, platformPkgDir):
"The working local scratch directory localScratchDir is %s "
% localScratchDir
)

print("The generated Slurm submit file is %s " % generatedSlurmFile)

cmd = "sbatch --mem %s %s" % (totalMemory, generatedSlurmFile)

auser = self.getUserName()
jobname = "".join(["glide_", auser])
jobname = f"glide_{auser}"
if verbose:
print("The unix user name is %s " % auser)
print("The Slurm job name for the glidein jobs is %s " % jobname)
print("The user home directory is %s " % self.getUserHome())

if auto:
self.largeGlideinsFromJobPressure(generatedSlurmFile)
MichelleGower marked this conversation as resolved.
Show resolved Hide resolved
numberToAdd = self.glideinsFromJobPressure()
print("The number of glidein jobs to submit now is %s" % numberToAdd)
else:
nodes = self.getNodes()
# In this case 'nodes' is the Target.
print("Targeting %s glidein(s) for the computing pool/set." % nodes)

batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"])
print("The squeue command is: %s " % batcmd)
try:
Expand Down Expand Up @@ -188,6 +187,180 @@ def createAllocationFile(self, input):
os.chmod(outfile, 0o755)
return outfile

def largeGlideinsFromJobPressure(self, generatedSlurmFile):
    """Submit large Slurm glideins sized to idle large BPS jobs.

    Queries the local HTCondor schedd for the current user's idle
    vanilla-universe BPS jobs whose resource requests exceed the
    standard glidein size (autoCPUs cores / autoCPUs * memoryPerCore
    MB), groups them by ``bps_job_label``, and submits one custom-sized
    Slurm glidein per pending job, less any glideins for that label
    already idle (pending) in the Slurm queue.

    Parameters
    ----------
    generatedSlurmFile : `str`
        Path to the generated Slurm batch submit file.

    Raises
    ------
    Exception
        Re-raised (same type) if the condor schedd query fails.
    """
    import hashlib
    import socket
    import time

    import htcondor
    from lsst.ctrl.bps.htcondor import condor_q

    verbose = self.isVerbose()

    autoCPUs = self.getAutoCPUs()
    memoryPerCore = self.getMemoryPerCore()
    auser = self.getUserName()

    # Classads to be returned: enough to identify BPS jobs and their
    # cpu/memory profile (RequestCpus / RequestMemory).
    projection = [
        "bps_run",
        "bps_job_label",
        "JobStatus",
        "Owner",
        "RequestCpus",
        "JobUniverse",
        "RequestMemory",
    ]
    # The constraint selects the current user's (Owner) Idle
    # (JobStatus==1) vanilla-universe (JobUniverse==5) jobs that carry
    # the bps_* classads ("isnt Undefined" keeps only BPS jobs).
    owner = f'(Owner=="{auser}") '
    jstat = "&& (JobStatus==1) "
    bps1 = "&& (bps_run isnt Undefined) "
    bps2 = "&& (bps_job_label isnt Undefined) "
    juniv = "&& (JobUniverse==5)"
    full_constraint = f"{owner}{jstat}{bps1}{bps2}{juniv}"
    if verbose:
        print("Find Large BPS Jobs:")
        print(f"full_constraint {full_constraint}")

    # Keep the try narrow: it guards only the schedd lookup and query,
    # which is what the raised message describes.
    try:
        schedd_name = socket.getfqdn()
        coll = htcondor.Collector()
        schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
        scheddref = htcondor.Schedd(schedd_ad)
        condorq_data = condor_q(
            constraint=full_constraint,
            schedds={schedd_name: scheddref},
            projection=projection,
        )
    except Exception as exc:
        raise type(exc)("Problem querying condor schedd for jobs") from None

    # Empty containers are falsy; return early so the rest of the
    # method can assume results exist.
    if not condorq_data:
        print("No Large BPS Jobs.")
        return

    if verbose:
        print("Large jobs exist, have been queried/fetched.")

    # Gather the Large jobs: those whose cpu or memory request exceeds
    # the standard glidein size.
    condorq_bps = condorq_data[schedd_name]
    condorq_bps_large = {
        jid: job
        for jid, job in condorq_bps.items()
        if job["RequestCpus"] > autoCPUs
        or job["RequestMemory"] > autoCPUs * memoryPerCore
    }

    # Group the Large jobs by their bps_job_label.
    if verbose:
        print("Loop over list of Large Jobs")
    label_dict = {}
    for jid, ajob in condorq_bps_large.items():
        if verbose:
            print(jid)
            print(ajob["bps_job_label"])
        label_dict.setdefault(ajob["bps_job_label"], []).append(ajob)

    for job_label, alist in label_dict.items():
        if verbose:
            print(f"\n{job_label}")
        # Jobs sharing a label share a resource profile: size the
        # glidein from the first job, but never narrower than the
        # standard glidein width.
        thisMemory = alist[0]["RequestMemory"]
        useCores = max(alist[0]["RequestCpus"], autoCPUs)
        # Short label hash keeps the Slurm job name unique per label.
        shash = hashlib.sha1(job_label.encode("UTF-8")).hexdigest()[:6]
        numberOfGlideins = len(alist)
        jobname = f"{auser}_{shash}"
        print(f"{job_label} {jobname} target {numberOfGlideins}")

        # Do not submit squeue commands rapidly
        time.sleep(2)
        # Count glideins for this label already pending in the Slurm
        # queue so we do not oversubmit.
        batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l"
        if verbose:
            print("The squeue command is: %s " % batcmd)
        try:
            resultPD = subprocess.check_output(batcmd, shell=True)
            existingGlideinsIdle = int(resultPD.decode("UTF-8"))
        except subprocess.CalledProcessError as e:
            # Bug fix: previously resultPD was decoded after a failed
            # check_output, raising NameError; assume no idle glideins
            # for this label instead.
            print(e.output)
            existingGlideinsIdle = 0
        if verbose:
            print(f"existingGlideinsIdle {jobname}")
            print(existingGlideinsIdle)

        numberOfGlideinsRed = numberOfGlideins - existingGlideinsIdle
        if verbose:
            print(f"{job_label} {jobname} reduced {numberOfGlideinsRed}")

        cpuopt = f"--cpus-per-task {useCores}"
        memopt = f"--mem {thisMemory}"
        jobopt = f"-J {jobname}"
        cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}"
        if verbose:
            print(cmd)
        for glide in range(0, numberOfGlideinsRed):
            print("Submitting Large glidein %s " % glide)
            exitCode = self.runCommand(cmd, verbose)
            if exitCode != 0:
                print("error running %s" % cmd)
                sys.exit(exitCode)

    return

def glideinsFromJobPressure(self):
"""Calculate the number of glideins needed from job pressure

Expand All @@ -206,8 +379,8 @@ def glideinsFromJobPressure(self):
verbose = self.isVerbose()

maxNumberOfGlideins = self.getNodes()
coresPerGlidein = self.getCPUs()
ratioMemCore = self.getMemoryPerCore()
autoCPUs = self.getAutoCPUs()
memoryPerCore = self.getMemoryPerCore()
auser = self.getUserName()

# initialize counters
Expand Down Expand Up @@ -235,6 +408,7 @@ def glideinsFromJobPressure(self):
# the current user (Owner) and are Idle vanilla universe jobs.
full_constraint = f"{owner}{jstat}{juniv}"
if verbose:
print("\nQuerying condor queue for standard jobs")
print(f"full_constraint {full_constraint}")
condorq_data = condor_q(
constraint=full_constraint,
Expand All @@ -246,25 +420,31 @@ def glideinsFromJobPressure(self):
condorq_bps = condorq_data[schedd_name]
if verbose:
print(len(condorq_bps))
print(condorq_bps)
# This can be extremely large
# print(condorq_bps)
# disassemble the dictionary of dictionaries
for jid in list(condorq_bps.keys()):
job = condorq_bps[jid]
thisCores = job["RequestCpus"]
thisMemory = job["RequestMemory"]
if thisCores > autoCPUs or thisMemory > autoCPUs * memoryPerCore:
MichelleGower marked this conversation as resolved.
Show resolved Hide resolved
if verbose:
print("Skipping large job")
print(jid)
continue
totalCores = totalCores + thisCores
if verbose:
print(
f"glideinsFromJobPressure: The key in the dictionary is {jid}"
)
print(f"\tRequestCpus {thisCores}")
print(f"\tCurrent value of totalCores {totalCores}")
thisRatio = thisMemory / ratioMemCore
thisRatio = thisMemory / memoryPerCore
if thisRatio > thisCores:
if verbose:
print("\t\tNeed to Add More:")
print(f"\t\tRequestMemory is {thisMemory} ")
print(f"\t\tRatio to {ratioMemCore} MB is {thisRatio} ")
print(f"\t\tRatio to {memoryPerCore} MB is {thisRatio} ")
totalCores = totalCores + (thisRatio - thisCores)
if verbose:
print(f"\t\tCurrent value of totalCores {totalCores}")
Expand All @@ -276,7 +456,7 @@ def glideinsFromJobPressure(self):
raise type(exc)("Problem querying condor schedd for jobs") from None

print(f"glideinsFromJobPressure: The final TotalCores is {totalCores}")
numberOfGlideins = math.ceil(totalCores / coresPerGlidein)
numberOfGlideins = math.ceil(totalCores / autoCPUs)
print(
f"glideinsFromJobPressure: Target # Glideins for Idle Jobs is {numberOfGlideins}"
)
Expand Down
Loading