diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py
index 8fb6f2c63..209441d25 100644
--- a/PandaPkgInfo.py
+++ b/PandaPkgInfo.py
@@ -1 +1 @@
-release_version = "0.0.66"
+release_version = "0.0.67"
diff --git a/pandaserver/srvcore/CoreUtils.py b/pandaserver/srvcore/CoreUtils.py
index 198072575..1287770bf 100644
--- a/pandaserver/srvcore/CoreUtils.py
+++ b/pandaserver/srvcore/CoreUtils.py
@@ -1,5 +1,6 @@
 import re
 import os
+import json
 import datetime
 import subprocess
 from threading import Lock
@@ -158,3 +159,18 @@ def __contains__(self, item):
     def __getitem__(self, name):
         self.update()
         return self.cachedObj[name]
+
+
+# json encoder that serializes datetime objects as tagged strings
+class NonJsonObjectEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, datetime.datetime):
+            return {"_datetime_object": obj.strftime("%Y-%m-%d %H:%M:%S.%f")}
+        return json.JSONEncoder.default(self, obj)
+
+
+# object hook for the json decoder to restore datetime objects
+def as_python_object(dct):
+    if "_datetime_object" in dct:
+        return datetime.datetime.strptime(str(dct["_datetime_object"]), "%Y-%m-%d %H:%M:%S.%f")
+    return dct
diff --git a/pandaserver/taskbuffer/FileSpec.py b/pandaserver/taskbuffer/FileSpec.py
index 4339e6db2..0e78be627 100755
--- a/pandaserver/taskbuffer/FileSpec.py
+++ b/pandaserver/taskbuffer/FileSpec.py
@@ -269,3 +269,10 @@ def isAllowedNoOutput(self):
         except Exception:
             pass
         return False
+
+    # dump to a json-serializable object
+    def dump_to_json_serializable(self):
+        stat = self.__getstate__()[:-1]
+        # set None as _owner
+        stat.append(None)
+        return stat
diff --git a/pandaserver/taskbuffer/JobSpec.py b/pandaserver/taskbuffer/JobSpec.py
index c9bea53f0..7487f1a33 100755
--- a/pandaserver/taskbuffer/JobSpec.py
+++ b/pandaserver/taskbuffer/JobSpec.py
@@ -7,6 +7,8 @@
 import json
 import datetime
 
+from pandaserver.taskbuffer.FileSpec import FileSpec
+
 reserveChangedState = False
 
 
@@ -855,6 +857,28 @@ def set_ram_for_retry(self, val):
         newItems.append("{0}:{1}".format(self._tagForSH["retryRam"], val))
         self.specialHandling = ",".join(newItems)
 
+    # dump to a json-serializable object
+    def dump_to_json_serializable(self):
+        job_state = self.__getstate__()
+        file_state_list = []
+        for file_spec in job_state[-1]:
+            file_stat = file_spec.dump_to_json_serializable()
+            file_state_list.append(file_stat)
+        job_state = job_state[:-1]
+        # append files
+        job_state.append(file_state_list)
+        return job_state
+
+    # load from a json-serializable object
+    def load_from_json_serializable(self, job_state):
+        # initialize with empty file list
+        self.__setstate__(job_state[:-1] + [[]])
+        # add files
+        for file_stat in job_state[-1]:
+            file_spec = FileSpec()
+            file_spec.__setstate__(file_stat)
+            self.addFile(file_spec)
+
 
 # utils
diff --git a/pandaserver/taskbuffer/JobUtils.py b/pandaserver/taskbuffer/JobUtils.py
index 8664e3b5f..4d8c4d7b5 100644
--- a/pandaserver/taskbuffer/JobUtils.py
+++ b/pandaserver/taskbuffer/JobUtils.py
@@ -1,4 +1,9 @@
 import re
+import json
+
+from pandaserver.taskbuffer.JobSpec import JobSpec
+
+from pandaserver.srvcore.CoreUtils import NonJsonObjectEncoder, as_python_object
 
 try:
     long
@@ -179,3 +184,22 @@ def compensate_ram_count(ram_count):
     if ram_count is not None:
         ram_count = int(ram_count * 0.90)
     return ram_count
+
+
+# dump jobs to serialized json
+def dump_jobs_json(jobs):
+    state_objects = []
+    for job_spec in jobs:
+        state_objects.append(job_spec.dump_to_json_serializable())
+    return json.dumps(state_objects, cls=NonJsonObjectEncoder)
+
+
+# load jobs from serialized json
+def load_jobs_json(state):
+    state_objects = json.loads(state, object_hook=as_python_object)
+    jobs = []
+    for job_state in state_objects:
+        job_spec = JobSpec()
+        job_spec.load_from_json_serializable(job_state)
+        jobs.append(job_spec)
+    return jobs
diff --git a/pandaserver/taskbuffer/workflow_processor.py b/pandaserver/taskbuffer/workflow_processor.py
index 830da4a7b..5bd81905b 100644
--- a/pandaserver/taskbuffer/workflow_processor.py
+++ b/pandaserver/taskbuffer/workflow_processor.py
@@ -179,7 +179,9 @@ def core_exec(sandbox_url, log_token, dump_workflow, ops_file, user_name, test_m
     # parse workflow files
     if is_OK:
         tmpLog.info("parse workflow")
+        workflow_name = None
         if ops["data"]["language"] == "cwl":
+            workflow_name = ops["data"].get("workflow_name")
             nodes, root_in = pcwl_utils.parse_workflow_file(ops["data"]["workflowSpecFile"], tmpLog)
             with open(ops["data"]["workflowInputFile"]) as workflow_input:
                 data = yaml.safe_load(workflow_input)
@@ -229,13 +231,14 @@
             (
                 workflow_to_submit,
                 dump_str_list,
-            ) = workflow_utils.convert_nodes_to_workflow(nodes)
+            ) = workflow_utils.convert_nodes_to_workflow(nodes, workflow_name=workflow_name)
             try:
                 if workflow_to_submit:
                     if not test_mode:
                         tmpLog.info("submit workflow")
                         wm = ClientManager(host=get_rest_host())
-                        request_id = wm.submit(workflow_to_submit, username=user_name)
+                        request_id = wm.submit(workflow_to_submit, username=user_name,
+                                               use_dataset_name=False)
                 else:
                     dump_str = "workflow is empty"
                     tmpLog.error(dump_str)
diff --git a/pandaserver/userinterface/UserIF.py b/pandaserver/userinterface/UserIF.py
index 273fb4eed..5530c8f7e 100644
--- a/pandaserver/userinterface/UserIF.py
+++ b/pandaserver/userinterface/UserIF.py
@@ -209,10 +209,10 @@ def checkSandboxFile(self, userName, fileSize, checkSum):
         return ret
 
     # get job status
-    def getJobStatus(self, idsStr, use_json):
+    def getJobStatus(self, idsStr, use_json, no_pickle=False):
         try:
             # deserialize IDs
-            if use_json:
+            if use_json or no_pickle:
                 ids = json.loads(idsStr)
             else:
                 ids = WrappedPickle.loads(idsStr)
@@ -232,6 +232,8 @@ def getJobStatus(self, idsStr, use_json):
         # serialize
         if use_json:
             return json.dumps(ret)
+        if no_pickle:
+            return JobUtils.dump_jobs_json(ret)
         return WrappedPickle.dumps(ret)
 
     # get PandaID with jobexeID
@@ -1181,8 +1183,8 @@ def runTaskAssignment(req, jobs):
 
 
 # get job status
-def getJobStatus(req, ids):
-    return userIF.getJobStatus(ids, req.acceptJson())
+def getJobStatus(req, ids, no_pickle=None):
+    return userIF.getJobStatus(ids, req.acceptJson(), no_pickle)
 
 
 # get PandaID with jobexeID
diff --git a/pandaserver/workflow/workflow_utils.py b/pandaserver/workflow/workflow_utils.py
index bbcba15a6..2e1b45077 100644
--- a/pandaserver/workflow/workflow_utils.py
+++ b/pandaserver/workflow/workflow_utils.py
@@ -674,17 +674,18 @@ def get_dict_form(self, serial_id=None, dict_form=None):
 
 
 # convert nodes to workflow
-def convert_nodes_to_workflow(nodes, workflow_node=None, workflow=None):
+def convert_nodes_to_workflow(nodes, workflow_node=None, workflow=None, workflow_name=None):
     if workflow is None:
         is_top = True
         workflow = Workflow()
+        workflow.name = workflow_name
     else:
         is_top = False
     id_work_map = {}
     all_sub_id_work_map = {}
     sub_to_id_map = {}
     cond_dump_str = " Conditions\n"
-    class_dump_str = "===== Workflow ID:{} ====\n".format(workflow_node.id if workflow_node else None)
+    class_dump_str = "===== Workflow ID:{} ====\n".format(workflow_node.id if workflow_node else workflow_name)
     class_dump_str += " Works\n"
     dump_str_list = []
     # create works or workflows
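
Note on usage: the JobUtils hunks above introduce a pickle-free round trip for job objects, with datetime attributes preserved through the NonJsonObjectEncoder / as_python_object pair added to CoreUtils. A minimal sketch of that round trip follows; it assumes a panda-server installation with this diff applied, and all attribute values in it are purely illustrative:

    import datetime

    from pandaserver.taskbuffer.JobSpec import JobSpec
    from pandaserver.taskbuffer.FileSpec import FileSpec
    from pandaserver.taskbuffer.JobUtils import dump_jobs_json, load_jobs_json

    # build a job with one attached file (values are illustrative)
    job = JobSpec()
    job.PandaID = 123
    job.jobStatus = "defined"
    job.creationTime = datetime.datetime.utcnow()  # exercises NonJsonObjectEncoder

    file_spec = FileSpec()
    file_spec.lfn = "test.pool.root"
    job.addFile(file_spec)

    # serialize to a JSON string and back; the datetime survives via the
    # {"_datetime_object": ...} tagging done in CoreUtils
    state = dump_jobs_json([job])
    restored = load_jobs_json(state)

    assert restored[0].PandaID == job.PandaID
    assert restored[0].creationTime == job.creationTime
    assert restored[0].Files[0].lfn == "test.pool.root"

This is the same path UserIF.getJobStatus takes when called with no_pickle, so a client can decode job specs from plain JSON instead of accepting a pickled payload.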