diff --git a/python/lsst/ctrl/execute/allocator.py b/python/lsst/ctrl/execute/allocator.py index 871b356..1a04c06 100644 --- a/python/lsst/ctrl/execute/allocator.py +++ b/python/lsst/ctrl/execute/allocator.py @@ -100,6 +100,8 @@ def __init__(self, platform, opts, configuration, condorInfoFileName): self.commandLineDefaults["WALL_CLOCK"] = self.opts.maximumWallClock self.commandLineDefaults["ACCOUNT"] = self.opts.account self.commandLineDefaults["MEMPERCORE"] = 4096 + self.commandLineDefaults["ALLOWEDAUTO"] = 500 + self.commandLineDefaults["AUTOCPUS"] = 16 self.commandLineDefaults["QUEUE"] = self.opts.queue self.load() @@ -325,6 +327,12 @@ def getMemoryPerCore(self): """ return self.getParameter("MEMPERCORE") + def getAllowedAutoGlideins(self): + """Accessor for AllowedAutoGlideins + @return the value of AllowedAuto + """ + return self.getParameter("ALLOWEDAUTO") + def getQOS(self): """Accessor for QOS @return the value of QOS @@ -337,6 +345,12 @@ def getCPUs(self): """ return self.getParameter("CPUS") + def getAutoCPUs(self): + """Size of standard glideins for allocateNodes auto + @return the value of autoCPUs + """ + return self.getParameter("AUTOCPUS") + def getWallClock(self): """Accessor for WALL_CLOCK @return the value of WALL_CLOCK diff --git a/python/lsst/ctrl/execute/allocatorParser.py b/python/lsst/ctrl/execute/allocatorParser.py index 7c7ace9..a6c949a 100644 --- a/python/lsst/ctrl/execute/allocatorParser.py +++ b/python/lsst/ctrl/execute/allocatorParser.py @@ -81,11 +81,11 @@ def parseArgs(self, basename): "-c", "--cpus", action="store", - default=None, + default=16, dest="cpus", help="cores / cpus per glidein", type=int, - required=True, + required=False, ) parser.add_argument( "-a", diff --git a/python/lsst/ctrl/execute/slurmPlugin.py b/python/lsst/ctrl/execute/slurmPlugin.py index 9d4dba6..524773d 100644 --- a/python/lsst/ctrl/execute/slurmPlugin.py +++ b/python/lsst/ctrl/execute/slurmPlugin.py @@ -22,11 +22,16 @@ # see . # +import hashlib +import math import os import subprocess import sys +import time from string import Template +import htcondor +from lsst.ctrl.bps.htcondor import condor_q from lsst.ctrl.execute.allocator import Allocator @@ -76,26 +81,40 @@ def submit(self, platform, platformPkgDir): "The working local scratch directory localScratchDir is %s " % localScratchDir ) - print("The generated Slurm submit file is %s " % generatedSlurmFile) cmd = "sbatch --mem %s %s" % (totalMemory, generatedSlurmFile) auser = self.getUserName() - jobname = "".join(["glide_", auser]) + jobname = f"glide_{auser}" if verbose: print("The unix user name is %s " % auser) print("The Slurm job name for the glidein jobs is %s " % jobname) print("The user home directory is %s " % self.getUserHome()) if auto: - numberToAdd = self.glideinsFromJobPressure() - print("The number of glidein jobs to submit now is %s" % numberToAdd) + self.largeGlideinsFromJobPressure(generatedSlurmFile) + self.smallGlideinsFromJobPressure(generatedSlurmFile) else: nodes = self.getNodes() # In this case 'nodes' is the Target. - print("Targeting %s glidein(s) for the computing pool/set." 
% nodes) + # Limit number of cores to be <= 8000 which 500 16-core glideins + # allowed auto glideins is 500 + allowedAutoGlideins = self.getAllowedAutoGlideins() + # auto glidein size is 16 + autoSize = self.getAutoCPUs() + targetedCores = nodes * cpus + coreLimit = allowedAutoGlideins * autoSize + if targetedCores > coreLimit: + # Reduce number of nodes because of threshold + nodes = int(coreLimit / cpus) + print("Reducing number of glideins because of core limit threshold") + print(f"coreLimit {coreLimit}") + print(f"glidein size {cpus}") + print(f"New number of glideins {nodes}") + + print("Targeting %s glidein(s) for the computing pool/set." % nodes) batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"]) print("The squeue command is: %s " % batcmd) try: @@ -109,12 +128,12 @@ def submit(self, platform, platformPkgDir): numberToAdd = nodes - int(strResult) print("The number of glidein jobs to submit now is %s" % numberToAdd) - for glide in range(0, numberToAdd): - print("Submitting glidein %s " % glide) - exitCode = self.runCommand(cmd, verbose) - if exitCode != 0: - print("error running %s" % cmd) - sys.exit(exitCode) + for glide in range(0, numberToAdd): + print("Submitting glidein %s " % glide) + exitCode = self.runCommand(cmd, verbose) + if exitCode != 0: + print("error running %s" % cmd) + sys.exit(exitCode) def loadSlurm(self, name, platformPkgDir): if self.opts.reservation is not None: @@ -188,36 +207,170 @@ def createAllocationFile(self, input): os.chmod(outfile, 0o755) return outfile - def glideinsFromJobPressure(self): - """Calculate the number of glideins needed from job pressure + def largeGlideinsFromJobPressure(self, generatedSlurmFile): + """Determine and submit the large glideins needed from job pressure""" - Returns - ------- - number : `str` - The number of glideins - """ + verbose = self.isVerbose() + autoCPUs = self.getAutoCPUs() + memoryPerCore = self.getMemoryPerCore() + memoryLimit = autoCPUs * memoryPerCore + auser = self.getUserName() - import math - import socket + try: + # Query for jobs that have (bps_run isnt Undefined) + # (bps_job_label isnt Undefined) + # and amongst those identify Large jobs + # thisCores > 16 + # thisMemory > 16*4096 + # + # projection contains the job classads to be returned. + # These include the cpu and memory profile of each job, + # in the form of RequestCpus and RequestMemory + projection = [ + "bps_run", + "bps_job_label", + "JobStatus", + "Owner", + "RequestCpus", + "JobUniverse", + "RequestMemory", + ] + owner = f'(Owner=="{auser}")' + # query for idle jobs + jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})" + bps1 = "(bps_run isnt Undefined)" + bps2 = "(bps_job_label isnt Undefined)" + # query for vanilla universe + # JobUniverse constants are in htcondor C++ + # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... } + juniv = "(JobUniverse==5)" + large = f"(RequestMemory>{memoryLimit} || RequestCpus>{autoCPUs})" + # The constraint determines that the jobs to be returned belong to + # the current user (Owner) and are Idle vanilla universe jobs. 
+ full_constraint = ( + f"{owner} && {jstat} && {bps1} && {bps2} && {juniv} && {large}" + ) + if verbose: + print("Find Large BPS Jobs:") + print(f"full_constraint {full_constraint}") + condorq_data = condor_q( + constraint=full_constraint, + projection=projection, + ) - import htcondor - from lsst.ctrl.bps.htcondor import condor_q + if not condorq_data: + print("Auto: No Large BPS Jobs.") + return - verbose = self.isVerbose() + # Collect a list of the labels + schedd_name = list(condorq_data.keys())[0] + condorq_bps_large = condorq_data[schedd_name] + job_labels = [] + if verbose: + print("Loop over list of Large Jobs") + for jid in list(condorq_bps_large.keys()): + ajob = condorq_bps_large[jid] + if verbose: + print(jid) + print(ajob["bps_job_label"]) + job_labels.append(ajob["bps_job_label"]) + + # + # Get a list of the unique labels + # + unique_labels = set(job_labels) + + # + # Make a jobs dictionary with the unique labels as keys + # + if verbose: + print("Loop over unique job label list") + label_dict = {} + for job_label in unique_labels: + empty_list = [] + label_dict[job_label] = empty_list + + # Loop over the Large jobs and categorize + for jid in list(condorq_bps_large.keys()): + ajob = condorq_bps_large[jid] + this_label = ajob["bps_job_label"] + this_list = label_dict[this_label] + this_list.append(ajob) + + for job_label in unique_labels: + if verbose: + print(f"\n{job_label}") + alist = label_dict[job_label] + thisMemory = alist[0]["RequestMemory"] + useCores = alist[0]["RequestCpus"] + if useCores < autoCPUs: + useCores = autoCPUs + hash = hashlib.sha1(job_label.encode("UTF-8")).hexdigest() + shash = hash[:6] + numberOfGlideins = len(alist) + jobname = f"{auser}_{shash}" + print(f"{job_label} {jobname} target {numberOfGlideins}") + + # Do not submit squeue commands rapidly + time.sleep(2) + # Check Slurm queue Idle Glideins + batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l" + if verbose: + print("The squeue command is: %s " % batcmd) + try: + resultPD = subprocess.check_output(batcmd, shell=True) + except subprocess.CalledProcessError as e: + print(e.output) + existingGlideinsIdle = int(resultPD.decode("UTF-8")) + if verbose: + print(f"existingGlideinsIdle {jobname}") + print(existingGlideinsIdle) + + numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle + if verbose: + print(f"{job_label} reduced {numberOfGlideinsReduced}") + + cpuopt = f"--cpus-per-task {useCores}" + memopt = f"--mem {thisMemory}" + jobopt = f"-J {jobname}" + cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}" + if verbose: + print(cmd) + for glide in range(0, numberOfGlideinsReduced): + print("Submitting Large glidein %s " % glide) + exitCode = self.runCommand(cmd, verbose) + if exitCode != 0: + print("error running %s" % cmd) + sys.exit(exitCode) + + except Exception as exc: + raise type(exc)("Problem querying condor schedd for jobs") from None + + return + + def smallGlideinsFromJobPressure(self, generatedSlurmFile): + """Determine and submit the small glideins needed from job pressure""" + verbose = self.isVerbose() maxNumberOfGlideins = self.getNodes() - coresPerGlidein = self.getCPUs() - ratioMemCore = self.getMemoryPerCore() + maxAllowedNumberOfGlideins = self.getAllowedAutoGlideins() + if verbose: + print(f"maxNumberOfGlideins {maxNumberOfGlideins}") + print(f"maxAllowedNumberOfGlideins {maxAllowedNumberOfGlideins}") + # The number of cores for the small glideins is capped at 8000 + # Corresponds to maxAllowedNumberOfGlideins = 500 16-core glideins + if 
maxNumberOfGlideins > maxAllowedNumberOfGlideins: + maxNumberOfGlideins = maxAllowedNumberOfGlideins + print("Reducing Small Glidein limit due to threshold.") + autoCPUs = self.getAutoCPUs() + memoryPerCore = self.getMemoryPerCore() + memoryLimit = autoCPUs * memoryPerCore auser = self.getUserName() # initialize counters totalCores = 0 try: - schedd_name = socket.getfqdn() - coll = htcondor.Collector() - schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd) - scheddref = htcondor.Schedd(schedd_ad) # projection contains the job classads to be returned. # These include the cpu and memory profile of each job, # in the form of RequestCpus and RequestMemory @@ -228,58 +381,66 @@ def glideinsFromJobPressure(self): "JobUniverse", "RequestMemory", ] - owner = f'(Owner=="{auser}") ' - jstat = "&& (JobStatus==1) " - juniv = "&& (JobUniverse==5)" + owner = f'(Owner=="{auser}")' + # query for idle jobs + jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})" + # query for vanilla universe + # JobUniverse constants are in htcondor C++ + # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... } + juniv = "(JobUniverse==5)" + small = f"(RequestMemory<={memoryLimit} && RequestCpus<={autoCPUs})" # The constraint determines that the jobs to be returned belong to # the current user (Owner) and are Idle vanilla universe jobs. - full_constraint = f"{owner}{jstat}{juniv}" + full_constraint = f"{owner} && {jstat} && {juniv} && {small}" if verbose: + print("\nQuerying condor queue for standard jobs") print(f"full_constraint {full_constraint}") condorq_data = condor_q( constraint=full_constraint, - schedds={schedd_name: scheddref}, projection=projection, ) - if len(condorq_data) > 0: - print("glideinsFromJobPressure: Fetched") - condorq_bps = condorq_data[schedd_name] + + if not condorq_data: + print("Auto: No small htcondor jobs detected.") + return + + print("smallGlideins: Jobs Fetched") + + schedd_name = list(condorq_data.keys())[0] + condorq_bps = condorq_data[schedd_name] + if verbose: + print(len(condorq_bps)) + # This can be extremely large + # print(condorq_bps) + # disassemble the dictionary of dictionaries + for jid in list(condorq_bps.keys()): + job = condorq_bps[jid] + thisCores = job["RequestCpus"] + thisMemory = job["RequestMemory"] + totalCores = totalCores + thisCores if verbose: - print(len(condorq_bps)) - print(condorq_bps) - # disassemble the dictionary of dictionaries - for jid in list(condorq_bps.keys()): - job = condorq_bps[jid] - thisCores = job["RequestCpus"] - thisMemory = job["RequestMemory"] - totalCores = totalCores + thisCores + print(f"smallGlideins: The key in the dictionary is {jid}") + print(f"\tRequestCpus {thisCores}") + print(f"\tCurrent value of totalCores {totalCores}") + thisRatio = thisMemory / memoryPerCore + if thisRatio > thisCores: if verbose: - print( - f"glideinsFromJobPressure: The key in the dictionary is {jid}" - ) - print(f"\tRequestCpus {thisCores}") - print(f"\tCurrent value of totalCores {totalCores}") - thisRatio = thisMemory / ratioMemCore - if thisRatio > thisCores: - if verbose: - print("\t\tNeed to Add More:") - print(f"\t\tRequestMemory is {thisMemory} ") - print(f"\t\tRatio to {ratioMemCore} MB is {thisRatio} ") - totalCores = totalCores + (thisRatio - thisCores) - if verbose: - print(f"\t\tCurrent value of totalCores {totalCores}") - - else: - print("Length Zero") - print(len(condorq_data)) + print("\t\tNeed to Add More:") + print(f"\t\tRequestMemory is {thisMemory} ") + print(f"\t\tRatio to {memoryPerCore} MB is {thisRatio} ") + totalCores = totalCores + 
(thisRatio - thisCores) + if verbose: + print(f"\t\tCurrent value of totalCores {totalCores}") + except Exception as exc: raise type(exc)("Problem querying condor schedd for jobs") from None - print(f"glideinsFromJobPressure: The final TotalCores is {totalCores}") - numberOfGlideins = math.ceil(totalCores / coresPerGlidein) - print( - f"glideinsFromJobPressure: Target # Glideins for Idle Jobs is {numberOfGlideins}" - ) + print(f"smallGlideins: The final TotalCores is {totalCores}") + + # The number of Glideins needed to service the detected Idle jobs + # is "numberOfGlideins" + numberOfGlideins = math.ceil(totalCores / autoCPUs) + print(f"smallGlideins: Number for detected jobs is {numberOfGlideins}") # Check Slurm queue Running glideins jobname = f"glide_{auser}" @@ -302,31 +463,41 @@ def glideinsFromJobPressure(self): print(e.output) existingGlideinsIdle = int(resultPD.decode("UTF-8")) + print(f"smallGlideins: existingGlideinsRunning {existingGlideinsRunning}") + print(f"smallGlideins: existingGlideinsIdle {existingGlideinsIdle}") + + # The number of Glideins needed to service the detected + # Idle jobs is "numberOfGlideins" less the existing Idle glideins + numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle + print(f"smallGlideins: Target Number to submit {numberOfGlideinsReduced}") + + # The maximum number of Glideins that we can submit with + # the imposed threshold (maxNumberOfGlideins) + # is maxSubmitGlideins + existingGlideins = existingGlideinsRunning + existingGlideinsIdle + maxSubmitGlideins = maxNumberOfGlideins - existingGlideins + print(f"smallGlideins: maxNumberOfGlideins {maxNumberOfGlideins}") + print(f"smallGlideins: maxSubmitGlideins {maxSubmitGlideins}") + + # Reduce the number of Glideins to submit if threshold exceeded + if numberOfGlideinsReduced > maxSubmitGlideins: + numberOfGlideinsReduced = maxSubmitGlideins + print("smallGlideins: Reducing due to threshold.") print( - f"glideinsFromJobPressure: existingGlideinsRunning {existingGlideinsRunning}" - ) - print(f"glideinsFromJobPressure: existingGlideinsIdle {existingGlideinsIdle}") - numberOfGlideinsRed = numberOfGlideins - existingGlideinsIdle - - print( - f"glideinsFromJobPressure: Target # Glideins Max to Submit {numberOfGlideinsRed}" - ) - - maxIdleGlideins = maxNumberOfGlideins - existingGlideinsRunning - maxSubmitGlideins = maxIdleGlideins - existingGlideinsIdle - - print(f"glideinsFromJobPressure: maxNumberOfGlideins {maxNumberOfGlideins}") - print( - f"glideinsFromJobPressure: existingGlideinsRunning {existingGlideinsRunning}" + f"smallGlideins: Number of Glideins to submit is {numberOfGlideinsReduced}" ) - print(f"glideinsFromJobPressure: maxIdleGlideins {maxIdleGlideins}") - print(f"glideinsFromJobPressure: existingGlideinsIdle {existingGlideinsIdle}") - print(f"glideinsFromJobPressure: maxSubmitGlideins {maxSubmitGlideins}") - if numberOfGlideinsRed > maxSubmitGlideins: - numberOfGlideinsRed = maxSubmitGlideins + cpuopt = f"--cpus-per-task {autoCPUs}" + memopt = f"--mem {memoryLimit}" + jobopt = f"-J {jobname}" + cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}" + if verbose: + print(cmd) + for glide in range(0, numberOfGlideinsReduced): + print("Submitting glidein %s " % glide) + exitCode = self.runCommand(cmd, verbose) + if exitCode != 0: + print("error running %s" % cmd) + sys.exit(exitCode) - print( - f"glideinsFromJobPressure: The number of Glideins to submit now is {numberOfGlideinsRed}" - ) - return numberOfGlideinsRed + return
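
Reviewer note (not part of the patch): a minimal standalone sketch of the core-limit cap that the new fixed-target branch of `submit()` applies. The function name and the literal defaults are illustrative; the patch reads the values via `getAllowedAutoGlideins()` and `getAutoCPUs()`, which default to 500 and 16 (i.e. an 8000-core cap).

```python
# Sketch of the core-limit arithmetic in submit(); names are illustrative.
def cap_glidein_target(nodes, cpus, allowed_auto=500, auto_size=16):
    """Shrink the requested glidein count so nodes * cpus stays under the cap."""
    core_limit = allowed_auto * auto_size          # 500 * 16 = 8000 cores
    if nodes * cpus > core_limit:
        nodes = core_limit // cpus                 # keep the glidein size, cut the count
    return nodes

print(cap_glidein_target(nodes=1000, cpus=16))     # -> 500 (capped)
print(cap_glidein_target(nodes=300, cpus=16))      # -> 300 (under the cap)
```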
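Reviewer note (not part of the patch): a standalone sketch of the sizing arithmetic in `smallGlideinsFromJobPressure()`, using the new defaults `AUTOCPUS=16` and `MEMPERCORE=4096`. Jobs are represented here as `(RequestCpus, RequestMemory)` tuples instead of condor_q class ads; the helper and its example numbers are illustrative only.

```python
# Sketch of the small-glidein job-pressure calculation; names are illustrative.
import math

def small_glideins_to_submit(idle_jobs, running, idle_glideins,
                             max_glideins=500, auto_cpus=16, mem_per_core=4096):
    total_cores = 0.0
    for request_cpus, request_memory in idle_jobs:
        total_cores += request_cpus
        # A memory-heavy job effectively occupies extra cores.
        ratio = request_memory / mem_per_core
        if ratio > request_cpus:
            total_cores += ratio - request_cpus
    needed = math.ceil(total_cores / auto_cpus)
    # Credit glideins already queued idle, then cap by the pool-wide threshold.
    reduced = needed - idle_glideins
    max_submit = max_glideins - (running + idle_glideins)
    return min(reduced, max_submit)

# Example: 100 idle 1-core / 6144 MB jobs count as 150 "cores" after the
# memory bump; with 10 running and 2 idle glideins this yields
# ceil(150 / 16) - 2 = 8 new 16-core glideins.
print(small_glideins_to_submit([(1, 6144)] * 100, running=10, idle_glideins=2))
```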