diff --git a/python/lsst/ctrl/execute/slurmPlugin.py b/python/lsst/ctrl/execute/slurmPlugin.py
index 524773d..f0606fb 100644
--- a/python/lsst/ctrl/execute/slurmPlugin.py
+++ b/python/lsst/ctrl/execute/slurmPlugin.py
@@ -36,6 +36,69 @@ class SlurmPlugin(Allocator):
+
+    @staticmethod
+    def countSlurmJobs(jobname, jobstates):
+        """Check Slurm queue for Glideins of given states
+
+        Parameters
+        ----------
+        jobname : `string`
+            Slurm jobname to be searched for via squeue.
+        jobstates : `string`
+            Slurm jobstates to be searched for via squeue.
+
+        Returns
+        -------
+        numberOfJobs : `int`
+            The number of Slurm jobs detected via squeue.
+        """
+        batcmd = f"squeue --noheader --states={jobstates} --name={jobname} | wc -l"
+        print(f"The squeue command is {batcmd}")
+        time.sleep(3)
+        try:
+            resultPD = subprocess.check_output(batcmd, shell=True)
+        except subprocess.CalledProcessError as e:
+            print(e.output)
+        numberOfJobs = int(resultPD.decode("UTF-8"))
+        return numberOfJobs
+
+    @staticmethod
+    def countIdleSlurmJobs(jobname):
+        """Check Slurm queue for Idle Glideins
+
+        Parameters
+        ----------
+        jobname : `string`
+            Slurm jobname to be searched for via squeue.
+
+        Returns
+        -------
+        numberOfJobs : `int`
+            The number of Slurm jobs detected via squeue.
+        """
+        print(f"Checking if idle Slurm job {jobname} exists:")
+        numberOfJobs = SlurmPlugin.countSlurmJobs(jobname, jobstates="PD")
+        return numberOfJobs
+
+    @staticmethod
+    def countRunningSlurmJobs(jobname):
+        """Check Slurm queue for Running Glideins
+
+        Parameters
+        ----------
+        jobname : `string`
+            Slurm jobname to be searched for via squeue.
+
+        Returns
+        -------
+        numberOfJobs : `int`
+            The number of Slurm jobs detected via squeue.
+        """
+        print(f"Checking if running Slurm job {jobname} exists:")
+        numberOfJobs = SlurmPlugin.countSlurmJobs(jobname, jobstates="R")
+        return numberOfJobs
+
     def submit(self, platform, platformPkgDir):
         configName = os.path.join(platformPkgDir, "etc", "config", "slurmConfig.py")
 
@@ -93,8 +156,7 @@ def submit(self, platform, platformPkgDir):
         print("The user home directory is %s " % self.getUserHome())
 
         if auto:
-            self.largeGlideinsFromJobPressure(generatedSlurmFile)
-            self.smallGlideinsFromJobPressure(generatedSlurmFile)
+            self.glideinsFromJobPressure(generatedSlurmFile)
         else:
             nodes = self.getNodes()
             # In this case 'nodes' is the Target.
@@ -207,8 +269,8 @@ def createAllocationFile(self, input):
         os.chmod(outfile, 0o755)
         return outfile
 
-    def largeGlideinsFromJobPressure(self, generatedSlurmFile):
-        """Determine and submit the large glideins needed from job pressure"""
+    def glideinsFromJobPressure(self, generatedSlurmFile):
+        """Determine and submit the glideins needed from job pressure"""
 
         verbose = self.isVerbose()
         autoCPUs = self.getAutoCPUs()
@@ -216,288 +278,205 @@ def largeGlideinsFromJobPressure(self, generatedSlurmFile):
         memoryLimit = autoCPUs * memoryPerCore
         auser = self.getUserName()
 
+        # projection contains the job classads to be returned.
+        # These include the cpu and memory profile of each job,
+        # in the form of RequestCpus and RequestMemory
+        projection = [
+            "ClusterId",
+            "ProcId",
+            "JobStatus",
+            "Owner",
+            "RequestCpus",
+            "JobUniverse",
+            "RequestMemory",
+        ]
+        owner = f'(Owner=="{auser}")'
+        # query for idle jobs
+        jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})"
+        # query for vanilla universe
+        # JobUniverse constants are in htcondor C++
+        # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... }
+        juniv = "(JobUniverse==5)"
+
+        # The constraint determines that the jobs to be returned belong to
+        # the current user (Owner) and are Idle vanilla universe jobs.
+        full_constraint = f"{owner} && {jstat} && {juniv}"
+        print("Auto: Query for htcondor jobs.")
+        if verbose:
+            print(f"full_constraint {full_constraint}")
         try:
-            # Query for jobs that have (bps_run isnt Undefined)
-            # (bps_job_label isnt Undefined)
-            # and amongst those identify Large jobs
-            # thisCores > 16
-            # thisMemory > 16*4096
-            #
-            # projection contains the job classads to be returned.
-            # These include the cpu and memory profile of each job,
-            # in the form of RequestCpus and RequestMemory
-            projection = [
-                "bps_run",
-                "bps_job_label",
-                "JobStatus",
-                "Owner",
-                "RequestCpus",
-                "JobUniverse",
-                "RequestMemory",
-            ]
-            owner = f'(Owner=="{auser}")'
-            # query for idle jobs
-            jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})"
-            bps1 = "(bps_run isnt Undefined)"
-            bps2 = "(bps_job_label isnt Undefined)"
-            # query for vanilla universe
-            # JobUniverse constants are in htcondor C++
-            # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... }
-            juniv = "(JobUniverse==5)"
-            large = f"(RequestMemory>{memoryLimit} || RequestCpus>{autoCPUs})"
-            # The constraint determines that the jobs to be returned belong to
-            # the current user (Owner) and are Idle vanilla universe jobs.
-            full_constraint = (
-                f"{owner} && {jstat} && {bps1} && {bps2} && {juniv} && {large}"
-            )
-            if verbose:
-                print("Find Large BPS Jobs:")
-                print(f"full_constraint {full_constraint}")
             condorq_data = condor_q(
                 constraint=full_constraint,
                 projection=projection,
             )
-            if not condorq_data:
-                print("Auto: No Large BPS Jobs.")
-                return
-
-            # Collect a list of the labels
-            schedd_name = list(condorq_data.keys())[0]
-            condorq_bps_large = condorq_data[schedd_name]
-            job_labels = []
-            if verbose:
-                print("Loop over list of Large Jobs")
-            for jid in list(condorq_bps_large.keys()):
-                ajob = condorq_bps_large[jid]
-                if verbose:
-                    print(jid)
-                    print(ajob["bps_job_label"])
-                job_labels.append(ajob["bps_job_label"])
+        except Exception as exc:
+            raise type(exc)("Problem querying condor schedd for jobs") from None
 
-            #
-            # Get a list of the unique labels
-            #
-            unique_labels = set(job_labels)
+        if not condorq_data:
+            print("Auto: No HTCondor Jobs detected.")
+            return
 
-            #
-            # Make a jobs dictionary with the unique labels as keys
-            #
-            if verbose:
-                print("Loop over unique job label list")
-            label_dict = {}
-            for job_label in unique_labels:
-                empty_list = []
-                label_dict[job_label] = empty_list
-
-            # Loop over the Large jobs and categorize
-            for jid in list(condorq_bps_large.keys()):
-                ajob = condorq_bps_large[jid]
-                this_label = ajob["bps_job_label"]
-                this_list = label_dict[this_label]
-                this_list.append(ajob)
-
-            for job_label in unique_labels:
+        condorq_large = []
+        condorq_small = []
+        schedd_name, condorq_full = condorq_data.popitem()
+
+        print("Auto: Search for Large htcondor jobs.")
+        for jid, ajob in condorq_full.items():
+            thisCpus = ajob["RequestCpus"]
+            if isinstance(ajob["RequestMemory"], int):
+                thisEvalMemory = ajob["RequestMemory"]
+            else:
+                thisEvalMemory = ajob["RequestMemory"].eval()
                 if verbose:
-                    print(f"\n{job_label}")
-                alist = label_dict[job_label]
-                thisMemory = alist[0]["RequestMemory"]
-                useCores = alist[0]["RequestCpus"]
+                    print(f"Making an evaluation {thisEvalMemory}")
+            # Search for jobs that are Large jobs
+            # thisCpus > 16 or thisEvalMemory > 16*4096
+            ajob["RequestMemoryEval"] = thisEvalMemory
+            if thisEvalMemory > memoryLimit or thisCpus > autoCPUs:
+                print(f"Appending a Large Job {jid}")
+                condorq_large.append(ajob)
+            else:
+                condorq_small.append(ajob)
+
+        if not condorq_large:
+            print("Auto: no Large jobs detected.")
+        else:
+            print("Auto: detected Large jobs")
+            for ajob in condorq_large:
+                if verbose:
+                    print(f"\n{ajob['ClusterId']}.{ajob['ProcId']}")
+                    print(ajob)
+                thisMemory = ajob["RequestMemoryEval"]
+                useCores = ajob["RequestCpus"]
+                clusterid = ajob["ClusterId"]
+                procid = ajob["ProcId"]
+                job_label = f"{clusterid}_{procid}_{thisMemory}"
                 if useCores < autoCPUs:
                     useCores = autoCPUs
                 hash = hashlib.sha1(job_label.encode("UTF-8")).hexdigest()
                 shash = hash[:6]
-                numberOfGlideins = len(alist)
                 jobname = f"{auser}_{shash}"
-                print(f"{job_label} {jobname} target {numberOfGlideins}")
-
-                # Do not submit squeue commands rapidly
-                time.sleep(2)
-                # Check Slurm queue Idle Glideins
-                batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l"
-                if verbose:
-                    print("The squeue command is: %s " % batcmd)
-                try:
-                    resultPD = subprocess.check_output(batcmd, shell=True)
-                except subprocess.CalledProcessError as e:
-                    print(e.output)
-                existingGlideinsIdle = int(resultPD.decode("UTF-8"))
-                if verbose:
-                    print(f"existingGlideinsIdle {jobname}")
-                    print(existingGlideinsIdle)
-
-                numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle
                 if verbose:
-                    print(f"{job_label} reduced {numberOfGlideinsReduced}")
-
+                    print(f"jobname {jobname}")
+                # Check if Job exists Idle in the queue
+                numberJobname = SlurmPlugin.countIdleSlurmJobs(jobname)
+                if numberJobname > 0:
+                    print(f"Job {jobname} already exists, do not submit")
+                    continue
                 cpuopt = f"--cpus-per-task {useCores}"
                 memopt = f"--mem {thisMemory}"
                 jobopt = f"-J {jobname}"
                 cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}"
                 if verbose:
                     print(cmd)
-                for glide in range(0, numberOfGlideinsReduced):
-                    print("Submitting Large glidein %s " % glide)
-                    exitCode = self.runCommand(cmd, verbose)
-                    if exitCode != 0:
-                        print("error running %s" % cmd)
-                        sys.exit(exitCode)
-
-        except Exception as exc:
-            raise type(exc)("Problem querying condor schedd for jobs") from None
-
-        return
-
-    def smallGlideinsFromJobPressure(self, generatedSlurmFile):
-        """Determine and submit the small glideins needed from job pressure"""
-
-        verbose = self.isVerbose()
-        maxNumberOfGlideins = self.getNodes()
-        maxAllowedNumberOfGlideins = self.getAllowedAutoGlideins()
-        if verbose:
-            print(f"maxNumberOfGlideins {maxNumberOfGlideins}")
-            print(f"maxAllowedNumberOfGlideins {maxAllowedNumberOfGlideins}")
-        # The number of cores for the small glideins is capped at 8000
-        # Corresponds to maxAllowedNumberOfGlideins = 500 16-core glideins
-        if maxNumberOfGlideins > maxAllowedNumberOfGlideins:
-            maxNumberOfGlideins = maxAllowedNumberOfGlideins
-            print("Reducing Small Glidein limit due to threshold.")
-        autoCPUs = self.getAutoCPUs()
-        memoryPerCore = self.getMemoryPerCore()
-        memoryLimit = autoCPUs * memoryPerCore
-        auser = self.getUserName()
-
-        # initialize counters
-        totalCores = 0
-
-        try:
-            # projection contains the job classads to be returned.
-            # These include the cpu and memory profile of each job,
-            # in the form of RequestCpus and RequestMemory
-            projection = [
-                "JobStatus",
-                "Owner",
-                "RequestCpus",
-                "JobUniverse",
-                "RequestMemory",
-            ]
-            owner = f'(Owner=="{auser}")'
-            # query for idle jobs
-            jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})"
-            # query for vanilla universe
-            # JobUniverse constants are in htcondor C++
-            # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... }
-            juniv = "(JobUniverse==5)"
-            small = f"(RequestMemory<={memoryLimit} && RequestCpus<={autoCPUs})"
-            # The constraint determines that the jobs to be returned belong to
-            # the current user (Owner) and are Idle vanilla universe jobs.
-            full_constraint = f"{owner} && {jstat} && {juniv} && {small}"
-            if verbose:
-                print("\nQuerying condor queue for standard jobs")
-                print(f"full_constraint {full_constraint}")
-            condorq_data = condor_q(
-                constraint=full_constraint,
-                projection=projection,
-            )
-
-            if not condorq_data:
-                print("Auto: No small htcondor jobs detected.")
-                return
-
-            print("smallGlideins: Jobs Fetched")
+                print(
+                    f"Submitting Large glidein for {ajob['ClusterId']}.{ajob['ProcId']}"
+                )
+                time.sleep(3)
+                exitCode = self.runCommand(cmd, verbose)
+                if exitCode != 0:
+                    print("error running %s" % cmd)
+                    sys.exit(exitCode)
 
-            schedd_name = list(condorq_data.keys())[0]
-            condorq_bps = condorq_data[schedd_name]
+        if not condorq_small:
+            print("Auto: no small Jobs detected.")
+        else:
+            print("Auto: summarize small jobs.")
+            maxNumberOfGlideins = self.getNodes()
+            maxAllowedNumberOfGlideins = self.getAllowedAutoGlideins()
             if verbose:
-                print(len(condorq_bps))
-                # This can be extremely large
-                # print(condorq_bps)
-            # disassemble the dictionary of dictionaries
-            for jid in list(condorq_bps.keys()):
-                job = condorq_bps[jid]
-                thisCores = job["RequestCpus"]
-                thisMemory = job["RequestMemory"]
-                totalCores = totalCores + thisCores
+                print(f"maxNumberOfGlideins {maxNumberOfGlideins}")
+                print(f"maxAllowedNumberOfGlideins {maxAllowedNumberOfGlideins}")
+            # The number of cores for the small glideins is capped at 8000
+            # Corresponds to maxAllowedNumberOfGlideins = 500 16-core glideins
+            if maxNumberOfGlideins > maxAllowedNumberOfGlideins:
+                maxNumberOfGlideins = maxAllowedNumberOfGlideins
+                print("Reducing Small Glidein limit due to threshold.")
+            #
+            # In the following loop we calculate the number of cores
+            # required by the set of small jobs. This calculation utilizes
+            # the requested cpus for a job, but also checks the requested
+            # memory and counts an effective core for each 'memoryPerCore'
+            # of memory (by default the 4GB per core of S3DF Slurm scheduler).
+            totalCores = 0
+            for ajob in condorq_small:
+                requestedCpus = ajob["RequestCpus"]
+                # if isinstance(ajob["RequestMemory"], int):
+                #     requestedMemory = ajob["RequestMemory"]
+                # else:
+                #     requestedMemory = ajob["RequestMemoryEval"]
+                #     print("Using RequestMemoryEval")
                requestedMemory = ajob["RequestMemoryEval"]
+                totalCores = totalCores + requestedCpus
                 if verbose:
-                    print(f"smallGlideins: The key in the dictionary is {jid}")
-                    print(f"\tRequestCpus {thisCores}")
+                    print(f"small: jobid {ajob['ClusterId']}.{ajob['ProcId']}")
+                    print(f"\tRequestCpus {requestedCpus}")
                     print(f"\tCurrent value of totalCores {totalCores}")
-                thisRatio = thisMemory / memoryPerCore
-                if thisRatio > thisCores:
+                neededCpus = requestedMemory / memoryPerCore
+                if neededCpus > requestedCpus:
                     if verbose:
                         print("\t\tNeed to Add More:")
-                        print(f"\t\tRequestMemory is {thisMemory} ")
-                        print(f"\t\tRatio to {memoryPerCore} MB is {thisRatio} ")
-                    totalCores = totalCores + (thisRatio - thisCores)
+                        print(f"\t\tRequestMemory is {requestedMemory} ")
+                        print(f"\t\tRatio to {memoryPerCore} MB is {neededCpus}")
+                    totalCores = totalCores + (neededCpus - requestedCpus)
                     if verbose:
                         print(f"\t\tCurrent value of totalCores {totalCores}")
-        except Exception as exc:
-            raise type(exc)("Problem querying condor schedd for jobs") from None
+            print(f"small: The final TotalCores is {totalCores}")
 
-        print(f"smallGlideins: The final TotalCores is {totalCores}")
+            # The number of Glideins needed to service the detected Idle jobs
+            # is "numberOfGlideins"
+            numberOfGlideins = math.ceil(totalCores / autoCPUs)
+            print(f"small: Number for detected jobs is {numberOfGlideins}")
 
-        # The number of Glideins needed to service the detected Idle jobs
-        # is "numberOfGlideins"
-        numberOfGlideins = math.ceil(totalCores / autoCPUs)
-        print(f"smallGlideins: Number for detected jobs is {numberOfGlideins}")
+            jobname = f"glide_{auser}"
 
-        # Check Slurm queue Running glideins
-        jobname = f"glide_{auser}"
-        existingGlideinsRunning = 0
-        batcmd = f"squeue --noheader --states=R --name={jobname} | wc -l"
-        print("The squeue command is: %s " % batcmd)
-        try:
-            resultR = subprocess.check_output(batcmd, shell=True)
-        except subprocess.CalledProcessError as e:
-            print(e.output)
-        existingGlideinsRunning = int(resultR.decode("UTF-8"))
+            # Check Slurm queue Running glideins
+            existingGlideinsRunning = SlurmPlugin.countRunningSlurmJobs(jobname)
 
-        # Check Slurm queue Idle Glideins
-        existingGlideinsIdle = 0
-        batcmd = f"squeue --noheader --states=PD --name={jobname} | wc -l"
-        print("The squeue command is: %s " % batcmd)
-        try:
-            resultPD = subprocess.check_output(batcmd, shell=True)
-        except subprocess.CalledProcessError as e:
-            print(e.output)
-        existingGlideinsIdle = int(resultPD.decode("UTF-8"))
-
-        print(f"smallGlideins: existingGlideinsRunning {existingGlideinsRunning}")
-        print(f"smallGlideins: existingGlideinsIdle {existingGlideinsIdle}")
-
-        # The number of Glideins needed to service the detected
-        # Idle jobs is "numberOfGlideins" less the existing Idle glideins
-        numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle
-        print(f"smallGlideins: Target Number to submit {numberOfGlideinsReduced}")
-
-        # The maximum number of Glideins that we can submit with
-        # the imposed threshold (maxNumberOfGlideins)
-        # is maxSubmitGlideins
-        existingGlideins = existingGlideinsRunning + existingGlideinsIdle
-        maxSubmitGlideins = maxNumberOfGlideins - existingGlideins
-        print(f"smallGlideins: maxNumberOfGlideins {maxNumberOfGlideins}")
-        print(f"smallGlideins: maxSubmitGlideins {maxSubmitGlideins}")
-
-        # Reduce the number of Glideins to submit if threshold exceeded
-        if numberOfGlideinsReduced > maxSubmitGlideins:
-            numberOfGlideinsReduced = maxSubmitGlideins
-            print("smallGlideins: Reducing due to threshold.")
-        print(
-            f"smallGlideins: Number of Glideins to submit is {numberOfGlideinsReduced}"
-        )
+            # Check Slurm queue Idle Glideins
+            existingGlideinsIdle = SlurmPlugin.countIdleSlurmJobs(jobname)
 
-        cpuopt = f"--cpus-per-task {autoCPUs}"
-        memopt = f"--mem {memoryLimit}"
-        jobopt = f"-J {jobname}"
-        cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}"
-        if verbose:
-            print(cmd)
-        for glide in range(0, numberOfGlideinsReduced):
-            print("Submitting glidein %s " % glide)
-            exitCode = self.runCommand(cmd, verbose)
-            if exitCode != 0:
-                print("error running %s" % cmd)
-                sys.exit(exitCode)
+            if verbose:
+                print(f"small: existingGlideinsRunning {existingGlideinsRunning}")
+                print(f"small: existingGlideinsIdle {existingGlideinsIdle}")
+
+            # The number of Glideins needed to service the detected
+            # Idle jobs is "numberOfGlideins" less the existing Idle glideins
+            numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle
+            if verbose:
+                print(f"small: Target Number to submit {numberOfGlideinsReduced}")
+
+            # The maximum number of Glideins that we can submit with
+            # the imposed threshold (maxNumberOfGlideins)
+            # is maxSubmitGlideins
+            existingGlideins = existingGlideinsRunning + existingGlideinsIdle
+            maxSubmitGlideins = maxNumberOfGlideins - existingGlideins
+            if verbose:
+                print(f"small: maxNumberOfGlideins {maxNumberOfGlideins}")
+                print(f"small: maxSubmitGlideins {maxSubmitGlideins}")
+
+            # Reduce the number of Glideins to submit if threshold exceeded
+            if numberOfGlideinsReduced > maxSubmitGlideins:
+                numberOfGlideinsReduced = maxSubmitGlideins
+                print("small: Reducing due to threshold.")
+            if verbose:
+                print(
+                    f"small: Number of Glideins to submit is {numberOfGlideinsReduced}"
+                )
+
+            cpuopt = f"--cpus-per-task {autoCPUs}"
+            memopt = f"--mem {memoryLimit}"
+            jobopt = f"-J {jobname}"
+            cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}"
+            if verbose:
+                print(cmd)
+            for glide in range(0, numberOfGlideinsReduced):
+                print("Submitting glidein %s " % glide)
+                exitCode = self.runCommand(cmd, verbose)
+                if exitCode != 0:
+                    print("error running %s" % cmd)
+                    sys.exit(exitCode)
 
         return
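
The three squeue-counting helpers added in the first hunk are plain static methods, so they can be exercised on their own, outside a full submit() run. A minimal sketch, assuming the module is importable as lsst.ctrl.execute.slurmPlugin, that Slurm's squeue is on the PATH, and using a purely illustrative job name (real names are f"glide_{auser}" for small glideins and f"{auser}_{shash}" for large ones):

    # Illustrative sketch only -- not part of the patch.
    # Assumes a host where `squeue` is available; the job name below is hypothetical.
    from lsst.ctrl.execute.slurmPlugin import SlurmPlugin

    jobname = "glide_someuser"  # small-glidein naming pattern is f"glide_{auser}"
    idle = SlurmPlugin.countIdleSlurmJobs(jobname)        # counts squeue --states=PD matches
    running = SlurmPlugin.countRunningSlurmJobs(jobname)  # counts squeue --states=R matches
    print(f"{jobname}: idle={idle} running={running}")

Each call prints the squeue command it runs and sleeps for 3 seconds before issuing it, so back-to-back checks stay gentle on the scheduler.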