Skip to content

Commit

Permalink
feat: Workflow minor fix
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Jan 16, 2024
1 parent f8e5650 commit 400b327
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
3 changes: 1 addition & 2 deletions src/DIRAC/Core/Workflow/Workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ def createStepInstance(self, type_o, name):
raise KeyError("Can not find StepDefinition " + type + " to create StepInstrance " + name)

def removeStepInstance(self, name):
self.step_instances[name].setParent(None)
self.step_instances.delete(name)
self.step_instances = [step for step in self.step_instances if step.getName() != name]

def updateParents(self):
self.module_definitions.updateParents(self)
Expand Down
12 changes: 7 additions & 5 deletions src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None):
# Choose the submission policy
# - Workflow: the agent will submit a workflow to a PoolCE, the workflow is responsible for interacting with the remote site
# - JobWrapper: the agent will submit a JobWrapper directly to the remote site, it is responsible of the remote execution
self.submissionPolicy = "Workflow"
self.submissionPolicy = "Application"

# cleanTask is used to clean the task in the remote site
self.cleanTask = True
Expand All @@ -82,7 +82,7 @@ def initialize(self):
# Get the submission policy
# Initialized here because it cannot be dynamically modified during the execution
self.submissionPolicy = self.am_getOption("SubmissionPolicy", self.submissionPolicy)
if self.submissionPolicy not in ["Workflow", "JobWrapper"]:
if self.submissionPolicy not in ["Application", "JobWrapper"]:
return S_ERROR("SubmissionPolicy must be either Workflow or JobWrapper")

result = self._initializeComputingElement("Pool")
Expand Down Expand Up @@ -169,7 +169,7 @@ def execute(self):
if not result["OK"] or result["Value"]:
return result

if self.submissionPolicy == "Workflow":
if self.submissionPolicy == "Application":
# Check errors that could have occurred during job submission and/or execution
# Status are handled internally, and therefore, not checked outside of the method
result = self._checkSubmittedJobs()
Expand Down Expand Up @@ -284,7 +284,7 @@ def execute(self):

# Submit the job to the CE
self.log.debug(f"Before self._submitJob() ({self.ceName}CE)")
if self.submissionPolicy == "Workflow":
if self.submissionPolicy == "Application":
resultSubmission = self._submitJob(
jobID=jobID,
jobParams=params,
Expand Down Expand Up @@ -421,9 +421,11 @@ def _setCEDict(self, ceDict):
ceDict["ReleaseProject"] = project

# Add a RemoteExecution entry, which can be used in the next stages
if self.submissionPolicy == "Workflow":
if self.submissionPolicy == "Application":
ceDict["RemoteExecution"] = True

ceDict["SubmissionPolicy"] = self.submissionPolicy

def _checkMatchingIssues(self, jobRequest):
"""Check the source of the matching issue
Expand Down

0 comments on commit 400b327

Please sign in to comment.