diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py
index 3d5219442..8041202a8 100644
--- a/PandaPkgInfo.py
+++ b/PandaPkgInfo.py
@@ -1 +1 @@
-release_version = "0.3.0"
+release_version = "0.3.1"
diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py
index eae31071b..6afa76dbc 100644
--- a/pandaserver/taskbuffer/OraDBProxy.py
+++ b/pandaserver/taskbuffer/OraDBProxy.py
@@ -35,6 +35,7 @@
     PrioUtil,
     ProcessGroups,
     SiteSpec,
+    task_split_rules,
 )
 from pandaserver.taskbuffer.CloudTaskSpec import CloudTaskSpec
 from pandaserver.taskbuffer.DatasetSpec import DatasetSpec
@@ -18792,6 +18793,154 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr
         self.dumpErrorMessage(_logger, methodName)
         return False
 
+    # reduce input per job
+    def reduce_input_per_job(self, panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode):
+        comment = " /* DBProxy.reduce_input_per_job */"
+        method_name = comment.split(" ")[-2].split(".")[-1]
+        tmp_log = LogWrapper(_logger, method_name + f" < PandaID={panda_id} jediTaskID={jedi_task_id} attemptNr={attempt_nr} >")
+        tmp_log.debug("start")
+        try:
+            # rules to skip action when they are set
+            if not excluded_rules:
+                excluded_rules = ["nEventsPerJob", "nFilesPerJob"]
+            else:
+                excluded_rules = excluded_rules.split(",")
+
+            # thresholds with attempt numbers to trigger actions
+            if not steps:
+                threshold_low = 2
+                threshold_middle = 4
+                threshold_high = 7
+            else:
+                threshold_low, threshold_middle, threshold_high = [int(s) for s in steps.split(",")]
+
+            # if no task is associated with the job, don't take any action
+            if jedi_task_id in [None, 0, "NULL"]:
+                msg_str = "skipping since no task is associated with the job"
+                tmp_log.debug(msg_str)
+                return False, msg_str
+
+            # check attempt number
+            if attempt_nr < threshold_low:
+                msg_str = f"skipping since not enough attempts ({attempt_nr} < {threshold_low}) have been made"
+                tmp_log.debug(msg_str)
+                return False, msg_str
+
+            # get current split rules
+            var_map = {":jediTaskID": jedi_task_id}
+            sql_gr = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
+            sql_gr += "WHERE jediTaskID=:jediTaskID "
+            self.cur.execute(sql_gr + comment, var_map)
+            (split_rule,) = self.cur.fetchone()
+
+            # extract split rule values
+            rule_values = task_split_rules.extract_rule_values(
+                split_rule, ["nEventsPerJob", "nFilesPerJob", "nGBPerJob", "nMaxFilesPerJob", "retryModuleRules"]
+            )
+
+            # no action if any excluded rule is set
+            for rule_name in excluded_rules:
+                if rule_values[rule_name]:
+                    msg_str = f"skipping since task uses {rule_name}"
+                    tmp_log.debug(msg_str)
+                    return False, msg_str
+
+            # current max number of files or gigabytes per job
+            current_max_files_per_job = rule_values["nMaxFilesPerJob"]
+            if current_max_files_per_job:
+                current_max_files_per_job = int(current_max_files_per_job)
+            current_gigabytes_per_job = rule_values["nGBPerJob"]
+            if current_gigabytes_per_job:
+                current_gigabytes_per_job = int(current_gigabytes_per_job)
+
+            # initial max number of files or gigabytes per job for retry module
+            rules_for_retry_module = rule_values["retryModuleRules"]
+            rule_values_for_retry_module = task_split_rules.extract_rule_values(rules_for_retry_module, ["nGBPerJob", "nMaxFilesPerJob"], is_sub_rule=True)
+            init_gigabytes_per_job = rule_values_for_retry_module["nGBPerJob"]
+            init_max_files_per_job = rule_values_for_retry_module["nMaxFilesPerJob"]
+
+            # set initial values for the first action
+            set_init_rules = False
+            if not init_gigabytes_per_job:
+                set_init_rules = True
+                if current_gigabytes_per_job:
+                    init_gigabytes_per_job = current_gigabytes_per_job
+                else:
+                    # use current job size as initial gigabytes per job for retry module
+                    var_map = {":PandaID": panda_id}
+                    sql_fz = f"SELECT SUM(fsize) FROM {panda_config.schemaPANDA}.filesTable4 "
+                    sql_fz += "WHERE PandaID=:PandaID "
+                    self.cur.execute(sql_fz + comment, var_map)
+                    (init_gigabytes_per_job,) = self.cur.fetchone()
+                    init_gigabytes_per_job = math.ceil(init_gigabytes_per_job / 1024 / 1024 / 1024)
+            if not init_max_files_per_job:
+                set_init_rules = True
+                if current_max_files_per_job:
+                    init_max_files_per_job = current_max_files_per_job
+                else:
+                    # use current number of input files as initial max number of files per job for retry module
+                    var_map = {":PandaID": panda_id, ":jediTaskID": jedi_task_id, ":type1": "input", ":type2": "pseudo_input"}
+                    sql_fc = f"SELECT COUNT(*) FROM {panda_config.schemaPANDA}.filesTable4 tabF, {panda_config.schemaJEDI}.JEDI_Datasets tabD "
+                    sql_fc += (
+                        "WHERE tabD.jediTaskID=:jediTaskID AND tabD.type IN (:type1, :type2) AND tabD.masterID IS NULL "
+                        "AND tabF.PandaID=:PandaID AND tabF.datasetID=tabD.datasetID "
+                    )
+                    self.cur.execute(sql_fc + comment, var_map)
+                    (init_max_files_per_job,) = self.cur.fetchone()
+
+            # set target based on attempt number
+            if attempt_nr < threshold_middle:
+                target_gigabytes_per_job = math.floor(init_gigabytes_per_job / 2)
+                target_max_files_per_job = math.floor(init_max_files_per_job / 2)
+            elif attempt_nr < threshold_high:
+                target_gigabytes_per_job = math.floor(init_gigabytes_per_job / 4)
+                target_max_files_per_job = math.floor(init_max_files_per_job / 4)
+            else:
+                target_gigabytes_per_job = 1
+                target_max_files_per_job = 1
+            target_gigabytes_per_job = max(1, target_gigabytes_per_job)
+            target_max_files_per_job = max(1, target_max_files_per_job)
+
+            # update rules when initial values were unset or new values need to be set
+            if set_init_rules or current_gigabytes_per_job != target_gigabytes_per_job or current_max_files_per_job != target_max_files_per_job:
+                msg_str = "update splitRule: "
+                if set_init_rules:
+                    msg_str += f"initial nGBPerJob={init_gigabytes_per_job} nMaxFilesPerJob={init_max_files_per_job}. "
+                    rules_for_retry_module = task_split_rules.replace_rule(rules_for_retry_module, "nGBPerJob", init_gigabytes_per_job, is_sub_rule=True)
+                    rules_for_retry_module = task_split_rules.replace_rule(rules_for_retry_module, "nMaxFilesPerJob", init_max_files_per_job, is_sub_rule=True)
+                    if not dry_mode:
+                        self.changeTaskSplitRulePanda(
+                            jedi_task_id, task_split_rules.split_rule_dict["retryModuleRules"], rules_for_retry_module, useCommit=False, sendLog=True
+                        )
+                if current_gigabytes_per_job != target_gigabytes_per_job:
+                    msg_str += f"new nGBPerJob {current_gigabytes_per_job} -> {target_gigabytes_per_job}. "
+                    if not dry_mode:
+                        self.changeTaskSplitRulePanda(
+                            jedi_task_id, task_split_rules.split_rule_dict["nGBPerJob"], target_gigabytes_per_job, useCommit=False, sendLog=True
+                        )
+                if current_max_files_per_job != target_max_files_per_job:
+                    msg_str += f"new nMaxFilesPerJob {current_max_files_per_job} -> {target_max_files_per_job}. "
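
Reviewer note: the step-down schedule above is easiest to see in isolation. The following standalone sketch (not part of the patch) mirrors the defaults in reduce_input_per_job — thresholds 2/4/7, halving then quartering the initial nGBPerJob and nMaxFilesPerJob, and clamping both at 1; the function name and values are illustrative only.

import math


def step_down_targets(attempt_nr, init_gb_per_job, init_max_files_per_job, steps=(2, 4, 7)):
    # Below the low threshold the retry module takes no action; between the
    # low and middle thresholds the initial sizes are halved; between the
    # middle and high thresholds they are quartered; beyond that both are
    # forced down to the minimum of 1.
    low, middle, high = steps
    if attempt_nr < low:
        return None
    if attempt_nr < middle:
        gb = math.floor(init_gb_per_job / 2)
        files = math.floor(init_max_files_per_job / 2)
    elif attempt_nr < high:
        gb = math.floor(init_gb_per_job / 4)
        files = math.floor(init_max_files_per_job / 4)
    else:
        gb = files = 1
    return max(1, gb), max(1, files)


# A task that started at nGBPerJob=10 and nMaxFilesPerJob=20:
print(step_down_targets(2, 10, 20))  # (5, 10) - halved
print(step_down_targets(5, 10, 20))  # (2, 5)  - quartered
print(step_down_targets(8, 10, 20))  # (1, 1)  - floor
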
" + if not dry_mode: + self.changeTaskSplitRulePanda( + jedi_task_id, task_split_rules.split_rule_dict["nMaxFilesPerJob"], target_max_files_per_job, useCommit=False, sendLog=True + ) + tmp_log.debug(msg_str) + # commit + if not dry_mode and not self._commit(): + raise RuntimeError("Commit error") + return True, msg_str + + msg_str = "not applicable" + _logger.debug(msg_str) + return False, msg_str + except Exception: + # roll back + if not dry_mode: + self._rollback() + # error + self.dumpErrorMessage(_logger, method_name) + return None, "failed" + # reset files in JEDI def resetFileStatusInJEDI(self, dn, prodManager, datasetName, lostFiles, recoverParent, simul): comment = " /* DBProxy.resetFileStatusInJEDI */" diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py index 390157c89..aa3db9123 100755 --- a/pandaserver/taskbuffer/TaskBuffer.py +++ b/pandaserver/taskbuffer/TaskBuffer.py @@ -11,6 +11,7 @@ # logger from pandacommon.pandalogger.PandaLogger import PandaLogger + from pandaserver.brokerage.SiteMapper import SiteMapper from pandaserver.config import panda_config from pandaserver.dataservice.closer import Closer @@ -3119,6 +3120,17 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr # return return ret + # reduce input per job + def reduce_input_per_job(self, panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode=False): + # get proxy + proxy = self.proxyPool.getProxy() + # exec + ret = proxy.reduce_input_per_job(panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode) + # release proxy + self.proxyPool.putProxy(proxy) + # return + return ret + # reset files in JEDI def resetFileStatusInJEDI(self, dn, prodManager, datasetName, lostFiles, recoverParent, simul=False): # get proxy diff --git a/pandaserver/taskbuffer/retryModule.py b/pandaserver/taskbuffer/retryModule.py index 89307799a..8fe1828c0 100644 --- a/pandaserver/taskbuffer/retryModule.py +++ b/pandaserver/taskbuffer/retryModule.py @@ -1,6 +1,7 @@ import re import sys import time +import traceback from re import error as ReError from pandacommon.pandalogger.PandaLogger import PandaLogger @@ -13,6 +14,7 @@ LIMIT_RETRY = "limit_retry" INCREASE_CPU = "increase_cputime" INCREASE_MEM_XTIMES = "increase_memory_xtimes" +REDUCE_INPUT_PER_JOB = "reduce_input_per_job" def timeit(method): @@ -183,6 +185,23 @@ def preprocess_rules(rules, error_diag_job, release_job, architecture_job, wqid_ filtered_rules.append(rule) break + # See if there is a REDUCE_INPUT_PER_JOB rule. + for rule in rules: + if rule["action"] != REDUCE_INPUT_PER_JOB or not conditions_apply( + error_diag_job, + architecture_job, + release_job, + wqid_job, + rule["error_diag"], + rule["architecture"], + rule["release"], + rule["wqid"], + ): + continue + else: + filtered_rules.append(rule) + break + # See if there is a LIMIT_RETRY rule. 
diff --git a/pandaserver/taskbuffer/retryModule.py b/pandaserver/taskbuffer/retryModule.py
index 89307799a..8fe1828c0 100644
--- a/pandaserver/taskbuffer/retryModule.py
+++ b/pandaserver/taskbuffer/retryModule.py
@@ -1,6 +1,7 @@
 import re
 import sys
 import time
+import traceback
 from re import error as ReError
 
 from pandacommon.pandalogger.PandaLogger import PandaLogger
@@ -13,6 +14,7 @@
 LIMIT_RETRY = "limit_retry"
 INCREASE_CPU = "increase_cputime"
 INCREASE_MEM_XTIMES = "increase_memory_xtimes"
+REDUCE_INPUT_PER_JOB = "reduce_input_per_job"
 
 
 def timeit(method):
@@ -183,6 +185,23 @@ def preprocess_rules(rules, error_diag_job, release_job, architecture_job, wqid_
             filtered_rules.append(rule)
             break
 
+    # See if there is a REDUCE_INPUT_PER_JOB rule.
+    for rule in rules:
+        if rule["action"] != REDUCE_INPUT_PER_JOB or not conditions_apply(
+            error_diag_job,
+            architecture_job,
+            release_job,
+            wqid_job,
+            rule["error_diag"],
+            rule["architecture"],
+            rule["release"],
+            rule["wqid"],
+        ):
+            continue
+        else:
+            filtered_rules.append(rule)
+            break
+
     # See if there is a LIMIT_RETRY rule. Take the narrowest rule, in case of draw take the strictest conditions
     limit_retry_rule = {}
     for rule in rules:
@@ -249,7 +268,8 @@ def apply_retrial_rules(task_buffer, jobID, errors, attemptNr):
             try:
                 error_code = int(error_code)
             except ValueError:
-                _logger.error(f"Error code ({error_code}) can not be casted to int")
+                if error_code != "NULL":
+                    _logger.error(f"Error code ({error_code}) cannot be cast to int")
                 continue
             try:
                 rule = retrial_rules[error_source][error_code]
@@ -377,7 +397,7 @@ def apply_retrial_rules(task_buffer, jobID, errors, attemptNr):
                     _logger.info(message)
                 except Exception:
                     errtype, errvalue = sys.exc_info()[:2]
-                    _logger.error("Failed to increase RAM xtimes limit : %s %s" % (errtype, errvalue))
+                    _logger.error(f"Failed to increase RAM xtimes limit : {errtype} {errvalue}")
 
             elif action == INCREASE_CPU:
                 try:
@@ -410,6 +430,20 @@
                     errtype, errvalue = sys.exc_info()[:2]
                     _logger.error(f"Failed to increase CPU-Time : {errtype} {errvalue}")
 
+            elif action == REDUCE_INPUT_PER_JOB:
+                try:
+                    applied = False
+                    if active:
+                        applied = task_buffer.reduce_input_per_job(
+                            job.PandaID, job.jediTaskID, job.attemptNr, parameters.get("excluded_rules"), parameters.get("steps")
+                        )
+                    # Log to pandamon and logfile
+                    message = f"action=reduceInputPerJob for PandaID={jobID} jediTaskID={job.jediTaskID} applied={applied} ( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag}. Error/action active={active} error_id={error_id} )"
+                    acted_on_job = True
+                    _logger.info(message)
+                except Exception as e:
+                    _logger.error(f"Failed to reduce input per job : {e} {traceback.format_exc()}")
+
             _logger.debug(f"Finished rule {rule} for PandaID={jobID} error_source={error_source} error_code={error_code} attemptNr={attemptNr}")
 
         except KeyError:
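
Reviewer note: the new branch is driven by the same retrial-rule records as the existing actions. The field names below are taken from the lookups in preprocess_rules and apply_retrial_rules above; the values, and whether parameters nest inside the record, are assumptions for illustration, not a documented schema.

# Hypothetical rule record; REDUCE_INPUT_PER_JOB resolves to "reduce_input_per_job".
example_rule = {
    "action": "reduce_input_per_job",
    "error_diag": None,      # no filter on the error diagnostic
    "architecture": None,    # no filter on the architecture
    "release": None,         # no filter on the release
    "wqid": None,            # no filter on the work queue
    "parameters": {          # read via parameters.get(...) in the action branch
        "excluded_rules": "nEventsPerJob,nFilesPerJob",
        "steps": "2,4,7",    # low, middle, high attempt thresholds
    },
}
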
"NW", + "orderInputBy": "OI", + "orderByLB": "OL", + "onSiteMerging": "OM", + "osMatching": "OS", + "onlyTagsForFC": "OT", + "pushStatusChanges": "PC", + "pushJob": "PJ", + "pfnList": "PL", + "putLogToOS": "PO", + "runUntilClosed": "RC", + "registerDatasets": "RD", + "registerEsFiles": "RE", + "respectLB": "RL", + "retryModuleRules": "RM", + "reuseSecOnDemand": "RO", + "releasePerLB": "RP", + "respectSplitRule": "RR", + "randomSeed": "RS", + "retryRamOffset": "RX", + "retryRamStep": "RY", + "resurrectConsumers": "SC", + "switchEStoNormal": "SE", + "stayOutputOnSite": "SO", + "scoutSuccessRate": "SS", + "useSecrets": "ST", + "segmentedWork": "SW", + "totNumJobs": "TJ", + "tgtMaxOutputForNG": "TN", + "t1Weight": "TW", + "useBuild": "UB", + "useJobCloning": "UC", + "useRealNumEvents": "UE", + "useFileAsSourceLFN": "UF", + "usePrePro": "UP", + "useScout": "US", + "usePrefetcher": "UT", + "useExhausted": "UX", + "useZipToPin": "UZ", + "writeInputToFile": "WF", + "waitInput": "WI", + "maxAttemptES": "XA", + "decAttOnFailedES": "XF", + "maxAttemptEsJob": "XJ", + "nEventsPerMergeJob": "ZE", + "nFilesPerMergeJob": "ZF", + "nGBPerMergeJob": "ZG", + "nMaxFilesPerMergeJob": "ZM", +} + + +# extract rules +def extract_rule_values(split_rules, rule_names, is_sub_rule=False): + """ + Extract rule values from split rule string + :param split_rules: comma separated string + :param rule_names: list of rule names + :param is_sub_rule: bool to indicate if the rule is a sub-rule + :return: dict of rule names and values + """ + if split_rules is None: + split_rules = "" + ret = {} + if is_sub_rule: + rule_separator = "|" + key_value_separator = ":" + else: + rule_separator = "," + key_value_separator = "=" + if is_sub_rule: + for tmp_name in rule_names: + ret[tmp_name] = None + for tmp_rule in split_rules.split(rule_separator): + for tmp_name in rule_names: + if tmp_rule.startswith(tmp_name + key_value_separator): + ret[tmp_name] = tmp_rule.split(key_value_separator)[-1] + for tmp_name in rule_names: + if tmp_name not in ret: + ret[tmp_name] = None + return ret + + +# replace a rule +def replace_rule(split_rules, rule_name, rule_value, is_sub_rule=False): + """ + Replace a rule in the split rule string + :param split_rules: comma separated string + :param rule_name: rule name + :param rule_value: rule value + :param is_sub_rule: bool to indicate if the rule is a sub-rule + :return: string of split rules + """ + if split_rules is None: + split_rules = "" + if is_sub_rule: + rule_separator = "|" + key_value_separator = ":" + else: + rule_separator = "," + key_value_separator = "=" + tmp_str = "" + for tmp_rule in split_rules.split(rule_separator): + if tmp_rule.startswith(rule_name + key_value_separator): + continue + if tmp_str != "": + tmp_str += rule_separator + tmp_str += tmp_rule + if tmp_str != "": + tmp_str += rule_separator + tmp_str += rule_name + key_value_separator + str(rule_value) + return tmp_str