
Merge pull request #318 from tmaeno/master
added reduce_input_per_job retry action
tmaeno authored Mar 20, 2024
2 parents 4e77b8d + 72bf455 commit 4174a42
Showing 5 changed files with 368 additions and 3 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
@@ -1 +1 @@
release_version = "0.3.0"
release_version = "0.3.1"
149 changes: 149 additions & 0 deletions pandaserver/taskbuffer/OraDBProxy.py
@@ -35,6 +35,7 @@
PrioUtil,
ProcessGroups,
SiteSpec,
task_split_rules,
)
from pandaserver.taskbuffer.CloudTaskSpec import CloudTaskSpec
from pandaserver.taskbuffer.DatasetSpec import DatasetSpec
@@ -18792,6 +18793,154 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr
self.dumpErrorMessage(_logger, methodName)
return False

# reduce input per job
def reduce_input_per_job(self, panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode):
comment = " /* DBProxy.reduce_input_per_job */"
method_name = comment.split(" ")[-2].split(".")[-1]
tmp_log = LogWrapper(_logger, method_name + f" < PandaID={panda_id} jediTaskID={jedi_task_id} attemptNr={attempt_nr} >")
tmp_log.debug("start")
try:
# rules to skip action when they are set
if not excluded_rules:
excluded_rules = ["nEventsPerJob", "nFilesPerJob"]
else:
excluded_rules = excluded_rules.split(",")

# thresholds with attempt numbers to trigger actions
if not steps:
threshold_low = 2
threshold_middle = 4
threshold_high = 7
else:
threshold_low, threshold_middle, threshold_high = [int(s) for s in steps.split(",")]

# if no task associated to job don't take any action
if jedi_task_id in [None, 0, "NULL"]:
msg_str = "skipping since no task associated to job"
tmp_log.debug(msg_str)
return False, msg_str

# check attempt number
if attempt_nr < threshold_low:
msg_str = f"skipping since not enough attempts ({attempt_nr} < {threshold_low}) have been made"
tmp_log.debug(msg_str)
return False, msg_str

# get current split rules
var_map = {":jediTaskID": jedi_task_id}
sql_gr = f"SELECT splitRule FROM {panda_config.schemaJEDI}.JEDI_Tasks "
sql_gr += "WHERE jediTaskID=:jediTaskID "
self.cur.execute(sql_gr + comment, var_map)
(split_rule,) = self.cur.fetchone()

# extract split rule values
rule_values = task_split_rules.extract_rule_values(
split_rule, ["nEventsPerJob", "nFilesPerJob", "nGBPerJob", "nMaxFilesPerJob", "retryModuleRules"]
)

# no action if num events or files per job is specified
for rule_name in excluded_rules:
if rule_values[rule_name]:
msg_str = f"skipping since task uses {rule_name}"
tmp_log.debug(msg_str)
return False, msg_str

# current max number of files or gigabytes per job
current_max_files_per_job = rule_values["nMaxFilesPerJob"]
if current_max_files_per_job:
current_max_files_per_job = int(current_max_files_per_job)
current_gigabytes_per_job = rule_values["nGBPerJob"]
if current_gigabytes_per_job:
current_gigabytes_per_job = int(current_gigabytes_per_job)

# initial max number of files or gigabytes per job for retry module
rules_for_retry_module = rule_values["retryModuleRules"]
rule_values_for_retry_module = task_split_rules.extract_rule_values(rules_for_retry_module, ["nGBPerJob", "nMaxFilesPerJob"], is_sub_rule=True)
init_gigabytes_per_job = rule_values_for_retry_module["nGBPerJob"]
init_max_files_per_job = rule_values_for_retry_module["nMaxFilesPerJob"]

# set initial values for the first action
set_init_rules = False
if not init_gigabytes_per_job:
set_init_rules = True
if current_gigabytes_per_job:
init_gigabytes_per_job = current_gigabytes_per_job
else:
# use current job size as initial gigabytes per job for retry module
var_map = {":PandaID": panda_id}
sql_fz = f"SELECT SUM(fsize) FROM {panda_config.schemaPANDA}.filesTable4 "
sql_fz += "WHERE PandaID=:PandaID "
self.cur.execute(sql_fz + comment, var_map)
(init_gigabytes_per_job,) = self.cur.fetchone()
init_gigabytes_per_job = math.ceil(init_gigabytes_per_job / 1024 / 1024 / 1024)
if not init_max_files_per_job:
set_init_rules = True
if current_max_files_per_job:
init_max_files_per_job = current_max_files_per_job
else:
# use current job size as initial max number of files per job for retry module
var_map = {":PandaID": panda_id, ":jediTaskID": jedi_task_id, ":type1": "input", ":type2": "pseudo_input"}
sql_fc = f"SELECT COUNT(*) FROM {panda_config.schemaPANDA}.filesTable4 tabF, {panda_config.schemaJEDI}.JEDI_Datasets tabD "
sql_fc += (
"WHERE tabD.jediTaskID=:jediTaskID AND tabD.type IN (:type1, :type2) AND tabD.masterID IS NULL "
"AND tabF.PandaID=:PandaID AND tabF.datasetID=tabD.datasetID "
)
self.cur.execute(sql_fc + comment, var_map)
(init_max_files_per_job,) = self.cur.fetchone()

# set target based on attempt number
if attempt_nr < threshold_middle:
target_gigabytes_per_job = math.floor(init_gigabytes_per_job / 2)
target_max_files_per_job = math.floor(init_max_files_per_job / 2)
elif attempt_nr < threshold_high:
target_gigabytes_per_job = math.floor(init_gigabytes_per_job / 4)
target_max_files_per_job = math.floor(init_max_files_per_job / 4)
else:
target_gigabytes_per_job = 1
target_max_files_per_job = 1
target_gigabytes_per_job = max(1, target_gigabytes_per_job)
target_max_files_per_job = max(1, target_max_files_per_job)

# update rules when initial values were unset or new values need to be set
if set_init_rules or current_gigabytes_per_job != target_gigabytes_per_job or current_max_files_per_job != target_max_files_per_job:
msg_str = "update splitRule: "
if set_init_rules:
msg_str += f"initial nGBPerJob={init_gigabytes_per_job} nMaxFilesPerJob={init_max_files_per_job}. "
rules_for_retry_module = task_split_rules.replace_rule(rules_for_retry_module, "nGBPerJob", init_gigabytes_per_job, is_sub_rule=True)
rules_for_retry_module = task_split_rules.replace_rule(rules_for_retry_module, "nMaxFilesPerJob", init_max_files_per_job, is_sub_rule=True)
if not dry_mode:
self.changeTaskSplitRulePanda(
jedi_task_id, task_split_rules.split_rule_dict["retryModuleRules"], rules_for_retry_module, useCommit=False, sendLog=True
)
if current_gigabytes_per_job != target_gigabytes_per_job:
msg_str += f"new nGBPerJob {current_gigabytes_per_job} -> {target_gigabytes_per_job}. "
if not dry_mode:
self.changeTaskSplitRulePanda(
jedi_task_id, task_split_rules.split_rule_dict["nGBPerJob"], target_gigabytes_per_job, useCommit=False, sendLog=True
)
if current_max_files_per_job != target_max_files_per_job:
msg_str += f"new nMaxFilesPerJob {current_max_files_per_job} -> {target_max_files_per_job}. "
if not dry_mode:
self.changeTaskSplitRulePanda(
jedi_task_id, task_split_rules.split_rule_dict["nMaxFilesPerJob"], target_max_files_per_job, useCommit=False, sendLog=True
)
tmp_log.debug(msg_str)
# commit
if not dry_mode and not self._commit():
raise RuntimeError("Commit error")
return True, msg_str

msg_str = "not applicable"
_logger.debug(msg_str)
return False, msg_str
except Exception:
# roll back
if not dry_mode:
self._rollback()
# error
self.dumpErrorMessage(_logger, method_name)
return None, "failed"

# reset files in JEDI
def resetFileStatusInJEDI(self, dn, prodManager, datasetName, lostFiles, recoverParent, simul):
comment = " /* DBProxy.resetFileStatusInJEDI */"
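
The core of the change is the new DBProxy.reduce_input_per_job method above: after a configurable number of failed attempts it shrinks the task's nGBPerJob and nMaxFilesPerJob split rules, recording the initial values under retryModuleRules so later reductions are computed from the original job size. Below is a minimal sketch of the size schedule it applies; this is an illustrative re-implementation for reading purposes, not code from the commit, and the example numbers are made up.

import math

def target_sizes(attempt_nr, init_gb_per_job, init_max_files_per_job, thresholds=(2, 4, 7)):
    # Mirrors the attempt-number schedule in DBProxy.reduce_input_per_job:
    # halve below the middle threshold, quarter below the high threshold,
    # otherwise drop to 1 GB / 1 file per job (never below 1).
    low, middle, high = thresholds  # defaults used when no "steps" parameter is given
    if attempt_nr < low:
        return None  # too few attempts, the action is skipped
    if attempt_nr < middle:
        gb, n_files = init_gb_per_job / 2, init_max_files_per_job / 2
    elif attempt_nr < high:
        gb, n_files = init_gb_per_job / 4, init_max_files_per_job / 4
    else:
        gb, n_files = 1, 1
    return max(1, math.floor(gb)), max(1, math.floor(n_files))

# e.g. starting from 20 GB / 16 files per job:
#   attempts 2-3 -> (10, 8), attempts 4-6 -> (5, 4), attempts 7+ -> (1, 1)
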
12 changes: 12 additions & 0 deletions pandaserver/taskbuffer/TaskBuffer.py
@@ -11,6 +11,7 @@

# logger
from pandacommon.pandalogger.PandaLogger import PandaLogger

from pandaserver.brokerage.SiteMapper import SiteMapper
from pandaserver.config import panda_config
from pandaserver.dataservice.closer import Closer
@@ -3119,6 +3120,17 @@ def increaseRamLimitJobJEDI_xtimes(self, job, jobRamCount, jediTaskID, attemptNr
# return
return ret

# reduce input per job
def reduce_input_per_job(self, panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode=False):
# get proxy
proxy = self.proxyPool.getProxy()
# exec
ret = proxy.reduce_input_per_job(panda_id, jedi_task_id, attempt_nr, excluded_rules, steps, dry_mode)
# release proxy
self.proxyPool.putProxy(proxy)
# return
return ret

# reset files in JEDI
def resetFileStatusInJEDI(self, dn, prodManager, datasetName, lostFiles, recoverParent, simul=False):
# get proxy
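
The TaskBuffer wrapper above only checks a DB proxy out of the pool, delegates to DBProxy.reduce_input_per_job, and returns its (applied, message) result. A hedged usage sketch follows; the task_buffer instance and the ID values are placeholders invented for illustration, only the parameter names come from the diff.

applied, message = task_buffer.reduce_input_per_job(
    panda_id=4674289123,    # PandaID of the failed job (example value)
    jedi_task_id=33001122,  # owning JEDI task (example value)
    attempt_nr=5,           # current attempt number
    excluded_rules=None,    # None -> default veto list: nEventsPerJob, nFilesPerJob
    steps=None,             # None -> default thresholds 2,4,7
    dry_mode=True,          # report what would change without updating the split rules
)
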
38 changes: 36 additions & 2 deletions pandaserver/taskbuffer/retryModule.py
@@ -1,6 +1,7 @@
import re
import sys
import time
import traceback
from re import error as ReError

from pandacommon.pandalogger.PandaLogger import PandaLogger
@@ -13,6 +14,7 @@
LIMIT_RETRY = "limit_retry"
INCREASE_CPU = "increase_cputime"
INCREASE_MEM_XTIMES = "increase_memory_xtimes"
REDUCE_INPUT_PER_JOB = "reduce_input_per_job"


def timeit(method):
@@ -183,6 +185,23 @@ def preprocess_rules(rules, error_diag_job, release_job, architecture_job, wqid_
filtered_rules.append(rule)
break

# See if there is a REDUCE_INPUT_PER_JOB rule.
for rule in rules:
if rule["action"] != REDUCE_INPUT_PER_JOB or not conditions_apply(
error_diag_job,
architecture_job,
release_job,
wqid_job,
rule["error_diag"],
rule["architecture"],
rule["release"],
rule["wqid"],
):
continue
else:
filtered_rules.append(rule)
break

# See if there is a LIMIT_RETRY rule. Take the narrowest rule, in case of draw take the strictest conditions
limit_retry_rule = {}
for rule in rules:
@@ -249,7 +268,8 @@ def apply_retrial_rules(task_buffer, jobID, errors, attemptNr):
try:
error_code = int(error_code)
except ValueError:
_logger.error(f"Error code ({error_code}) can not be casted to int")
if error_code != "NULL":
_logger.error(f"Error code ({error_code}) can not be casted to int")
continue
try:
rule = retrial_rules[error_source][error_code]
@@ -377,7 +397,7 @@ def apply_retrial_rules(task_buffer, jobID, errors, attemptNr):
_logger.info(message)
except Exception:
errtype, errvalue = sys.exc_info()[:2]
_logger.error("Failed to increase RAM xtimes limit : %s %s" % (errtype, errvalue))
_logger.error(f"Failed to increase RAM xtimes limit : {errtype} {errvalue}")

elif action == INCREASE_CPU:
try:
@@ -410,6 +430,20 @@ def apply_retrial_rules(task_buffer, jobID, errors, attemptNr):
errtype, errvalue = sys.exc_info()[:2]
_logger.error(f"Failed to increase CPU-Time : {errtype} {errvalue}")

elif action == REDUCE_INPUT_PER_JOB:
try:
applied = False
if active:
applied = task_buffer.reduce_input_per_job(
job.PandaID, job.jediTaskID, job.attemptNr, parameters.get("excluded_rules"), parameters.get("steps")
)
# Log to pandamon and logfile
message = f"action=reduceInputPerJob for PandaID={jobID} jediTaskID={job.jediTaskID} applied={applied} ( ErrorSource={error_source} ErrorCode={error_code} ErrorDiag: {error_diag}. Error/action active={active} error_id={error_id} )"
acted_on_job = True
_logger.info(message)
except Exception as e:
_logger.error(f"Failed to reduce input per job : {e} {traceback.format_exc()}")

_logger.debug(f"Finished rule {rule} for PandaID={jobID} error_source={error_source} error_code={error_code} attemptNr={attemptNr}")

except KeyError:
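
For context, the retrial rule that drives this action comes from the retry-module configuration; this diff only shows the fields the code consumes (the action name, the parameters excluded_rules and steps, and the condition fields passed to conditions_apply). The sketch below shows a plausible shape for such a rule; how parameters and active are actually attached to the rule is not visible here, so the overall structure is an assumption.

example_rule = {
    "action": "reduce_input_per_job",       # matches REDUCE_INPUT_PER_JOB above
    "parameters": {
        "excluded_rules": "nEventsPerJob",  # comma-separated split rules that veto the action
        "steps": "2,4,7",                   # low, middle, high attempt thresholds
    },
    # condition fields checked by conditions_apply() (semantics not shown in this diff)
    "error_diag": None,
    "architecture": None,
    "release": None,
    "wqid": None,
}
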