From b9474ab87afa1f17e42c5f6969701ebd976385c2 Mon Sep 17 00:00:00 2001 From: Greg Daues Date: Wed, 27 Mar 2024 06:41:01 -0700 Subject: [PATCH 1/4] changes to evaluate RequestMemory expressions --- python/lsst/ctrl/execute/slurmPlugin.py | 376 ++++++++++-------------- 1 file changed, 158 insertions(+), 218 deletions(-) diff --git a/python/lsst/ctrl/execute/slurmPlugin.py b/python/lsst/ctrl/execute/slurmPlugin.py index 524773d..e69e28b 100644 --- a/python/lsst/ctrl/execute/slurmPlugin.py +++ b/python/lsst/ctrl/execute/slurmPlugin.py @@ -93,8 +93,7 @@ def submit(self, platform, platformPkgDir): print("The user home directory is %s " % self.getUserHome()) if auto: - self.largeGlideinsFromJobPressure(generatedSlurmFile) - self.smallGlideinsFromJobPressure(generatedSlurmFile) + self.glideinsFromJobPressure(generatedSlurmFile) else: nodes = self.getNodes() # In this case 'nodes' is the Target. @@ -207,8 +206,8 @@ def createAllocationFile(self, input): os.chmod(outfile, 0o755) return outfile - def largeGlideinsFromJobPressure(self, generatedSlurmFile): - """Determine and submit the large glideins needed from job pressure""" + def glideinsFromJobPressure(self, generatedSlurmFile): + """Determine and submit the glideins needed from job pressure""" verbose = self.isVerbose() autoCPUs = self.getAutoCPUs() @@ -217,18 +216,12 @@ def largeGlideinsFromJobPressure(self, generatedSlurmFile): auser = self.getUserName() try: - # Query for jobs that have (bps_run isnt Undefined) - # (bps_job_label isnt Undefined) - # and amongst those identify Large jobs - # thisCores > 16 - # thisMemory > 16*4096 - # # projection contains the job classads to be returned. # These include the cpu and memory profile of each job, # in the form of RequestCpus and RequestMemory projection = [ - "bps_run", - "bps_job_label", + "ClusterId", + "ProcId", "JobStatus", "Owner", "RequestCpus", @@ -238,188 +231,122 @@ def largeGlideinsFromJobPressure(self, generatedSlurmFile): owner = f'(Owner=="{auser}")' # query for idle jobs jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})" - bps1 = "(bps_run isnt Undefined)" - bps2 = "(bps_job_label isnt Undefined)" # query for vanilla universe # JobUniverse constants are in htcondor C++ # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... } juniv = "(JobUniverse==5)" - large = f"(RequestMemory>{memoryLimit} || RequestCpus>{autoCPUs})" + # The constraint determines that the jobs to be returned belong to # the current user (Owner) and are Idle vanilla universe jobs. 
full_constraint = ( - f"{owner} && {jstat} && {bps1} && {bps2} && {juniv} && {large}" + f"{owner} && {jstat} && {juniv}" ) + print("Auto: Query for htcondor jobs.") if verbose: - print("Find Large BPS Jobs:") print(f"full_constraint {full_constraint}") condorq_data = condor_q( constraint=full_constraint, projection=projection, ) - if not condorq_data: - print("Auto: No Large BPS Jobs.") - return + except Exception as exc: + raise type(exc)("Problem querying condor schedd for jobs") from None + + if not condorq_data: + print("Auto: No HTCondor Jobs detected.") + return - # Collect a list of the labels - schedd_name = list(condorq_data.keys())[0] - condorq_bps_large = condorq_data[schedd_name] - job_labels = [] - if verbose: - print("Loop over list of Large Jobs") - for jid in list(condorq_bps_large.keys()): - ajob = condorq_bps_large[jid] + condorq_large = [] + condorq_small = [] + schedd_name = list(condorq_data.keys())[0] + condorq_full = condorq_data[schedd_name] + + print("Auto: Search for Large htcondor jobs.") + for jid in list(condorq_full.keys()): + ajob = condorq_full[jid] + thisCpus = ajob["RequestCpus"] + if isinstance(ajob["RequestMemory"], int): + thisEvalMemory = ajob["RequestMemory"] + else: + thisEvalMemory = ajob["RequestMemory"].eval() + ajob["RequestMemoryEval"] = thisEvalMemory if verbose: - print(jid) - print(ajob["bps_job_label"]) - job_labels.append(ajob["bps_job_label"]) - - # - # Get a list of the unique labels - # - unique_labels = set(job_labels) - - # - # Make a jobs dictionary with the unique labels as keys - # - if verbose: - print("Loop over unique job label list") - label_dict = {} - for job_label in unique_labels: - empty_list = [] - label_dict[job_label] = empty_list - - # Loop over the Large jobs and categorize - for jid in list(condorq_bps_large.keys()): - ajob = condorq_bps_large[jid] - this_label = ajob["bps_job_label"] - this_list = label_dict[this_label] - this_list.append(ajob) - - for job_label in unique_labels: + print(f"Making an evaluation {thisEvalMemory}") + # Search for jobs that are Large jobs + # thisCpus > 16 or thisEvalMemory > 16*4096 + if thisEvalMemory > memoryLimit or thisCpus > autoCPUs: + print(f"Appending a Large Job {jid}") + condorq_large.append(ajob) + else: + condorq_small.append(ajob) + + if not condorq_large: + print("Auto: no Large jobs detected.") + else: + print("Auto: detected Large jobs") + for ajob in condorq_large: if verbose: - print(f"\n{job_label}") - alist = label_dict[job_label] - thisMemory = alist[0]["RequestMemory"] - useCores = alist[0]["RequestCpus"] + print(f"\n{ajob['ClusterId']}.{ajob['ProcId']}") + print(ajob) + if isinstance(ajob["RequestMemory"], int): + thisMemory = ajob["RequestMemory"] + else: + # Do not eval() a second time, recall the value + thisMemory = ajob["RequestMemoryEval"] + useCores = ajob["RequestCpus"] + clusterid = ajob["ClusterId"] + procid = ajob["ProcId"] + job_label = f"{clusterid}_{procid}_{thisMemory}" if useCores < autoCPUs: useCores = autoCPUs hash = hashlib.sha1(job_label.encode("UTF-8")).hexdigest() shash = hash[:6] - numberOfGlideins = len(alist) jobname = f"{auser}_{shash}" - print(f"{job_label} {jobname} target {numberOfGlideins}") - - # Do not submit squeue commands rapidly - time.sleep(2) - # Check Slurm queue Idle Glideins - batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l" - if verbose: - print("The squeue command is: %s " % batcmd) - try: - resultPD = subprocess.check_output(batcmd, shell=True) - except subprocess.CalledProcessError as e: - 
print(e.output) - existingGlideinsIdle = int(resultPD.decode("UTF-8")) - if verbose: - print(f"existingGlideinsIdle {jobname}") - print(existingGlideinsIdle) - - numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle - if verbose: - print(f"{job_label} reduced {numberOfGlideinsReduced}") - + print(f"jobname {jobname}") + # Check if Job exists Idle in the queue + numberJobname = self.countIdleSlurmJobs(jobname) + if numberJobname > 0: + print(f"Job {jobname} already exists, do not submit") + continue cpuopt = f"--cpus-per-task {useCores}" memopt = f"--mem {thisMemory}" jobopt = f"-J {jobname}" cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}" if verbose: print(cmd) - for glide in range(0, numberOfGlideinsReduced): - print("Submitting Large glidein %s " % glide) - exitCode = self.runCommand(cmd, verbose) - if exitCode != 0: - print("error running %s" % cmd) - sys.exit(exitCode) - - except Exception as exc: - raise type(exc)("Problem querying condor schedd for jobs") from None - - return - - def smallGlideinsFromJobPressure(self, generatedSlurmFile): - """Determine and submit the small glideins needed from job pressure""" - - verbose = self.isVerbose() - maxNumberOfGlideins = self.getNodes() - maxAllowedNumberOfGlideins = self.getAllowedAutoGlideins() - if verbose: - print(f"maxNumberOfGlideins {maxNumberOfGlideins}") - print(f"maxAllowedNumberOfGlideins {maxAllowedNumberOfGlideins}") - # The number of cores for the small glideins is capped at 8000 - # Corresponds to maxAllowedNumberOfGlideins = 500 16-core glideins - if maxNumberOfGlideins > maxAllowedNumberOfGlideins: - maxNumberOfGlideins = maxAllowedNumberOfGlideins - print("Reducing Small Glidein limit due to threshold.") - autoCPUs = self.getAutoCPUs() - memoryPerCore = self.getMemoryPerCore() - memoryLimit = autoCPUs * memoryPerCore - auser = self.getUserName() - - # initialize counters - totalCores = 0 - - try: - # projection contains the job classads to be returned. - # These include the cpu and memory profile of each job, - # in the form of RequestCpus and RequestMemory - projection = [ - "JobStatus", - "Owner", - "RequestCpus", - "JobUniverse", - "RequestMemory", - ] - owner = f'(Owner=="{auser}")' - # query for idle jobs - jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})" - # query for vanilla universe - # JobUniverse constants are in htcondor C++ - # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... } - juniv = "(JobUniverse==5)" - small = f"(RequestMemory<={memoryLimit} && RequestCpus<={autoCPUs})" - # The constraint determines that the jobs to be returned belong to - # the current user (Owner) and are Idle vanilla universe jobs. 
- full_constraint = f"{owner} && {jstat} && {juniv} && {small}" - if verbose: - print("\nQuerying condor queue for standard jobs") - print(f"full_constraint {full_constraint}") - condorq_data = condor_q( - constraint=full_constraint, - projection=projection, - ) - - if not condorq_data: - print("Auto: No small htcondor jobs detected.") - return - - print("smallGlideins: Jobs Fetched") + print(f"Submitting Large glidein for {ajob['ClusterId']}.{ajob['ProcId']}") + time.sleep(3) + exitCode = self.runCommand(cmd, verbose) + if exitCode != 0: + print("error running %s" % cmd) + sys.exit(exitCode) - schedd_name = list(condorq_data.keys())[0] - condorq_bps = condorq_data[schedd_name] + if not condorq_small: + print("Auto: no small Jobs detected.") + else: + print("Auto: summarize small jobs.") + maxNumberOfGlideins = self.getNodes() + maxAllowedNumberOfGlideins = self.getAllowedAutoGlideins() if verbose: - print(len(condorq_bps)) - # This can be extremely large - # print(condorq_bps) - # disassemble the dictionary of dictionaries - for jid in list(condorq_bps.keys()): - job = condorq_bps[jid] - thisCores = job["RequestCpus"] - thisMemory = job["RequestMemory"] + print(f"maxNumberOfGlideins {maxNumberOfGlideins}") + print(f"maxAllowedNumberOfGlideins {maxAllowedNumberOfGlideins}") + # The number of cores for the small glideins is capped at 8000 + # Corresponds to maxAllowedNumberOfGlideins = 500 16-core glideins + if maxNumberOfGlideins > maxAllowedNumberOfGlideins: + maxNumberOfGlideins = maxAllowedNumberOfGlideins + print("Reducing Small Glidein limit due to threshold.") + # initialize counter + totalCores = 0 + for ajob in condorq_small: + thisCores = ajob["RequestCpus"] + if isinstance(ajob["RequestMemory"], int): + thisMemory = ajob["RequestMemory"] + else: + thisMemory = ajob["RequestMemoryEval"] + print("Using RequestMemoryEval") totalCores = totalCores + thisCores if verbose: - print(f"smallGlideins: The key in the dictionary is {jid}") + print(f"smallGlideins: jobid {ajob['ClusterId']}.{ajob['ProcId']}") print(f"\tRequestCpus {thisCores}") print(f"\tCurrent value of totalCores {totalCores}") thisRatio = thisMemory / memoryPerCore @@ -432,72 +359,85 @@ def smallGlideinsFromJobPressure(self, generatedSlurmFile): if verbose: print(f"\t\tCurrent value of totalCores {totalCores}") - except Exception as exc: - raise type(exc)("Problem querying condor schedd for jobs") from None + print(f"smallGlideins: The final TotalCores is {totalCores}") - print(f"smallGlideins: The final TotalCores is {totalCores}") + # The number of Glideins needed to service the detected Idle jobs + # is "numberOfGlideins" + numberOfGlideins = math.ceil(totalCores / autoCPUs) + print(f"smallGlideins: Number for detected jobs is {numberOfGlideins}") - # The number of Glideins needed to service the detected Idle jobs - # is "numberOfGlideins" - numberOfGlideins = math.ceil(totalCores / autoCPUs) - print(f"smallGlideins: Number for detected jobs is {numberOfGlideins}") + jobname = f"glide_{auser}" - # Check Slurm queue Running glideins - jobname = f"glide_{auser}" - existingGlideinsRunning = 0 - batcmd = f"squeue --noheader --states=R --name={jobname} | wc -l" - print("The squeue command is: %s " % batcmd) + # Check Slurm queue Running glideins + existingGlideinsRunning = self.countRunningSlurmJobs(jobname) + + # Check Slurm queue Idle Glideins + existingGlideinsIdle = self.countIdleSlurmJobs(jobname) + + print(f"smallGlideins: existingGlideinsRunning {existingGlideinsRunning}") + print(f"smallGlideins: existingGlideinsIdle 
{existingGlideinsIdle}") + + # The number of Glideins needed to service the detected + # Idle jobs is "numberOfGlideins" less the existing Idle glideins + numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle + print(f"smallGlideins: Target Number to submit {numberOfGlideinsReduced}") + + # The maximum number of Glideins that we can submit with + # the imposed threshold (maxNumberOfGlideins) + # is maxSubmitGlideins + existingGlideins = existingGlideinsRunning + existingGlideinsIdle + maxSubmitGlideins = maxNumberOfGlideins - existingGlideins + print(f"smallGlideins: maxNumberOfGlideins {maxNumberOfGlideins}") + print(f"smallGlideins: maxSubmitGlideins {maxSubmitGlideins}") + + # Reduce the number of Glideins to submit if threshold exceeded + if numberOfGlideinsReduced > maxSubmitGlideins: + numberOfGlideinsReduced = maxSubmitGlideins + print("smallGlideins: Reducing due to threshold.") + print( + f"smallGlideins: Number of Glideins to submit is {numberOfGlideinsReduced}" + ) + + cpuopt = f"--cpus-per-task {autoCPUs}" + memopt = f"--mem {memoryLimit}" + jobopt = f"-J {jobname}" + cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}" + if verbose: + print(cmd) + for glide in range(0, numberOfGlideinsReduced): + print("Submitting glidein %s " % glide) + exitCode = self.runCommand(cmd, verbose) + if exitCode != 0: + print("error running %s" % cmd) + sys.exit(exitCode) + + return + + def countIdleSlurmJobs(self, jobname): + """Check Slurm queue for Idle Glideins""" + + print(f"Checking if Idle Slurm job {jobname} exists:") + batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l" + print(f"The squeue command is {batcmd}") + time.sleep(3) try: - resultR = subprocess.check_output(batcmd, shell=True) + resultPD = subprocess.check_output(batcmd, shell=True) except subprocess.CalledProcessError as e: print(e.output) - existingGlideinsRunning = int(resultR.decode("UTF-8")) + numberOfJobs = int(resultPD.decode("UTF-8")) + return numberOfJobs - # Check Slurm queue Idle Glideins - existingGlideinsIdle = 0 - batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l" - print("The squeue command is: %s " % batcmd) + def countRunningSlurmJobs(self, jobname): + """Check Slurm queue for Running Glideins""" + + print(f"Checking if running Slurm job {jobname} exists:") + batcmd = f"squeue --noheader --states=R --name={jobname} | wc -l" + print(f"The squeue command is {batcmd}") + time.sleep(3) try: resultPD = subprocess.check_output(batcmd, shell=True) except subprocess.CalledProcessError as e: print(e.output) - existingGlideinsIdle = int(resultPD.decode("UTF-8")) - - print(f"smallGlideins: existingGlideinsRunning {existingGlideinsRunning}") - print(f"smallGlideins: existingGlideinsIdle {existingGlideinsIdle}") - - # The number of Glideins needed to service the detected - # Idle jobs is "numberOfGlideins" less the existing Idle glideins - numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle - print(f"smallGlideins: Target Number to submit {numberOfGlideinsReduced}") - - # The maximum number of Glideins that we can submit with - # the imposed threshold (maxNumberOfGlideins) - # is maxSubmitGlideins - existingGlideins = existingGlideinsRunning + existingGlideinsIdle - maxSubmitGlideins = maxNumberOfGlideins - existingGlideins - print(f"smallGlideins: maxNumberOfGlideins {maxNumberOfGlideins}") - print(f"smallGlideins: maxSubmitGlideins {maxSubmitGlideins}") - - # Reduce the number of Glideins to submit if threshold exceeded - if numberOfGlideinsReduced > 
maxSubmitGlideins: - numberOfGlideinsReduced = maxSubmitGlideins - print("smallGlideins: Reducing due to threshold.") - print( - f"smallGlideins: Number of Glideins to submit is {numberOfGlideinsReduced}" - ) + numberOfJobs = int(resultPD.decode("UTF-8")) + return numberOfJobs - cpuopt = f"--cpus-per-task {autoCPUs}" - memopt = f"--mem {memoryLimit}" - jobopt = f"-J {jobname}" - cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}" - if verbose: - print(cmd) - for glide in range(0, numberOfGlideinsReduced): - print("Submitting glidein %s " % glide) - exitCode = self.runCommand(cmd, verbose) - if exitCode != 0: - print("error running %s" % cmd) - sys.exit(exitCode) - - return From e090c48ca8da92a02f360940519902d4cc016617 Mon Sep 17 00:00:00 2001 From: Greg Daues Date: Wed, 27 Mar 2024 06:48:08 -0700 Subject: [PATCH 2/4] python formatting --- python/lsst/ctrl/execute/slurmPlugin.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/python/lsst/ctrl/execute/slurmPlugin.py b/python/lsst/ctrl/execute/slurmPlugin.py index e69e28b..0ef1f19 100644 --- a/python/lsst/ctrl/execute/slurmPlugin.py +++ b/python/lsst/ctrl/execute/slurmPlugin.py @@ -238,9 +238,7 @@ def glideinsFromJobPressure(self, generatedSlurmFile): # The constraint determines that the jobs to be returned belong to # the current user (Owner) and are Idle vanilla universe jobs. - full_constraint = ( - f"{owner} && {jstat} && {juniv}" - ) + full_constraint = f"{owner} && {jstat} && {juniv}" print("Auto: Query for htcondor jobs.") if verbose: print(f"full_constraint {full_constraint}") @@ -314,7 +312,9 @@ def glideinsFromJobPressure(self, generatedSlurmFile): cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}" if verbose: print(cmd) - print(f"Submitting Large glidein for {ajob['ClusterId']}.{ajob['ProcId']}") + print( + f"Submitting Large glidein for {ajob['ClusterId']}.{ajob['ProcId']}" + ) time.sleep(3) exitCode = self.runCommand(cmd, verbose) if exitCode != 0: @@ -440,4 +440,3 @@ def countRunningSlurmJobs(self, jobname): print(e.output) numberOfJobs = int(resultPD.decode("UTF-8")) return numberOfJobs - From 23d21b3ffe1236b53174925afdc520d91439c921 Mon Sep 17 00:00:00 2001 From: Greg Daues Date: Wed, 10 Apr 2024 08:52:46 -0700 Subject: [PATCH 3/4] updates in response to review of pr 26 --- python/lsst/ctrl/execute/slurmPlugin.py | 228 ++++++++++++++---------- 1 file changed, 134 insertions(+), 94 deletions(-) diff --git a/python/lsst/ctrl/execute/slurmPlugin.py b/python/lsst/ctrl/execute/slurmPlugin.py index 0ef1f19..e06ae8b 100644 --- a/python/lsst/ctrl/execute/slurmPlugin.py +++ b/python/lsst/ctrl/execute/slurmPlugin.py @@ -36,6 +36,69 @@ class SlurmPlugin(Allocator): + + @staticmethod + def countSlurmJobs(jobname, jobstates): + """Check Slurm queue for Glideins of given states + + Parameters + ---------- + jobname : `string` + Slurm jobname to be searched for via squeue. + jobstates : `string` + Slurm jobstates to be searched for via squeue. + + Returns + ------- + numberOfJobs : `int` + The number of Slurm jobs detected via squeue. 
+ """ + batcmd = f"squeue --noheader --states={jobstates} --name={jobname} | wc -l" + print(f"The squeue command is {batcmd}") + time.sleep(3) + try: + resultPD = subprocess.check_output(batcmd, shell=True) + except subprocess.CalledProcessError as e: + print(e.output) + numberOfJobs = int(resultPD.decode("UTF-8")) + return numberOfJobs + + @staticmethod + def countIdleSlurmJobs(jobname): + """Check Slurm queue for Idle Glideins + + Parameters + ---------- + jobname : `string` + Slurm jobname to be searched for via squeue. + + Returns + ------- + numberOfJobs : `int` + The number of Slurm jobs detected via squeue. + """ + print(f"Checking if idle Slurm job {jobname} exists:") + numberOfJobs = SlurmPlugin.countSlurmJobs(jobname, jobstates="PD") + return numberOfJobs + + @staticmethod + def countRunningSlurmJobs(jobname): + """Check Slurm queue for Running Glideins + + Parameters + ---------- + jobname : `string` + Slurm jobname to be searched for via squeue. + + Returns + ------- + numberOfJobs : `int` + The number of Slurm jobs detected via squeue. + """ + print(f"Checking if running Slurm job {jobname} exists:") + numberOfJobs = SlurmPlugin.countSlurmJobs(jobname, jobstates="R") + return numberOfJobs + def submit(self, platform, platformPkgDir): configName = os.path.join(platformPkgDir, "etc", "config", "slurmConfig.py") @@ -215,33 +278,33 @@ def glideinsFromJobPressure(self, generatedSlurmFile): memoryLimit = autoCPUs * memoryPerCore auser = self.getUserName() + # projection contains the job classads to be returned. + # These include the cpu and memory profile of each job, + # in the form of RequestCpus and RequestMemory + projection = [ + "ClusterId", + "ProcId", + "JobStatus", + "Owner", + "RequestCpus", + "JobUniverse", + "RequestMemory", + ] + owner = f'(Owner=="{auser}")' + # query for idle jobs + jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})" + # query for vanilla universe + # JobUniverse constants are in htcondor C++ + # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... } + juniv = "(JobUniverse==5)" + + # The constraint determines that the jobs to be returned belong to + # the current user (Owner) and are Idle vanilla universe jobs. + full_constraint = f"{owner} && {jstat} && {juniv}" + print("Auto: Query for htcondor jobs.") + if verbose: + print(f"full_constraint {full_constraint}") try: - # projection contains the job classads to be returned. - # These include the cpu and memory profile of each job, - # in the form of RequestCpus and RequestMemory - projection = [ - "ClusterId", - "ProcId", - "JobStatus", - "Owner", - "RequestCpus", - "JobUniverse", - "RequestMemory", - ] - owner = f'(Owner=="{auser}")' - # query for idle jobs - jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})" - # query for vanilla universe - # JobUniverse constants are in htcondor C++ - # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... } - juniv = "(JobUniverse==5)" - - # The constraint determines that the jobs to be returned belong to - # the current user (Owner) and are Idle vanilla universe jobs. 
- full_constraint = f"{owner} && {jstat} && {juniv}" - print("Auto: Query for htcondor jobs.") - if verbose: - print(f"full_constraint {full_constraint}") condorq_data = condor_q( constraint=full_constraint, projection=projection, @@ -256,22 +319,20 @@ def glideinsFromJobPressure(self, generatedSlurmFile): condorq_large = [] condorq_small = [] - schedd_name = list(condorq_data.keys())[0] - condorq_full = condorq_data[schedd_name] + schedd_name, condorq_full = condorq_data.popitem() print("Auto: Search for Large htcondor jobs.") - for jid in list(condorq_full.keys()): - ajob = condorq_full[jid] + for jid, ajob in condorq_full.items(): thisCpus = ajob["RequestCpus"] if isinstance(ajob["RequestMemory"], int): thisEvalMemory = ajob["RequestMemory"] else: thisEvalMemory = ajob["RequestMemory"].eval() - ajob["RequestMemoryEval"] = thisEvalMemory if verbose: print(f"Making an evaluation {thisEvalMemory}") # Search for jobs that are Large jobs # thisCpus > 16 or thisEvalMemory > 16*4096 + ajob["RequestMemoryEval"] = thisEvalMemory if thisEvalMemory > memoryLimit or thisCpus > autoCPUs: print(f"Appending a Large Job {jid}") condorq_large.append(ajob) @@ -286,11 +347,7 @@ def glideinsFromJobPressure(self, generatedSlurmFile): if verbose: print(f"\n{ajob['ClusterId']}.{ajob['ProcId']}") print(ajob) - if isinstance(ajob["RequestMemory"], int): - thisMemory = ajob["RequestMemory"] - else: - # Do not eval() a second time, recall the value - thisMemory = ajob["RequestMemoryEval"] + thisMemory = ajob["RequestMemoryEval"] useCores = ajob["RequestCpus"] clusterid = ajob["ClusterId"] procid = ajob["ProcId"] @@ -300,9 +357,10 @@ def glideinsFromJobPressure(self, generatedSlurmFile): hash = hashlib.sha1(job_label.encode("UTF-8")).hexdigest() shash = hash[:6] jobname = f"{auser}_{shash}" - print(f"jobname {jobname}") + if verbose: + print(f"jobname {jobname}") # Check if Job exists Idle in the queue - numberJobname = self.countIdleSlurmJobs(jobname) + numberJobname = SlurmPlugin.countIdleSlurmJobs(jobname) if numberJobname > 0: print(f"Job {jobname} already exists, do not submit") continue @@ -335,68 +393,78 @@ def glideinsFromJobPressure(self, generatedSlurmFile): if maxNumberOfGlideins > maxAllowedNumberOfGlideins: maxNumberOfGlideins = maxAllowedNumberOfGlideins print("Reducing Small Glidein limit due to threshold.") - # initialize counter + # + # In the following loop we calculate the number of cores + # required by the set of small jobs. This calculation utilizes + # the requested cpus for a job, but also checks the requested + # memory and counts an effective core for each 'memoryPerCore' + # of memory (by default the 4GB per core than S3DF Slurm schedules). 
totalCores = 0 for ajob in condorq_small: - thisCores = ajob["RequestCpus"] - if isinstance(ajob["RequestMemory"], int): - thisMemory = ajob["RequestMemory"] - else: - thisMemory = ajob["RequestMemoryEval"] - print("Using RequestMemoryEval") - totalCores = totalCores + thisCores + requestedCpus = ajob["RequestCpus"] + # if isinstance(ajob["RequestMemory"], int): + # requestedMemory = ajob["RequestMemory"] + # else: + # requestedMemory = ajob["RequestMemoryEval"] + # print("Using RequestMemoryEval") + requestedMemory = ajob["RequestMemoryEval"] + totalCores = totalCores + requestedCpus if verbose: - print(f"smallGlideins: jobid {ajob['ClusterId']}.{ajob['ProcId']}") - print(f"\tRequestCpus {thisCores}") + print(f"small: jobid {ajob['ClusterId']}.{ajob['ProcId']}") + print(f"\tRequestCpus {requestedCpus}") print(f"\tCurrent value of totalCores {totalCores}") - thisRatio = thisMemory / memoryPerCore - if thisRatio > thisCores: + neededCpus = requestedMemory / memoryPerCore + if neededCpus > requestedCpus: if verbose: print("\t\tNeed to Add More:") - print(f"\t\tRequestMemory is {thisMemory} ") - print(f"\t\tRatio to {memoryPerCore} MB is {thisRatio} ") - totalCores = totalCores + (thisRatio - thisCores) + print(f"\t\tRequestMemory is {requestedMemory} ") + print(f"\t\tRatio to {memoryPerCore} MB is {neededCpus}") + totalCores = totalCores + (neededCpus - requestedCpus) if verbose: print(f"\t\tCurrent value of totalCores {totalCores}") - print(f"smallGlideins: The final TotalCores is {totalCores}") + print(f"small: The final TotalCores is {totalCores}") # The number of Glideins needed to service the detected Idle jobs # is "numberOfGlideins" numberOfGlideins = math.ceil(totalCores / autoCPUs) - print(f"smallGlideins: Number for detected jobs is {numberOfGlideins}") + print(f"small: Number for detected jobs is {numberOfGlideins}") jobname = f"glide_{auser}" # Check Slurm queue Running glideins - existingGlideinsRunning = self.countRunningSlurmJobs(jobname) + existingGlideinsRunning = SlurmPlugin.countRunningSlurmJobs(jobname) # Check Slurm queue Idle Glideins - existingGlideinsIdle = self.countIdleSlurmJobs(jobname) + existingGlideinsIdle = SlurmPlugin.countIdleSlurmJobs(jobname) - print(f"smallGlideins: existingGlideinsRunning {existingGlideinsRunning}") - print(f"smallGlideins: existingGlideinsIdle {existingGlideinsIdle}") + if verbose: + print(f"small: existingGlideinsRunning {existingGlideinsRunning}") + print(f"small: existingGlideinsIdle {existingGlideinsIdle}") # The number of Glideins needed to service the detected # Idle jobs is "numberOfGlideins" less the existing Idle glideins numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle - print(f"smallGlideins: Target Number to submit {numberOfGlideinsReduced}") + if verbose: + print(f"small: Target Number to submit {numberOfGlideinsReduced}") # The maximum number of Glideins that we can submit with # the imposed threshold (maxNumberOfGlideins) # is maxSubmitGlideins existingGlideins = existingGlideinsRunning + existingGlideinsIdle maxSubmitGlideins = maxNumberOfGlideins - existingGlideins - print(f"smallGlideins: maxNumberOfGlideins {maxNumberOfGlideins}") - print(f"smallGlideins: maxSubmitGlideins {maxSubmitGlideins}") + if verbose: + print(f"small: maxNumberOfGlideins {maxNumberOfGlideins}") + print(f"small: maxSubmitGlideins {maxSubmitGlideins}") # Reduce the number of Glideins to submit if threshold exceeded if numberOfGlideinsReduced > maxSubmitGlideins: numberOfGlideinsReduced = maxSubmitGlideins - print("smallGlideins: 
Reducing due to threshold.") - print( - f"smallGlideins: Number of Glideins to submit is {numberOfGlideinsReduced}" - ) + print("small: Reducing due to threshold.") + if verbose: + print( + f"small: Number of Glideins to submit is {numberOfGlideinsReduced}" + ) cpuopt = f"--cpus-per-task {autoCPUs}" memopt = f"--mem {memoryLimit}" @@ -412,31 +480,3 @@ def glideinsFromJobPressure(self, generatedSlurmFile): sys.exit(exitCode) return - - def countIdleSlurmJobs(self, jobname): - """Check Slurm queue for Idle Glideins""" - - print(f"Checking if Idle Slurm job {jobname} exists:") - batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l" - print(f"The squeue command is {batcmd}") - time.sleep(3) - try: - resultPD = subprocess.check_output(batcmd, shell=True) - except subprocess.CalledProcessError as e: - print(e.output) - numberOfJobs = int(resultPD.decode("UTF-8")) - return numberOfJobs - - def countRunningSlurmJobs(self, jobname): - """Check Slurm queue for Running Glideins""" - - print(f"Checking if running Slurm job {jobname} exists:") - batcmd = f"squeue --noheader --states=R --name={jobname} | wc -l" - print(f"The squeue command is {batcmd}") - time.sleep(3) - try: - resultPD = subprocess.check_output(batcmd, shell=True) - except subprocess.CalledProcessError as e: - print(e.output) - numberOfJobs = int(resultPD.decode("UTF-8")) - return numberOfJobs From eb27653816f122505a943f3c7599822b2b255935 Mon Sep 17 00:00:00 2001 From: Greg Daues Date: Wed, 10 Apr 2024 08:56:39 -0700 Subject: [PATCH 4/4] comment one character too long --- python/lsst/ctrl/execute/slurmPlugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lsst/ctrl/execute/slurmPlugin.py b/python/lsst/ctrl/execute/slurmPlugin.py index e06ae8b..f0606fb 100644 --- a/python/lsst/ctrl/execute/slurmPlugin.py +++ b/python/lsst/ctrl/execute/slurmPlugin.py @@ -398,7 +398,7 @@ def glideinsFromJobPressure(self, generatedSlurmFile): # required by the set of small jobs. This calculation utilizes # the requested cpus for a job, but also checks the requested # memory and counts an effective core for each 'memoryPerCore' - # of memory (by default the 4GB per core than S3DF Slurm schedules). + # of memory (by default the 4GB per core of S3DF Slurm scheduler). totalCores = 0 for ajob in condorq_small: requestedCpus = ajob["RequestCpus"]
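
A few runnable sketches of the mechanisms this series introduces follow; they are editorial illustrations under stated assumptions, not part of the patches.

PATCH 1 and PATCH 3 call .eval() on RequestMemory before comparing it
against memoryLimit. The htcondor bindings hand back a plain int only
when the submit file gave a literal request; a request written as a
ClassAd expression comes back as an expression object that must be
evaluated first. A minimal sketch, assuming the classad module from the
htcondor Python bindings (the sample values are illustrative):

    import classad

    autoCPUs = 16
    memoryPerCore = 4096
    memoryLimit = autoCPUs * memoryPerCore  # 65536 MB, as in the patch

    # A literal request arrives as an int; an expression arrives as a
    # classad.ExprTree that is evaluated before it can be compared.
    for request in (4096, classad.ExprTree("4096 * 2")):
        if isinstance(request, int):
            evaluated = request
        else:
            evaluated = request.eval()
        print(evaluated, evaluated > memoryLimit)  # 4096 and 8192, both False

The patch caches the evaluated value in ajob["RequestMemoryEval"] so the
expression is only evaluated once per job.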
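The Large-job path in PATCH 1 derives the Slurm jobname by hashing a
per-job label, so a later pass over the same idle job reproduces the
same name and the countIdleSlurmJobs check can skip resubmission. A
sketch of the naming scheme (the username and job values are made up;
hashlib is standard library):

    import hashlib

    auser = "someuser"
    clusterid, procid, thisMemory = 123456, 0, 98304
    job_label = f"{clusterid}_{procid}_{thisMemory}"
    hash = hashlib.sha1(job_label.encode("UTF-8")).hexdigest()
    shash = hash[:6]
    jobname = f"{auser}_{shash}"
    print(jobname)  # "someuser_" plus the first six hex digits of the sha1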
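The small-job loop in PATCH 3 charges each job its RequestCpus, plus
extra effective cores whenever the evaluated memory request exceeds
memoryPerCore for each requested cpu, and then sizes the glidein count
by ceiling division over autoCPUs. A self-contained sketch with three
hypothetical idle jobs (the 16-core and 4096 MB-per-core figures mirror
the defaults named in the patch comments):

    import math

    autoCPUs = 16
    memoryPerCore = 4096

    # (RequestCpus, evaluated RequestMemory in MB) per idle small job
    jobs = [(1, 2048), (1, 16384), (8, 32768)]

    totalCores = 0
    for requestedCpus, requestedMemory in jobs:
        totalCores += requestedCpus
        neededCpus = requestedMemory / memoryPerCore
        if neededCpus > requestedCpus:
            # The 16 GB single-core job is charged 4 effective cores.
            totalCores += neededCpus - requestedCpus

    numberOfGlideins = math.ceil(totalCores / autoCPUs)
    print(totalCores, numberOfGlideins)  # 13.0 1: one 16-core glidein suffices

The submitted count is then further reduced by the existing idle
glideins and capped by maxNumberOfGlideins, exactly as in the
smallGlideins arithmetic above.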
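Finally, because PATCH 3 turns the squeue helpers into static methods,
they can be exercised without constructing a plugin instance. A usage
sketch, assuming an environment where squeue is on the PATH (the
jobname here is hypothetical, and each call sleeps three seconds before
shelling out, per the countSlurmJobs implementation):

    from lsst.ctrl.execute.slurmPlugin import SlurmPlugin

    idle = SlurmPlugin.countIdleSlurmJobs("glide_someuser")       # --states=PD
    running = SlurmPlugin.countRunningSlurmJobs("glide_someuser") # --states=R
    print(f"idle glideins {idle}, running glideins {running}")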