diff --git a/client/paddleflow/job/job_info.py b/client/paddleflow/job/job_info.py index 35858a961..cdb054bb4 100644 --- a/client/paddleflow/job/job_info.py +++ b/client/paddleflow/job/job_info.py @@ -13,8 +13,9 @@ limitations under the License. """ -#!/usr/bin/env python3 +# !/usr/bin/env python3 # -*- coding:utf8 -*- +from paddleflow.common.exception import PaddleFlowSDKException class JobInfo(object): @@ -134,8 +135,10 @@ class Member(object): Members """ - def __init__(self, role, replicas, job_id=None, job_name=None, queue=None, labels=None, annotations=None, priority=None, - flavour=None, fs=None, extra_fs_list=None, image=None, env=None, command=None, args_list=None, port=None, + def __init__(self, role, replicas, job_id=None, job_name=None, queue=None, labels=None, annotations=None, + priority=None, + flavour=None, fs=None, extra_fs_list=None, image=None, env=None, command=None, args_list=None, + port=None, extension_template=None): """ @@ -175,11 +178,55 @@ def __init__(self, role, replicas, job_id=None, job_name=None, queue=None, label self.port = port self.extension_template = extension_template + def compile(self): + + result = {} + + if not self.role: + raise PaddleFlowSDKException(f"the role of distributed member cannot be empty") + + result["role"] = self.role + + if not self.replicas: + raise PaddleFlowSDKException(f"the replicas of distributed member cannot be empty") + + result["replicas"] = self.replicas + + if self.image: + result["image"] = self.image + + if self.command: + result["command"] = self.command + + if self.flavour: + result["flavour"] = self.flavour + + if self.fs: + result["fs"] = self.fs + + if self.extra_fs_list: + result["extra_fs"] = self.extra_fs_list + + if self.port: + result["port"] = self.port + + if self.env: + result["env"] = self.env + + if self.annotations: + result["annotations"] = self.annotations + + if self.labels: + result["labels"] = self.labels + + return result + class Flavour(object): """ Flavour """ + def __init__(self, name, cpu, memory, scalar_resources): """ @@ -198,12 +245,9 @@ class FileSystem(object): """ FileSystem """ + def __init__(self, name, mount_path, sub_path, read_only): self.name = name self.mount_path = mount_path self.sub_path = sub_path self.read_only = read_only - - - - diff --git a/client/paddleflow/pipeline/__init__.py b/client/paddleflow/pipeline/__init__.py index bf63bd01d..0e386cf6d 100644 --- a/client/paddleflow/pipeline/__init__.py +++ b/client/paddleflow/pipeline/__init__.py @@ -28,6 +28,7 @@ from .dsl import Artifact from .dsl import Parameter from .dsl import ContainerStep +from .dsl import DistributedJob from .dsl import DAG from .dsl import Pipeline from .dsl import FAIL_CONTINUE diff --git a/client/paddleflow/pipeline/dsl/__init__.py b/client/paddleflow/pipeline/dsl/__init__.py index ff211a340..b1cc4d713 100644 --- a/client/paddleflow/pipeline/dsl/__init__.py +++ b/client/paddleflow/pipeline/dsl/__init__.py @@ -22,7 +22,7 @@ from .options import FSOptions from .options import ExtraFS from .options import MainFS - +from .options import DistributedJob # import from io_types from .io_types import Artifact diff --git a/client/paddleflow/pipeline/dsl/compiler/step_compiler.py b/client/paddleflow/pipeline/dsl/compiler/step_compiler.py index 04bb19d8f..f9dc85852 100644 --- a/client/paddleflow/pipeline/dsl/compiler/step_compiler.py +++ b/client/paddleflow/pipeline/dsl/compiler/step_compiler.py @@ -16,6 +16,7 @@ from .component_compiler import ComponentCompiler from paddleflow.pipeline.dsl.options import 
ExtraFS +from paddleflow.pipeline.dsl.options import DistributedJob from paddleflow.pipeline.dsl.utils.consts import PipelineDSLError from paddleflow.common.exception.paddleflow_sdk_exception import PaddleFlowSDKException @@ -28,7 +29,7 @@ def __init__(self, component): super().__init__(component) def compile(self): - """ trans step to dicts' + """ trans step to dicts """ super().compile() @@ -39,9 +40,12 @@ def compile(self): # compile extra_fs self._compile_extra_fs() + + # compile distributed_job + self._compile_distributed_job() return self._dict - + def _compile_base_info(self): """ compiler base info such as command, docker_env, env """ @@ -70,7 +74,7 @@ def _compile_cache_options(self): """ if self._component.cache_options: self._dict["cache"] = self._component.cache_options.compile() - + def _compile_extra_fs(self): """ compile extra_fs """ @@ -85,4 +89,10 @@ def _compile_extra_fs(self): raise PaddleFlowSDKException(PipelineDSLError, self._generate_error_msg("Step's extra_fs attribute should be a list of ExtraFS instance")) - self._dict["extra_fs"].append(extra.compile()) \ No newline at end of file + self._dict["extra_fs"].append(extra.compile()) + + def _compile_distributed_job(self): + """ compile distributed_job + """ + if self._component.distributed_job: + self._dict["distributed_job"] = self._component.distributed_job.compile() diff --git a/client/paddleflow/pipeline/dsl/component/steps/container_step.py b/client/paddleflow/pipeline/dsl/component/steps/container_step.py index 55f0bcab4..139a7308a 100644 --- a/client/paddleflow/pipeline/dsl/component/steps/container_step.py +++ b/client/paddleflow/pipeline/dsl/component/steps/container_step.py @@ -25,6 +25,7 @@ from paddleflow.pipeline.dsl.io_types import EnvDict from paddleflow.pipeline.dsl.options import CacheOptions from paddleflow.pipeline.dsl.options import ExtraFS +from paddleflow.pipeline.dsl.options import DistributedJob from paddleflow.pipeline.dsl.utils.util import validate_string_by_regex from paddleflow.pipeline.dsl.utils.consts import COMPONENT_NAME_REGEX from paddleflow.pipeline.dsl.utils.consts import PipelineDSLError @@ -49,6 +50,7 @@ def __init__( cache_options: CacheOptions=None, condition: str=None, loop_argument: Union[List, Parameter, Artifact, str]=None, + distributed_job: DistributedJob=None, extra_fs: List[ExtraFS] = None ): """ create a new instance of ContainerStep @@ -62,7 +64,7 @@ def __init__( parameters (str, Any): Parameter of step, the key is the name of this parameter, and the value could be int, string, Paramter, or upstream Step's Parameter env (Dict[str, str]): enviroment varible for Step runtime cache_options (CacheOptions): the cache options of step - + distributed_job (DistributedJob): the distributed jobs defined in step Raises: PaddleFlowSDKException: if some args is illegal """ @@ -72,6 +74,7 @@ def __init__( outputs=outputs, parameters=parameters, cache_options=cache_options, + distributed_job=distributed_job, condition=condition, loop_argument=loop_argument, extra_fs=extra_fs diff --git a/client/paddleflow/pipeline/dsl/component/steps/step.py b/client/paddleflow/pipeline/dsl/component/steps/step.py index ca9f4d989..69d0e5578 100644 --- a/client/paddleflow/pipeline/dsl/component/steps/step.py +++ b/client/paddleflow/pipeline/dsl/component/steps/step.py @@ -16,8 +16,8 @@ import copy from pathlib import Path from typing import Dict -from typing import Any -from typing import List +from typing import Any +from typing import List from typing import Union from 
paddleflow.pipeline.dsl.component import Component @@ -25,24 +25,28 @@ from paddleflow.pipeline.dsl.io_types import Parameter from paddleflow.pipeline.dsl.options import CacheOptions from paddleflow.pipeline.dsl.options import ExtraFS +from paddleflow.pipeline.dsl.options import DistributedJob from paddleflow.pipeline.dsl.utils.consts import PipelineDSLError from paddleflow.common.exception.paddleflow_sdk_exception import PaddleFlowSDKException +from paddleflow.job import Member class Step(Component): """ Step is the basic scheduling unit in pipeline """ + def __init__( - self, - name: str, - inputs: Dict[str, Artifact]=None, - outputs: Dict[str, Artifact]=None, - parameters: Dict[str, Any]=None, - cache_options: CacheOptions=None, - condition: str=None, - loop_argument: Union[List, Parameter, Artifact, str]=None, - extra_fs: List[ExtraFS] = None - ): + self, + name: str, + inputs: Dict[str, Artifact] = None, + outputs: Dict[str, Artifact] = None, + parameters: Dict[str, Any] = None, + cache_options: CacheOptions = None, + condition: str = None, + loop_argument: Union[List, Parameter, Artifact, str] = None, + extra_fs: List[ExtraFS] = None, + distributed_job: DistributedJob = None, + ): """ create a new instance of Step Args: @@ -54,6 +58,7 @@ def __init__( condition (str): the condition. when schedule component, would be calculate condition, if the result of it is False, then the status of this component would be set "skipped" loop_argument (Union[List, Parameter, Artifact, str]): the loop arugment, when schedule this component, would be be traversed, and the runtime will be created once for each item in it extra_fs (List[ExtraFS]): the paddleflow filesystem used by Step + distributed_job (DistributedJob): the distributed jobs of step Raises: PaddleFlowSDKException: if some args is illegal """ @@ -61,18 +66,21 @@ def __init__( self._type = "step" self.cache_options = cache_options - + self.distributed_job = distributed_job if extra_fs and not isinstance(extra_fs, list): extra_fs = [extra_fs] - + if isinstance(extra_fs, list): self.extra_fs = [] for extra in extra_fs: if not isinstance(extra, ExtraFS): - raise PaddleFlowSDKException(PipelineDSLError, - self._generate_error_msg("Step's extra_fs attribute should be a list of ExtraFS instance")) + raise PaddleFlowSDKException(PipelineDSLError, + self._generate_error_msg( + "Step's extra_fs attribute should be a list of ExtraFS instance")) self.extra_fs.append(extra) else: - self.extra_fs = [] \ No newline at end of file + self.extra_fs = [] + + diff --git a/client/paddleflow/pipeline/dsl/options/__init__.py b/client/paddleflow/pipeline/dsl/options/__init__.py index b3ba1fe51..db7c6b04e 100644 --- a/client/paddleflow/pipeline/dsl/options/__init__.py +++ b/client/paddleflow/pipeline/dsl/options/__init__.py @@ -21,4 +21,5 @@ from .fs_options import FSOptions from .fs_options import ExtraFS from .fs_options import MainFS +from .distributed_job import DistributedJob diff --git a/client/paddleflow/pipeline/dsl/options/distributed_job.py b/client/paddleflow/pipeline/dsl/options/distributed_job.py new file mode 100644 index 000000000..207617174 --- /dev/null +++ b/client/paddleflow/pipeline/dsl/options/distributed_job.py @@ -0,0 +1,61 @@ +from paddleflow.job import Member + +from .options import Options +from typing import List + +from paddleflow.common.exception import PaddleFlowSDKException +from paddleflow.pipeline.dsl.utils.consts import PipelineDSLError + + +class DistributedJob(object): + """ distributed job which used by step + """ + + def 
__init__( + self, + framework: str, + members: List[Member] = None, + ): + """ create a new instance for DistributedJob + + Args: + framework (str): the framework of the distributed job + members (List[Member]): the members defined in the distributed job + """ + self.framework = framework + + if members: + if not isinstance(members, list): + members = [members] + + for member in members: + if not isinstance(member, Member): + raise PaddleFlowSDKException(PipelineDSLError, + "DistributedJob's members attribute should be a list of Member instances") + + self.members = members + else: + self.members = None + + def compile(self): + """ trans to dict + """ + result = {} + + if not self.framework: + raise PaddleFlowSDKException(PipelineDSLError, "DistributedJob's framework attribute cannot be empty") + + result["framework"] = self.framework + result["members"] = [] + if self.members: + + if not isinstance(self.members, list): + self.members = [self.members] + + for member in self.members: + if not isinstance(member, Member): + raise PaddleFlowSDKException(PipelineDSLError, + "DistributedJob's members attribute should be a list of Member instances") + result["members"].append(member.compile()) + + return result diff --git a/client/paddleflow/pipeline/dsl/options/fs_options.py b/client/paddleflow/pipeline/dsl/options/fs_options.py index f93aafe4c..7b7e6ed8d 100644 --- a/client/paddleflow/pipeline/dsl/options/fs_options.py +++ b/client/paddleflow/pipeline/dsl/options/fs_options.py @@ -30,7 +30,7 @@ def __init__( mount_path: str=None, read_only: bool=False ): - """ create an new instance for ExtraFS + """ create a new instance for ExtraFS Args: name (str): the name of paddleflow filesystem diff --git a/client/paddleflow/pipeline/dsl/options/options.py b/client/paddleflow/pipeline/dsl/options/options.py index 04d3ed7ad..7ad87b177 100644 --- a/client/paddleflow/pipeline/dsl/options/options.py +++ b/client/paddleflow/pipeline/dsl/options/options.py @@ -26,7 +26,7 @@ def compile(self): """ self._validate() - result = {} + result = {} for attr, key in self.COMPILE_ATTR_MAP.items(): value = getattr(self, attr, None) if value is not None: diff --git a/client/paddleflow/run/run_api.py b/client/paddleflow/run/run_api.py index 254ce75a6..a02e30a26 100644 --- a/client/paddleflow/run/run_api.py +++ b/client/paddleflow/run/run_api.py @@ -13,7 +13,7 @@ limitations under the License. 
""" -#!/usr/bin/env python3 +# !/usr/bin/env python3 # -*- coding:utf8 -*- import json @@ -34,7 +34,8 @@ def __init__(self): @classmethod def add_run(self, host, fs_name=None, name=None, desc=None, - param=None, username=None, run_yaml_path=None, run_yaml_raw_b64=None, pipeline_id=None, pipeline_version_id=None, + param=None, username=None, run_yaml_path=None, run_yaml_raw_b64=None, pipeline_id=None, + pipeline_version_id=None, header=None, disabled=None, docker_env=None, failure_options=None): """ add run """ @@ -48,14 +49,14 @@ def add_run(self, host, fs_name=None, name=None, desc=None, if desc: body['desc'] = desc if run_yaml_path: - body['runYamlPath']=run_yaml_path + body['runYamlPath'] = run_yaml_path if run_yaml_raw_b64: if isinstance(run_yaml_raw_b64, bytes): - body['runYamlRaw']=base64.b64encode(run_yaml_raw_b64).decode() + body['runYamlRaw'] = base64.b64encode(run_yaml_raw_b64).decode() else: raise PaddleFlowSDKException("InvalidRequest", "runYamlRaw must be bytes type") if pipeline_id: - body['pipelineID']= pipeline_id + body['pipelineID'] = pipeline_id if pipeline_version_id: body['pipelineVersionID'] = pipeline_version_id if param: @@ -68,7 +69,7 @@ def add_run(self, host, fs_name=None, name=None, desc=None, body["dockerEnv"] = docker_env if failure_options: body['failureOptions'] = failure_options - + response = api_client.call_api(method="POST", url=parse.urljoin(host, api.PADDLE_FLOW_RUN), headers=header, json=body) if not response: @@ -113,9 +114,9 @@ def list_run(self, host, fs_name=None, username=None, run_id=None, run_name=None if len(data['runList']): for run in data['runList']: run_info = RunInfo(run['runID'], run['fsName'], run['username'], run['status'], run['name'], - run['description'], None, None, None, None, None, run['updateTime'], - run['source'], run['runMsg'], run['scheduleID'], run['scheduledTime'], - None, None, None, None, run['createTime'], run['activateTime']) + run['description'], None, None, None, None, None, run['updateTime'], + run['source'], run['runMsg'], run['scheduleID'], run['scheduledTime'], + None, None, None, None, run['createTime'], run['activateTime']) run_list.append(run_info) return True, {'runList': run_list, 'nextMarker': data.get('nextMarker', None)} @@ -142,16 +143,20 @@ def trans_dict_to_comp_info(comp_dict): for comp in comp_list: if 'entryPoints' in comp.keys(): new_comp = DagInfo(comp['id'], comp['name'], comp['type'], comp['dagName'], - comp['parentDagID'], comp['deps'], comp['parameters'], - comp['artifacts'], comp['startTime'], comp['endTime'], - comp['status'], comp['message'], trans_dict_to_comp_info(comp['entryPoints'])) + comp['parentDagID'], comp['deps'], comp['parameters'], + comp['artifacts'], comp['startTime'], comp['endTime'], + comp['status'], comp['message'], trans_dict_to_comp_info(comp['entryPoints'])) else: + distributed_job = None + if 'distributedJob' in comp: + distributed_job = comp['distributedJob'] new_comp = JobInfo(comp['name'], comp['deps'], comp['parameters'], - comp['command'], comp['env'], comp['status'], comp['startTime'], - comp['endTime'], comp['dockerEnv'], comp['jobID'], - comp['type'], comp['stepName'], comp['parentDagID'], - comp['extraFS'], comp['artifacts'], comp['cache'], - comp['jobMessage'], comp['cacheRunID'], comp['cacheJobID']) + comp['command'], comp['env'], comp['status'], + distributed_job, comp['startTime'], comp['endTime'], + comp['dockerEnv'], comp['jobID'], comp['type'], + comp['stepName'], comp['parentDagID'], comp['extraFS'], + comp['artifacts'], comp['cache'], 
comp['jobMessage'], + comp['cacheRunID'], comp['cacheJobID']) new_comp_list.append(new_comp) new_comp_dict[key] = new_comp_list return new_comp_dict @@ -167,20 +172,21 @@ def trans_dict_to_comp_info(comp_dict): for key in post.keys(): comp = post[key] new_comp = JobInfo(comp['name'], comp['deps'], comp['parameters'], - comp['command'], comp['env'], comp['status'], comp['startTime'], - comp['endTime'], comp['dockerEnv'], comp['jobID'], - comp['type'], comp['stepName'], comp['parentDagID'], - comp['extraFS'], comp['artifacts'], comp['cache'], - comp['jobMessage'], comp['cacheRunID'], comp['cacheJobID']) + comp['command'], comp['env'], comp['status'], comp['startTime'], + comp['endTime'], comp['dockerEnv'], comp['jobID'], + comp['type'], comp['stepName'], comp['parentDagID'], + comp['extraFS'], comp['artifacts'], comp['cache'], + comp['jobMessage'], comp['cacheRunID'], comp['cacheJobID']) new_post_dict[key] = new_comp run_info = RunInfo(data['runID'], data['fsName'], data['username'], data['status'], data['name'], - data['description'], data['parameters'], data['runYaml'], - runtime_info, new_post_dict, - data['dockerEnv'], data['updateTime'], data['source'], data['runMsg'],data['scheduleID'], None, - data['fsOptions'], data['failureOptions'], data['disabled'], data['runCachedIDs'], - data['createTime'], data['activateTime']) - + data['description'], data['parameters'], data['runYaml'], + runtime_info, new_post_dict, + data['dockerEnv'], data['updateTime'], data['source'], data['runMsg'], data['scheduleID'], + None, + data['fsOptions'], data['failureOptions'], data['disabled'], data['runCachedIDs'], + data['createTime'], data['activateTime']) + return True, run_info @classmethod @@ -193,10 +199,10 @@ def stop_run(self, host, run_id, header=None, force=False): params = { "action": "stop" } - + body = {"stopForce": force} - response = api_client.call_api(method = "PUT", url =url, params=params, headers=header, json=body) + response = api_client.call_api(method="PUT", url=url, params=params, headers=header, json=body) if not response: raise PaddleFlowSDKException("Connection Error", "stop run failed due to HTTPError") if not response.text: @@ -234,17 +240,17 @@ def list_runcache(self, host, user_filter=None, fs_filter=None, run_filter=None, raise PaddleFlowSDKException("InvalidRequest", "paddleflow should login first") params = {} if user_filter: - params['userFilter']=user_filter + params['userFilter'] = user_filter if fs_filter: - params['fsFilter']=fs_filter + params['fsFilter'] = fs_filter if run_filter: - params['runFilter']=run_filter + params['runFilter'] = run_filter if max_keys: - params['maxKeys']=max_keys + params['maxKeys'] = max_keys if marker: - params['marker']=marker + params['marker'] = marker response = api_client.call_api(method="GET", url=parse.urljoin(host, api.PADDLE_FLOW_RUNCACHE), - params=params, headers=header) + params=params, headers=header) if not response: raise PaddleFlowSDKException("Connection Error", "runcache list failed due to HTTPError") if not response.text: @@ -256,11 +262,11 @@ def list_runcache(self, host, user_filter=None, fs_filter=None, run_filter=None, if len(data['runCacheList']): for cache in data['runCacheList']: cache_info = RunCacheInfo(cache['cacheID'], cache['firstFp'], - cache['secondFp'], cache['runID'], cache['source'], - cache['jobID'], cache['fsname'], cache['username'], - cache['expiredTime'], cache['strategy'], - cache['custom'], cache['createTime'], - cache.get('updateTime', ' ')) + cache['secondFp'], cache['runID'], cache['source'], + 
cache['jobID'], cache['fsname'], cache['username'], + cache['expiredTime'], cache['strategy'], + cache['custom'], cache['createTime'], + cache.get('updateTime', ' ')) cache_list.append(cache_info) return True, {'runCacheList': cache_list, 'nextMarker': data.get('nextMarker', None)} @@ -279,8 +285,8 @@ def show_runcache(self, host, run_cache_id, header=None): if 'message' in data: return False, data['message'] ri = RunCacheInfo(data['cacheID'], data['firstFp'], data['secondFp'], data['runID'], - data['source'], data['jobID'], data['fsname'], data['username'], data['expiredTime'], - data['strategy'], data['custom'], data['createTime'], data['updateTime']) + data['source'], data['jobID'], data['fsname'], data['username'], data['expiredTime'], + data['strategy'], data['custom'], data['createTime'], data['updateTime']) return True, ri @classmethod @@ -307,7 +313,7 @@ def retry_run(self, host, run_id, header=None): """ if not header: raise PaddleFlowSDKException("InvalidRequest", "paddleflow should login first") - params={'action':'retry'} + params = {'action': 'retry'} response = api_client.call_api(method="PUT", url=parse.urljoin(host, api.PADDLE_FLOW_RUN + "/%s" % run_id), params=params, headers=header) if not response: @@ -328,7 +334,7 @@ def list_artifact(self, host, user_filter=None, fs_filter=None, run_filter=None, """ if not header: raise PaddleFlowSDKException("InvalidRequest", "paddleflow should login first") - params={} + params = {} if user_filter: params['userFilter'] = user_filter if fs_filter: @@ -344,17 +350,17 @@ def list_artifact(self, host, user_filter=None, fs_filter=None, run_filter=None, if marker: params['marker'] = marker response = api_client.call_api(method="GET", - url=parse.urljoin(host, api.PADDLE_FLOW_ARTIFACT), - params=params, headers=header) + url=parse.urljoin(host, api.PADDLE_FLOW_ARTIFACT), + params=params, headers=header) if not response: raise PaddleFlowSDKException("Connection Error", "artifact failed due to HTTPError") data = json.loads(response.text) if 'message' in data: return False, data['message'] - actiface_list=[] + actiface_list = [] for i in data['artifactEventList']: actifact = ArtifactInfo(i['runID'], i['fsname'], i['username'], i['artifactPath'], - i['step'], i['type'], i['artifactName'], i['meta'], - i['createTime'], i['updateTime']) + i['step'], i['type'], i['artifactName'], i['meta'], + i['createTime'], i['updateTime']) actiface_list.append(actifact) return True, {'artifactList': actiface_list, 'nextMarker': data.get('nextMarker', None)} diff --git a/client/paddleflow/run/run_info.py b/client/paddleflow/run/run_info.py index 8a92061f0..c772f8c0b 100644 --- a/client/paddleflow/run/run_info.py +++ b/client/paddleflow/run/run_info.py @@ -49,7 +49,7 @@ def __init__(self, run_id, fs_name, username, status, name, description, paramet class JobInfo(object): """ the class of job info""" - def __init__(self, name, deps, parameters, command, env, status, start_time, end_time, docker_env, job_id, + def __init__(self, name, deps, parameters, command, env, status, distributed_job, start_time, end_time, docker_env, job_id, comp_type, step_name, parent_dag_id, extra_fs, artifacts, cache, job_message, cache_run_id, cache_job_id): self.artifacts = artifacts self.cache = cache @@ -60,6 +60,7 @@ def __init__(self, name, deps, parameters, command, env, status, start_time, end self.job_id = job_id self.name = name self.deps = deps + self.distributed_job = distributed_job self.parameters = parameters self.command = command self.env = env @@ -86,6 +87,7 @@ def 
get_dict(self): 'command': self.command, 'env': self.env, 'status': self.status, + 'distributedJob': self.distributed_job, 'startTime': self.start_time, 'endTime': self.end_time, 'dockerEnv': self.docker_env, diff --git a/client/test/test_pipeline/test_dsl/test_compiler/test_step_compiler.py b/client/test/test_pipeline/test_dsl/test_compiler/test_step_compiler.py index 8939d7076..17ca2a333 100644 --- a/client/test/test_pipeline/test_dsl/test_compiler/test_step_compiler.py +++ b/client/test/test_pipeline/test_dsl/test_compiler/test_step_compiler.py @@ -16,13 +16,15 @@ """ unit test for paddleflow.pipeline.dsl.compiler.step_compiler """ -import pytest +import pytest from paddleflow.pipeline import ContainerStep from paddleflow.pipeline import Artifact from paddleflow.pipeline import Parameter from paddleflow.pipeline import CacheOptions from paddleflow.pipeline import ExtraFS +from paddleflow.job import Member +from paddleflow.pipeline import DistributedJob from paddleflow.pipeline.dsl.compiler.step_compiler import StepCompiler from paddleflow.pipeline.dsl.utils.consts import PipelineDSLError from paddleflow.common.exception.paddleflow_sdk_exception import PaddleFlowSDKException @@ -42,21 +44,21 @@ def compile_part_step(self, step, part_func): def test_compile_base_info(self): """ test _compile_base_info() """ - + # 1、step 没有任何信息 step = ContainerStep(name="haha") - compiler = self.compile_part_step(step, "_compile_base_info") + compiler = self.compile_part_step(step, "_compile_base_info") assert compiler._dict == {} # 2、step 只有部分基础信息 step = ContainerStep(name="hahaha", command="echo hahaha") - compiler = self.compile_part_step(step, "_compile_base_info") + compiler = self.compile_part_step(step, "_compile_base_info") assert compiler._dict == {"command": "echo hahaha"} # 3、step 有全量的信息 step = ContainerStep(name="hahaha", command="echo hahaha", env={"name": "xiaodu"}, docker_env="python:3.7") - compiler = self.compile_part_step(step, "_compile_base_info") + compiler = self.compile_part_step(step, "_compile_base_info") assert compiler._dict == {"command": "echo hahaha", "docker_env": "python:3.7", "env": {"name": "xiaodu"}} @@ -74,7 +76,7 @@ def test_compile_artifacts(self): "train_data": Artifact(), "validate_data": Artifact(), } - + step = ContainerStep(name="step", outputs=outputs) compiler = self.compile_part_step(step, "_compile_artifacts") assert len(compiler._dict["artifacts"]["output"]) == 2 and \ @@ -88,7 +90,7 @@ def test_compile_artifacts(self): compiler = self.compile_part_step(step2, "_compile_artifacts") assert compiler._dict["artifacts"]["input"] == {"data": "{{step.train_data}}"} and \ "output" not in compiler._dict - + # 4. 
既有输出 artifact 又有 输入artifact step2.outputs["model"] = Artifact() compiler = self.compile_part_step(step2, "_compile_artifacts") @@ -147,7 +149,7 @@ def test_compile_dependences(self): compiler = self.compile_part_step(step2, "compile") deps = compiler._dict["deps"].split(",") assert len(deps) == 2 and "step" in deps and \ - "step1" in deps + "step1" in deps @pytest.mark.cache def test_compiler_cache_options(self): @@ -187,7 +189,7 @@ def test_compile(self): assert step_dict == { "docker_env": "python:3.7", "command": "echo 1234", - "env": env, + "env": env, "artifacts": {"output": ["model"]}, "parameters": {"data": "/data", "epoch": {"type": "int", "default": 1}}, "type": "step" @@ -223,7 +225,7 @@ def test_validate_io_names(self): step = ContainerStep(name="step1", docker_env="python:3.7", env=env, parameters=parameters, outputs=outputs, command="echo 1234") - + step2 = ContainerStep(name="step2", docker_env="python:3.7", parameters={"data": step.parameters["data"]}, inputs={"model": step.outputs["model"]}, command="echo 456") @@ -272,4 +274,39 @@ def test_compile_extra_fs(self): assert step_dict["extra_fs"] == [ {"name": "abc", "mount_path": "/home/work", "read_only": False}, {"name": "abc", "mount_path": "/home/work2", "read_only": True}, + ] + + @pytest.mark.distributed_job + def test_compile_distributed_job(self): + """ unit test for _compile_distributed_job + """ + + step = ContainerStep( + name="train", + parameters={ + "epoch": 5, + }, + env={"PS_NUM": "2", "WORKER_NUM": "2"}, + command="", + ) + + step_dict = StepCompiler(step).compile() + assert "distributed_job" not in step_dict + + dist_job = DistributedJob( + framework="paddle", + members=[Member(role="pworker", replicas=2, image="paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + command="sleep 30; echo worker"), + Member(role="pserver", replicas=2, image="paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + command="sleep 30; echo ps")] + ) + + step.distributed_job = dist_job + step_dict = StepCompiler(step).compile() + print(step_dict["distributed_job"]["members"]) + assert step_dict["distributed_job"]["members"] == [ + {"role": "pworker", "replicas": 2, "command": "sleep 30; echo worker", + "image": "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7"}, + {"role": "pserver", "replicas": 2, "command": "sleep 30; echo ps", + "image": "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7"}, ] \ No newline at end of file diff --git a/client/test/test_pipeline/test_dsl/test_options/test_distributed_job.py b/client/test/test_pipeline/test_dsl/test_options/test_distributed_job.py new file mode 100644 index 000000000..4dc3b4fbe --- /dev/null +++ b/client/test/test_pipeline/test_dsl/test_options/test_distributed_job.py @@ -0,0 +1,40 @@ +import pytest +from paddleflow.pipeline import DistributedJob +from paddleflow.job import Member + + +class TestDistributedJob(object): + """ unit test for DistributedJob + """ + @pytest.mark.compile + def test_compile(self): + distributed_job = DistributedJob( + framework="paddle" + ) + distributed_job_dict = distributed_job.compile() + + assert distributed_job_dict == { + "framework": "paddle", + "members": [], + } + + member_worker = Member(role="pworker", replicas=2, image="paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + command="sleep 30; echo worker") + + member_ps = Member(role="pserver", replicas=2, image="paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + command="sleep 30; echo ps") + + distributed_job_dict = DistributedJob( + framework="paddle", + members=[member_worker, member_ps] + ).compile() + + 
assert distributed_job_dict == { + "framework": "paddle", + "members": [{'role': 'pworker', 'replicas': 2, 'image': 'paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7', + 'command': 'sleep 30; echo worker'}, + {'role': 'pserver', 'replicas': 2, 'image': 'paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7', + 'command': 'sleep 30; echo ps'}], + } + + diff --git a/docs/zh_cn/reference/pipeline/dsl_definition/10_distributed_job.md b/docs/zh_cn/reference/pipeline/dsl_definition/10_distributed_job.md new file mode 100644 index 000000000..37a0b6b94 --- /dev/null +++ b/docs/zh_cn/reference/pipeline/dsl_definition/10_distributed_job.md @@ -0,0 +1,68 @@ +本章节主要介绍如何使用DSL来定义分布式任务。 + +# 1 pipeline定义 +下面是添加了distributed_job的DSL格式pipeline定义。 +> 该示例中pipeline定义,以及示例相关运行脚本,来自Paddleflow项目下example/pipeline/distributed_job_example示例。 + +```python3 +def preprocess(): + """ data preprocess step + """ + step = ContainerStep( + name="preprocess", + docker_env="centos:centos7", + parameters={"data_path": f"./distributed_job_example/data/{PF_RUN_ID}"}, + env={"USER_ABC": "123_{{PF_USER_NAME}}"}, + command="bash distributed_job_example/shells/data.sh {{data_path}}" + ) + return step + +def train(epoch, train_data): + """ distributed job + """ + dist_jobs = DistributedJob( + framework="paddle", + members=[Member(role="pworker", replicas=2, image="paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + command="sleep 30; echo worker {{epoch}} {{train_data}} {{model_path}}"), + Member(role="pserver", replicas=2, image="paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + command="sleep 30; echo ps {{epoch}} {{train_data}} {{model_path}}")] + ) + + """ train step + """ + step = ContainerStep( + name="train", + parameters={ + "epoch": epoch, + "model_path": f"./output/{PF_RUN_ID}", + "train_data": train_data + }, + env={"PS_NUM": "2", "WORKER_NUM": "2"}, + command="", + distributed_job=dist_jobs, + ) + return step + + +@Pipeline(name="distributed_pipeline", docker_env="nginx:1.7.9", parallelism=1) +def distributed_pipeline(epoch=15): + """ pipeline example for distributed job + """ + preprocess_step = preprocess() + train_step = train(epoch, preprocess_step.parameters["data_path"]) + + +if __name__ == "__main__": + ppl = distributed_pipeline() + + main_fs = MainFS(name="ppl") + ppl.fs_options = FSOptions(main_fs) + print(ppl.run()) +``` + +# 2、定义DistributedJob +将多个member成员组装成一个DistributedJob可以分成如下三步: +1. 创建DistributedJob实例 +2. 定义分布式任务的framework +3. 
根据role创建Member列表,依次配置副本数、镜像、flavour等字段。 + diff --git a/docs/zh_cn/reference/pipeline/yaml_definition/11_distributed_job.md b/docs/zh_cn/reference/pipeline/yaml_definition/11_distributed_job.md new file mode 100644 index 000000000..7850f92ed --- /dev/null +++ b/docs/zh_cn/reference/pipeline/yaml_definition/11_distributed_job.md @@ -0,0 +1,103 @@ +在深度学习场景下,分布式训练任务的应用较为广泛。本文介绍如何在PaddleFlow Pipeline中配置分布式任务。 +# 1 pipeline定义 +下面是基于 [1_pipeline_basic.md] 示例,增加了distributed_job的yaml格式pipeline定义。 +> 该示例中pipeline定义,以及示例相关运行脚本,来自Paddleflow项目下example/pipeline/distributed_job_example示例。 + + +```yaml +name: distributed_pipeline + +entry_points: + preprocess: + command: bash distributed_job_example/shells/data.sh {{data_path}} + docker_env: centos:centos7 + env: + USER_ABC: 123_{{PF_USER_NAME}} + parameters: + data_path: ./distributed_job_example/data/{{PF_RUN_ID}} + + train: + deps: preprocess + env: + PS_NUM: "2" + WORKER_NUM: "2" + parameters: + epoch: 15 + model_path: ./output/{{PF_RUN_ID}} + train_data: '{{preprocess.data_path}}' + distributed_job: + framework: "paddle" + members: + - {"role": "pserver", "image": "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", "command": "sleep 30; echo ps {{epoch}} {{train_data}} {{model_path}}", "replicas": 2, "flavour": { "name": "flavour1" } } + - {"role": "pworker", "image": "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", "command": "sleep 30; echo worker {{epoch}} {{train_data}} {{model_path}}", "replicas": 2, "flavour": { "name": "flavour1" } } + +parallelism: 1 + +fs_options: + main_fs: {name: "ppl"} +``` + +# 2 分布式任务详解 +在Paddleflow pipeline中,配置分布式任务的方式非常简单,只需在节点中添加distributed_job,并配置框架(framework)和成员(members)信息即可。 +下面基于上述pipeline定义,介绍每个字段的作用。 + +## 2.1 framework +分布式任务的框架名称 + +## 2.2 members +分布式任务的成员定义,Members中支持定义的字段如下: + +### 2.2.1 role +- 当前member在分布式任务中的角色,例如pserver、pworker等。 +- members针对角色进行分组定义,一个role对应members列表中的一个list。 + +### 2.2.2 replicas +replicas字段需指定当前member的副本数,int类型。 + +### 2.2.3 image +member镜像名称,string类型。 +如果未定义,取该Step的docker_env中定义的镜像。 + +### 2.2.4 port +member的端口号,int类型 + +### 2.2.5 flavour +member使用的flavour定义,例如: { "name": "flavour1", "cpu": "1", "mem": "1G", "scalar_resources": {"nvidia.com/gpu": "1"}} + +### 2.2.6 command +member运行的command,string类型。 +- 如果Member中定义了command字段,Member使用自己的command。 +- 如果Member中没有定义command字段,Member使用所在Step的command。 + +### 2.2.7 annotations +member自定义的annotations,map类型。 + +### 2.2.8 labels +member自定义的labels,map类型。 + +### 2.2.9 env +member运行的自定义环境变量,map类型。 +- 如果Member中没有定义env字段,Member的env将包含所在Step的env。 +- 如果Member中定义了环境变量,所在Step的env中不包含Member中定义的env名称,member的env将会在此基础上追加所在Step的env。 +- 如果Member中定义了环境变量,且所在Step的env中包含Member中定义的env名称,则Member实际使用的该环境变量值为Member指定的env环境变量值,即Member指定的非系统环境变量优先级高于其所在Step的非系统环境变量。 + +### 2.2.10 fs +member运行的自定义fs,map类型。 +- 如果Member中定义了fs,Member使用自己的fs +- 如果Member中没有定义fs,Member使用所在Step的fs + +### 2.2.11 extra_fs +member运行的自定义extra_fs,列表类型。 +- 如果Member中定义了extra_fs,Member使用自己的extra_fs +- 如果Member中没有定义extra_fs,Member使用所在Step的extra_fs + +# 3 Pipeline运行过程 +当Paddleflow开始调度执行分布式任务的节点时,会检查distributed_job字段是否有值,如果有值则会开始执行如下流程: + +1. 解析Members中的配置字段,包括role、replicas、image、port、flavour、command等。 + +2. 替换Member的command字段中的模板。 如果是artifact模板,则会使用artifact的文件目录来替换相应的模板。 + +3. 根据Framework和各Member的配置信息创建分布式PaddleFlow Job。 + +4. 
运行多个分布式Pod。 \ No newline at end of file diff --git a/example/pipeline/distributed_job_example/run.py b/example/pipeline/distributed_job_example/run.py new file mode 100644 index 000000000..a7edb407d --- /dev/null +++ b/example/pipeline/distributed_job_example/run.py @@ -0,0 +1,77 @@ +""" +Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from paddleflow.pipeline import ContainerStep +from paddleflow.pipeline import Pipeline +from paddleflow.pipeline import PF_RUN_ID +from paddleflow.pipeline import MainFS +from paddleflow.pipeline import FSOptions +from paddleflow.job import Member +from paddleflow.pipeline import DistributedJob + + +def preprocess(): + """ data preprocess step + """ + step = ContainerStep( + name="preprocess", + docker_env="centos:centos7", + parameters={"data_path": f"./distributed_job_example/data/{PF_RUN_ID}"}, + env={"USER_ABC": "123_{{PF_USER_NAME}}"}, + command="bash distributed_job_example/shells/data.sh {{data_path}}" + ) + return step + + +def train(epoch, train_data): + """ distributed job + """ + dist_jobs = DistributedJob( + framework="paddle", + members=[Member(role="pworker", replicas=2, image="paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + command="sleep 30; echo worker {{epoch}} {{train_data}} {{model_path}}"), + Member(role="pserver", replicas=2, image="paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + command="sleep 30; echo ps {{epoch}} {{train_data}} {{model_path}}")] + ) + + """ train step + """ + step = ContainerStep( + name="train", + parameters={ + "epoch": epoch, + "model_path": f"./output/{PF_RUN_ID}", + "train_data": train_data + }, + env={"PS_NUM": "2", "WORKER_NUM": "2"}, + command="", + distributed_job=dist_jobs, + ) + return step + + +@Pipeline(name="distributed_pipeline", docker_env="nginx:1.7.9", parallelism=1) +def distributed_pipeline(epoch=15): + """ pipeline example for distributed job + """ + preprocess_step = preprocess() + train_step = train(epoch, preprocess_step.parameters["data_path"]) + + +if __name__ == "__main__": + ppl = distributed_pipeline() + + main_fs = MainFS(name="ppl") + ppl.fs_options = FSOptions(main_fs) + print(ppl.run()) diff --git a/example/pipeline/distributed_job_example/run.yaml b/example/pipeline/distributed_job_example/run.yaml new file mode 100644 index 000000000..b5f5bf034 --- /dev/null +++ b/example/pipeline/distributed_job_example/run.yaml @@ -0,0 +1,30 @@ +name: distributed_pipeline + +entry_points: + preprocess: + command: bash distributed_job_example/shells/data.sh {{data_path}} + docker_env: centos:centos7 + env: + USER_ABC: 123_{{PF_USER_NAME}} + parameters: + data_path: ./distributed_job_example/data/{{PF_RUN_ID}} + + train: + deps: preprocess + env: + PS_NUM: "2" + WORKER_NUM: "2" + parameters: + epoch: 15 + model_path: ./output/{{PF_RUN_ID}} + train_data: '{{preprocess.data_path}}' + distributed_job: + framework: "paddle" + members: + - {"role": "pserver", "image": "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", "command": "sleep 30; echo ps {{epoch}} 
{{train_data}} {{model_path}}", "replicas": 2, "flavour": { "name": "flavour1" }} + - {"role": "pworker", "image": "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", "command": "sleep 30; echo worker {{epoch}} {{train_data}} {{model_path}}", "replicas": 2, "flavour": { "name": "flavour1" }} + +parallelism: 1 + +fs_options: + main_fs: {name: "ppl"} \ No newline at end of file diff --git a/example/pipeline/distributed_job_example/shells/data.sh b/example/pipeline/distributed_job_example/shells/data.sh new file mode 100644 index 000000000..87c5a0f4d --- /dev/null +++ b/example/pipeline/distributed_job_example/shells/data.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +printenv + +echo "$@" + +echo "starting..." + +mkdir -p $1 +echo "hello paddleflow\n" >> $1/data +sleep 5 + +echo "ending..." \ No newline at end of file diff --git a/example/pipeline/queue_example/run.yaml b/example/pipeline/queue_example/run.yaml index 673fece9f..1b41f339b 100644 --- a/example/pipeline/queue_example/run.yaml +++ b/example/pipeline/queue_example/run.yaml @@ -6,7 +6,7 @@ entry_points: docker_env: centos:centos7 env: PF_JOB_FLAVOUR: flavour1 - PF_JOB_QUEUE_NAME: ppl-queue + PF_JOB_QUEUE_NAME: default-queue PF_JOB_TYPE: single USER_ABC: 123_{{PF_USER_NAME}} parameters: diff --git a/go-sdk/examples/pipeline/pipeline.go b/go-sdk/examples/pipeline/pipeline.go index 196ddc3e8..0cf542e4a 100644 --- a/go-sdk/examples/pipeline/pipeline.go +++ b/go-sdk/examples/pipeline/pipeline.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -57,10 +57,10 @@ func getToken(pfClient *service.PaddleFlowClient) string { func createRun(pfClient *service.PaddleFlowClient, token string, request *v1.CreateRunRequest) (createResult *v1.CreateRunResponse) { createResult, err := pfClient.APIV1().Run().Create(context.TODO(), request, token) - if err != nil { panic(err) } + fmt.Printf("create Run result %v\n", createResult) return } @@ -221,9 +221,9 @@ func UpdatePipelineByRaw(filepath string, pplID string) (string, string) { } func main() { - GetPipelineFromFile("") - CreateRunByRunYamlRaw("") - CreatePipelineByRaw("") - UpdatePipelineByRaw("", "ppl-000096") - CreateRunSpecifyFailureOptions("", schema.FailureStrategyFailFast) + //GetPipelineFromFile("") + CreateRunByRunYamlRaw("/Users/wanziyu/wanziyu/PaddleFlow/example/pipeline/distributed_job_example/run.yaml") + //CreatePipelineByRaw("") + //UpdatePipelineByRaw("", "ppl-000096") + //CreateRunSpecifyFailureOptions("", schema.FailureStrategyFailFast) } diff --git a/installer/database/paddleflow.sql b/installer/database/paddleflow.sql index f127ff9c2..a237bf78f 100644 --- a/installer/database/paddleflow.sql +++ b/installer/database/paddleflow.sql @@ -226,6 +226,7 @@ CREATE TABLE IF NOT EXISTS `run_job` ( `cache_run_id` varchar(60), `cache_job_id` varchar(60), `extra_fs_json` text, + `distributed_job_json` text, `created_at` datetime(3) DEFAULT NULL, `activated_at` datetime(3) DEFAULT NULL, `updated_at` datetime(3) DEFAULT NULL, diff --git a/pkg/apiserver/controller/job/create.go b/pkg/apiserver/controller/job/create.go index ec78e92a0..7e331f31e 100644 --- a/pkg/apiserver/controller/job/create.go +++ b/pkg/apiserver/controller/job/create.go @@ -70,6 
+70,7 @@ func CreatePFJob(ctx *logger.RequestContext, request *CreateJobInfo) (*CreateJob if request.ID == "" { request.ID = uuid.GenerateIDWithLength(schema.JobPrefix, uuid.JobIDLength) } + if err := common.CheckPermission(ctx.UserName, ctx.UserName, common.ResourceTypeJob, request.ID); err != nil { ctx.ErrorCode = common.ActionNotAllowed ctx.Logging().Errorln(err.Error()) @@ -705,12 +706,33 @@ func buildMembers(request *CreateJobInfo) []schema.Member { for _, reqMember := range request.Members { log.Debugf("reqMember %#v, role is %s", reqMember, reqMember.Role) member := newMember(reqMember, schema.MemberRole(reqMember.Role)) - buildCommonInfo(&member.Conf, &request.CommonJobInfo) + buildMemberCommonInfo(&member.Conf, &request.CommonJobInfo, reqMember) members = append(members, member) } return members } +func buildMemberCommonInfo(conf *schema.Conf, commonJobInfo *CommonJobInfo, memberRequest MemberSpec) { + log.Debugf("patch envs for job %s", commonJobInfo.Name) + // basic fields required + conf.Labels = commonJobInfo.Labels + conf.Annotations = commonJobInfo.Annotations + // insert user specific labels and annotations + for k, v := range memberRequest.Labels { + conf.SetLabels(k, v) + } + for k, v := range memberRequest.Annotations { + conf.SetAnnotations(k, v) + } + // info in SchedulingPolicy: queue,Priority,ClusterId,Namespace + schedulingPolicy := commonJobInfo.SchedulingPolicy + conf.SetQueueID(schedulingPolicy.QueueID) + conf.SetQueueName(schedulingPolicy.Queue) + conf.SetPriority(schedulingPolicy.Priority) + conf.SetClusterID(schedulingPolicy.ClusterId) + conf.SetNamespace(schedulingPolicy.Namespace) +} + func buildCommonInfo(conf *schema.Conf, commonJobInfo *CommonJobInfo) { log.Debugf("patch envs for job %s", commonJobInfo.Name) // basic fields required @@ -862,12 +884,7 @@ func CreatePPLJob(conf schema.PFJobConf) (string, error) { return jobResponse.ID, nil } -func ValidatePPLJob(conf schema.PFJobConf) error { - createJobInfo, err := jobConfToCreateJobInfo(conf) - if err != nil { - log.Errorf("convert job config to CreateJobInfo failed. err: %s", err) - return err - } +func ValidatePPLJob(createJobInfo *CreateJobInfo) error { // pipeline job check if len(createJobInfo.Name) == 0 { return errors.EmptyJobNameError() diff --git a/pkg/apiserver/controller/pipeline/run.go b/pkg/apiserver/controller/pipeline/run.go index 8f927f7ea..68f19ceae 100644 --- a/pkg/apiserver/controller/pipeline/run.go +++ b/pkg/apiserver/controller/pipeline/run.go @@ -45,16 +45,16 @@ import ( var wfMap = sync.Map{} const ( - JsonFsOptions = "fs_options" // 由于在获取BodyMap的FsOptions前已经转为下划线形式,因此这里为fs_options - JsonUserName = "username" - JsonDescription = "description" - JsonFlavour = "flavour" - JsonQueue = "queue" - JsonJobType = "jobType" - JsonEnv = "env" - - FinalRunStatus = "FINAL_RUN_STATUS" - FinalRunMsg = "FINAL_RUN_MSG" + JsonFsOptions = "fs_options" // 由于在获取BodyMap的FsOptions前已经转为下划线形式,因此这里为fs_options + JsonUserName = "username" + JsonDescription = "description" + JsonFlavour = "flavour" + JsonQueue = "queue" + JsonJobType = "jobType" + JsonEnv = "env" + JsonDistributedJob = "distributed_job" + FinalRunStatus = "FINAL_RUN_STATUS" + FinalRunMsg = "FINAL_RUN_MSG" ) type CreateRunRequest struct { @@ -237,6 +237,7 @@ func buildWorkflowSource(ctx *logger.RequestContext, req CreateRunRequest, fsID logger.Logger().Errorf("runYamlAndReqToWfs failed. 
err:%v", err) return schema.WorkflowSource{}, "", "", err } + return wfs, source, runYaml, nil } @@ -649,6 +650,7 @@ func CreateRunByJson(ctx *logger.RequestContext, bodyMap map[string]interface{}) } reqFsName = fsOptions.MainFS.Name } + if _, ok := bodyMap[JsonUserName].(string); ok { reqUserName = bodyMap[JsonUserName].(string) } @@ -787,7 +789,6 @@ func ValidateAndStartRun(ctx *logger.RequestContext, run *models.Run, userName s // 在ValidateAndCreateRun已经校验过requestId非空 requestId := ctx.RequestID - // update trace logger key _ = trace_logger.UpdateKey(requestId, runID) trace_logger.Key(runID).Infof("create run in db success") diff --git a/pkg/apiserver/controller/pipeline/run_test.go b/pkg/apiserver/controller/pipeline/run_test.go index eb9453bce..a74457a14 100644 --- a/pkg/apiserver/controller/pipeline/run_test.go +++ b/pkg/apiserver/controller/pipeline/run_test.go @@ -237,6 +237,13 @@ func TestCallback(t *testing.T) { common.WfEventKeyRunID: run1.ID, common.WfEventKeyStatus: common.StatusRunRunning, common.WfEventKeyStartTime: "2022-07-07 13:15:04", + common.WfEventKeyView: schema.JobView{ + Name: "test-runjob", + DistributedJob: schema.DistributedJob{ + Framework: "paddle", + Members: nil, + }, + }, }, } f := UpdateRuntimeFunc @@ -245,7 +252,6 @@ func TestCallback(t *testing.T) { assert.Nil(t, err) assert.NotEmpty(t, updatedRun.UpdateTime) assert.Equal(t, common.StatusRunRunning, updatedRun.Status) - // test not update activated_at run3 := getMockRun1_3() run3.ID, err = models.CreateRun(ctx.Logging(), &run3) @@ -319,7 +325,6 @@ func TestCreateRunByJson(t *testing.T) { wfs, err := getWorkFlowSourceByJson(bodyMap) assert.Nil(t, err) fmt.Println(wfs.EntryPoints.EntryPoints["main"].(*schema.WorkflowSourceStep).Cache) - run := models.Run{ Name: "full_run", Source: "run.yaml", @@ -343,6 +348,7 @@ func TestCreateRunByJson(t *testing.T) { defer patch.Reset() ctx := &logger.RequestContext{UserName: MockRootUser} + CreateRunByJson(ctx, bodyMap) assert.Equal(t, common.InvalidPipeline, ctx.ErrorCode) diff --git a/pkg/apiserver/controller/pipeline/testcase/run.yaml b/pkg/apiserver/controller/pipeline/testcase/run.yaml index c47c1fcd7..a75624ede 100644 --- a/pkg/apiserver/controller/pipeline/testcase/run.yaml +++ b/pkg/apiserver/controller/pipeline/testcase/run.yaml @@ -24,6 +24,14 @@ entry_points: PF_JOB_MODE: Pod PF_JOB_FLAVOUR: flavour1 PF_JOB_PRIORITY: HIGH + distStep: + parameters: + epoch: 15 + distributed_job: + framework: "paddle" + members: + - { "role": "pserver", "image": "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", "command": "echo ps {{epoch}}", "replicas": 2 } + - { "role": "pworker", "image": "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", "command": "echo worker {{epoch}}", "replicas": 2} post_process: post: parameters: diff --git a/pkg/apiserver/controller/pipeline/testcase/run_dag.json b/pkg/apiserver/controller/pipeline/testcase/run_dag.json index ded30034f..a32647bd4 100644 --- a/pkg/apiserver/controller/pipeline/testcase/run_dag.json +++ b/pkg/apiserver/controller/pipeline/testcase/run_dag.json @@ -50,6 +50,33 @@ "read_only": false } ] + }, + "train": { + "deps": "main", + "env": { + "PS_NUM": "2", + "WORKER_NUM": "2" + }, + "parameters": { + "epoch": 15 + }, + "distributedJob": { + "framework": "paddle", + "members": [ + { + "role": "pserver", + "image": "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + "command": "sleep 30; echo {{epoch}}", + "replicas": 2 + }, + { + "role": "pworker", + "image": "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + "command": "sleep 30; 
echo {{epoch}}", + "replicas": 2 + } + ] + } } }, "failureOptions": { diff --git a/pkg/apiserver/controller/pipeline/testcase/run_dag.yaml b/pkg/apiserver/controller/pipeline/testcase/run_dag.yaml index 9e0e8f626..d9655e9f9 100644 --- a/pkg/apiserver/controller/pipeline/testcase/run_dag.yaml +++ b/pkg/apiserver/controller/pipeline/testcase/run_dag.yaml @@ -107,7 +107,17 @@ entry_points: disStep: command: "echo dis" - + parameters: + epoch: 5 + PF_JOB_QUEUE_NAME: train-queue + PF_JOB_PRIORITY: high + distributed_job: + framework: "paddle" + members: + - { "role": "pserver", "port": 8080, "command": "sleep 30; echo {{epoch}}", "image": paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7, "replicas": 2, + "flavour": { "name": "flavour1", "cpu": "1", "mem": "1G", "scalarResources": {"nvidia.com/gpu": "1"}}, "env": {"ROLE": "ps", "PS_NUM": 2}, "annotations": {"paddle-dist-type": "PS"}, "fs": {"name": "ppl"}, "extra_fs": [{"name": "ppl", "hostPath": "/path/to/run", "mountPath": "/path/to/mount", "subPath": "/ps", "readOnly": true}]} + - { "role": "pworker", "port": 8080, "command": "sleep 30; echo {{epoch}}", "image": paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7, "replicas": 2, + "flavour": { "name": "flavour1", "cpu": "1", "mem": "1G", "scalarResources": {"nvidia.com/gpu": "1"}}, "env": {"ROLE": "ps", "WORKER_NUM": 2}, "labels": {"paddle-dist-type": "PS"}, "fs": {"name": "ppl"}, "extra_fs": [{"name": "ppl", "hostPath": "/path/to/run", "mountPath": "/path/to/mount", "subPath": "/worker", "readOnly": true}]} components: process-negetive: diff --git a/pkg/apiserver/models/run_job.go b/pkg/apiserver/models/run_job.go index cea267894..4c53387a0 100644 --- a/pkg/apiserver/models/run_job.go +++ b/pkg/apiserver/models/run_job.go @@ -30,36 +30,38 @@ import ( ) type RunJob struct { - Pk int64 `gorm:"primaryKey;autoIncrement;not null" json:"-"` - ID string `gorm:"type:varchar(60);not null" json:"jobID"` - RunID string `gorm:"type:varchar(60);not null" json:"runID"` - ParentDagID string `gorm:"type:varchar(60);not null" json:"parentDagID"` - Name string `gorm:"type:varchar(60);not null" json:"name"` - StepName string `gorm:"type:varchar(60);not null" json:"step_name"` - Command string `gorm:"type:text;size:65535;not null" json:"command"` - Parameters map[string]string `gorm:"-" json:"parameters"` - ParametersJson string `gorm:"type:text;size:65535;not null" json:"-"` - Artifacts schema.Artifacts `gorm:"-" json:"artifacts"` - ArtifactsJson string `gorm:"type:text;size:65535;not null" json:"-"` - Env map[string]string `gorm:"-" json:"env"` - EnvJson string `gorm:"type:text;size:65535;not null" json:"-"` - DockerEnv string `gorm:"type:varchar(128);not null" json:"docker_env"` - LoopSeq int `gorm:"type:int;not null" json:"-"` - Status schema.JobStatus `gorm:"type:varchar(32);not null" json:"status"` - Message string `gorm:"type:text;size:65535;not null" json:"message"` - Cache schema.Cache `gorm:"-" json:"cache"` - CacheJson string `gorm:"type:text;size:65535;not null" json:"-"` - CacheRunID string `gorm:"type:varchar(60);not null" json:"cacheRunID"` - CacheJobID string `gorm:"type:varchar(60);not null" json:"cacheJobID"` - ExtraFS []schema.FsMount `gorm:"-" json:"extraFs"` - ExtraFSJson string `gorm:"type:text;size:65535;not null" json:"-"` - CreateTime string `gorm:"-" json:"createTime"` - ActivateTime string `gorm:"-" json:"activateTime"` - UpdateTime string `gorm:"-" json:"updateTime,omitempty"` - CreatedAt time.Time ` json:"-"` - ActivatedAt sql.NullTime ` json:"-"` - UpdatedAt time.Time ` json:"-"` - 
DeletedAt gorm.DeletedAt `gorm:"index" json:"-"` + Pk int64 `gorm:"primaryKey;autoIncrement;not null" json:"-"` + ID string `gorm:"type:varchar(60);not null" json:"jobID"` + RunID string `gorm:"type:varchar(60);not null" json:"runID"` + ParentDagID string `gorm:"type:varchar(60);not null" json:"parentDagID"` + Name string `gorm:"type:varchar(60);not null" json:"name"` + StepName string `gorm:"type:varchar(60);not null" json:"step_name"` + Command string `gorm:"type:text;size:65535;not null" json:"command"` + Parameters map[string]string `gorm:"-" json:"parameters"` + ParametersJson string `gorm:"type:text;size:65535;not null" json:"-"` + Artifacts schema.Artifacts `gorm:"-" json:"artifacts"` + ArtifactsJson string `gorm:"type:text;size:65535;not null" json:"-"` + Env map[string]string `gorm:"-" json:"env"` + EnvJson string `gorm:"type:text;size:65535;not null" json:"-"` + DockerEnv string `gorm:"type:varchar(128);not null" json:"docker_env"` + LoopSeq int `gorm:"type:int;not null" json:"-"` + Status schema.JobStatus `gorm:"type:varchar(32);not null" json:"status"` + Message string `gorm:"type:text;size:65535;not null" json:"message"` + Cache schema.Cache `gorm:"-" json:"cache"` + CacheJson string `gorm:"type:text;size:65535;not null" json:"-"` + CacheRunID string `gorm:"type:varchar(60);not null" json:"cacheRunID"` + CacheJobID string `gorm:"type:varchar(60);not null" json:"cacheJobID"` + ExtraFS []schema.FsMount `gorm:"-" json:"extraFs"` + ExtraFSJson string `gorm:"type:text;size:65535;not null" json:"-"` + DistributedJob schema.DistributedJob `gorm:"-" json:"distributedJob"` + DistributedJobJson string `gorm:"type:text;size:65535;not null" json:"-"` + CreateTime string `gorm:"-" json:"createTime"` + ActivateTime string `gorm:"-" json:"activateTime"` + UpdateTime string `gorm:"-" json:"updateTime,omitempty"` + CreatedAt time.Time ` json:"-"` + ActivatedAt sql.NullTime ` json:"-"` + UpdatedAt time.Time ` json:"-"` + DeletedAt gorm.DeletedAt `gorm:"index" json:"-"` } func CreateRunJob(logEntry *log.Entry, runJob *RunJob) (int64, error) { @@ -134,6 +136,13 @@ func (rj *RunJob) Encode() error { } rj.CacheJson = string(cacheJson) + distJson, err := json.Marshal(rj.DistributedJob) + if err != nil { + logger.Logger().Errorf("encode run job distributed_job failed. error:%v", err) + return err + } + rj.DistributedJobJson = string(distJson) + parametersJson, err := json.Marshal(rj.Parameters) if err != nil { logger.Logger().Errorf("encode run job parameters failed. error:%v", err) @@ -210,6 +219,14 @@ func (rj *RunJob) decode() error { rj.ExtraFS = fsMount } + if len(rj.DistributedJobJson) > 0 { + distJob := schema.DistributedJob{} + if err := json.Unmarshal([]byte(rj.DistributedJobJson), &distJob); err != nil { + logger.Logger().Errorf("decode run job distributedJob failed. error: %v", err) + } + rj.DistributedJob = distJob + } + // format time rj.CreateTime = rj.CreatedAt.Format("2006-01-02 15:04:05") rj.UpdateTime = rj.UpdatedAt.Format("2006-01-02 15:04:05") @@ -249,26 +266,27 @@ func (rj *RunJob) Trans2JobView() schema.JobView { newFsMount := append(rj.ExtraFS, []schema.FsMount{}...) 
return schema.JobView{ - PK: rj.Pk, - JobID: rj.ID, - Name: rj.Name, - Type: "step", - StepName: rj.StepName, - ParentDagID: rj.ParentDagID, - LoopSeq: rj.LoopSeq, - Command: rj.Command, - Parameters: newParameters, - Env: newEnv, - StartTime: rj.ActivateTime, - EndTime: newEndTime, - Status: rj.Status, - DockerEnv: rj.DockerEnv, - Artifacts: *rj.Artifacts.DeepCopy(), - Cache: rj.Cache, - JobMessage: rj.Message, - CacheRunID: rj.CacheRunID, - CacheJobID: rj.CacheJobID, - ExtraFS: newFsMount, + PK: rj.Pk, + JobID: rj.ID, + Name: rj.Name, + Type: "step", + StepName: rj.StepName, + ParentDagID: rj.ParentDagID, + LoopSeq: rj.LoopSeq, + Command: rj.Command, + Parameters: newParameters, + DistributedJob: rj.DistributedJob, + Env: newEnv, + StartTime: rj.ActivateTime, + EndTime: newEndTime, + Status: rj.Status, + DockerEnv: rj.DockerEnv, + Artifacts: *rj.Artifacts.DeepCopy(), + Cache: rj.Cache, + JobMessage: rj.Message, + CacheRunID: rj.CacheRunID, + CacheJobID: rj.CacheJobID, + ExtraFS: newFsMount, } } @@ -290,23 +308,23 @@ func ParseRunJob(jobView *schema.JobView) RunJob { } newFsMount := append(jobView.ExtraFS, []schema.FsMount{}...) - return RunJob{ - ID: jobView.JobID, - Name: jobView.Name, - ParentDagID: jobView.ParentDagID, - Command: jobView.Command, - Parameters: newParameters, - Artifacts: *jobView.Artifacts.DeepCopy(), - Env: newEnv, - DockerEnv: jobView.DockerEnv, - LoopSeq: jobView.LoopSeq, - Status: jobView.Status, - Message: jobView.JobMessage, - Cache: jobView.Cache, - CacheRunID: jobView.CacheRunID, - CacheJobID: jobView.CacheJobID, - ActivateTime: jobView.StartTime, - ExtraFS: newFsMount, + return RunJob{ + ID: jobView.JobID, + Name: jobView.Name, + ParentDagID: jobView.ParentDagID, + Command: jobView.Command, + Parameters: newParameters, + Artifacts: *jobView.Artifacts.DeepCopy(), + Env: newEnv, + DockerEnv: jobView.DockerEnv, + LoopSeq: jobView.LoopSeq, + Status: jobView.Status, + Message: jobView.JobMessage, + Cache: jobView.Cache, + CacheRunID: jobView.CacheRunID, + CacheJobID: jobView.CacheJobID, + DistributedJob: jobView.DistributedJob, + ActivateTime: jobView.StartTime, + ExtraFS: newFsMount, } } diff --git a/pkg/apiserver/models/run_job_test.go b/pkg/apiserver/models/run_job_test.go index 0f19717a8..c1cfa58d6 100644 --- a/pkg/apiserver/models/run_job_test.go +++ b/pkg/apiserver/models/run_job_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package models import ( + "encoding/json" "testing" "github.com/stretchr/testify/assert" @@ -67,3 +68,121 @@ func TestTransJob(t *testing.T) { jobView = rj.Trans2JobView() assert.Equal(t, jobView.EndTime, "") } + +func TestEncodeJob(t *testing.T) { + rj := RunJob{ + Status: schema.StatusJobRunning, + UpdateTime: "2022-01-01 00:01:11", + } + rj.DistributedJobJson = "" + rj.DistributedJob = schema.DistributedJob{ + Framework: "paddle", + Members: []schema.Member{ + { + Replicas: 2, + Role: "pworker", + Conf: schema.Conf{ + QueueName: "train-queue", + Command: "echo worker", + Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + Env: map[string]string{"Worker": "1"}, + Flavour: schema.Flavour{Name: "flavour1"}, + Priority: "high", + }, + }, + { + Replicas: 2, + Role: "pserver", + Conf: schema.Conf{ + QueueName: "train-queue", + Command: "echo server", + Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + Env: map[string]string{"PS": "1"}, + Flavour: schema.Flavour{Name: "flavour1"}, + Priority: "high", + }, + }, + }, + } + rj.Encode() + + distributedJob := schema.DistributedJob{} + err := json.Unmarshal([]byte(rj.DistributedJobJson), &distributedJob) + assert.Nil(t, err) + assert.Equal(t, string(distributedJob.Framework), "paddle") + assert.Equal(t, len(distributedJob.Members), 2) +} + +func TestDecode(t *testing.T) { + rj := RunJob{ + Status: schema.StatusJobRunning, + UpdateTime: "2022-01-01 00:01:11", + } + + distJob := schema.DistributedJob{ + Framework: "paddle", + Members: []schema.Member{ + { + Replicas: 2, + Role: "pworker", + Conf: schema.Conf{ + QueueName: "train-queue", + Command: "echo worker", + Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + Env: map[string]string{"Worker": "1"}, + Flavour: schema.Flavour{Name: "flavour1"}, + Priority: "high", + }, + }, + { + Replicas: 2, + Role: "pserver", + Conf: schema.Conf{ + QueueName: "train-queue", + Command: "echo server", + Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + Env: map[string]string{"PS": "1"}, + Flavour: schema.Flavour{Name: "flavour1"}, + Priority: "high", + }, + }, + }, + } + distJson, err := json.Marshal(distJob) + assert.Nil(t, err) + rj.DistributedJobJson = string(distJson) + rj.decode() + + distributedJob := schema.DistributedJob{ + Framework: "paddle", + Members: []schema.Member{ + { + Replicas: 2, + Role: "pworker", + Conf: schema.Conf{ + QueueName: "train-queue", + Command: "echo worker", + Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + Env: map[string]string{"Worker": "1"}, + Flavour: schema.Flavour{Name: "flavour1"}, + Priority: "high", + }, + }, + { + Replicas: 2, + Role: "pserver", + Conf: schema.Conf{ + QueueName: "train-queue", + Command: "echo server", + Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + Env: map[string]string{"PS": "1"}, + Flavour: schema.Flavour{Name: "flavour1"}, + Priority: "high", + }, + }, + }, + } + + assert.Equal(t, rj.DistributedJob.Framework, distributedJob.Framework) + assert.Equal(t, rj.DistributedJob.Members, distributedJob.Members) +} diff --git a/pkg/apiserver/models/schedule.go b/pkg/apiserver/models/schedule.go index 7312dedf7..2a6d534a8 100644 --- a/pkg/apiserver/models/schedule.go +++ b/pkg/apiserver/models/schedule.go @@ -139,7 +139,7 @@ func NewScheduleOptions(logEntry *log.Entry, catchup bool, expireInterval int, c return so, fmt.Errorf(errMsg) } - // 校验 currencyPolicy + // 校验 currencyPolicy, 默认值为suspend if concurrencyPolicy == "" { concurrencyPolicy = ConcurrencyPolicySuspend } diff --git a/pkg/common/schema/flavor.go 
b/pkg/common/schema/flavor.go index cf9858ba4..ca4be7bd4 100644 --- a/pkg/common/schema/flavor.go +++ b/pkg/common/schema/flavor.go @@ -58,6 +58,51 @@ func (r ResourceInfo) ToMap() map[string]string { return res } +func ParseFlavour(flavourMap map[string]interface{}, flavour *Flavour) error { + for flavourKey, flavourValue := range flavourMap { + switch flavourKey { + case "name": + refValue, ok := flavourValue.(string) + if !ok { + return fmt.Errorf("[name] defined in flavour should be string type") + } + flavour.Name = refValue + case "cpu": + refValue, ok := flavourValue.(string) + if !ok { + return fmt.Errorf("[cpu] defined in flavour should be string type") + } + flavour.CPU = refValue + case "mem": + refValue, ok := flavourValue.(string) + if !ok { + return fmt.Errorf("[memory] defined in flavour should be string type") + } + flavour.Mem = refValue + case "scalarResources": + refValue, ok := flavourValue.(map[string]interface{}) + if !ok { + return fmt.Errorf("[scalarResources] defined in flavour should be map type") + } + + res := make(map[ResourceName]string) + for scalarKey, scalarValue := range refValue { + value, ok := scalarValue.(string) + if !ok { + return fmt.Errorf("scalarResources [%v] defined in flavour should be string type", scalarKey) + } + res[ResourceName(scalarKey)] = value + } + if err := ValidateScalarResourceInfo(res, []string{}); err != nil { + return fmt.Errorf("validate scalar resource failed, error: %s", err.Error()) + } + flavour.ScalarResources = res + } + + } + return nil +} + func isValidScalarResource(r string, scalarResourcesType []string) bool { // TODO: get scalarResourcesType from cluster if len(scalarResourcesType) == 0 { diff --git a/pkg/common/schema/flavour_test.go b/pkg/common/schema/flavour_test.go index 4077376a4..f84920f95 100644 --- a/pkg/common/schema/flavour_test.go +++ b/pkg/common/schema/flavour_test.go @@ -1,6 +1,7 @@ package schema import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -44,3 +45,82 @@ func TestCheckResource(t *testing.T) { } } } + +func TestParseFlavour(t *testing.T) { + testCases := []struct { + name string + flavourMap map[string]interface{} + wantRes Flavour + wantErr error + }{ + { + name: "flavour with no scalarResources", + wantRes: Flavour{ + Name: "flavour1", + ResourceInfo: ResourceInfo{ + CPU: "1", + Mem: "1Gi", + }, + }, + flavourMap: map[string]interface{}{"name": "flavour1", "cpu": "1", "mem": "1Gi"}, + }, + { + name: "flavour with scalarResources", + wantRes: Flavour{ + Name: "flavour2", + ResourceInfo: ResourceInfo{ + CPU: "4", + Mem: "8Gi", + ScalarResources: ScalarResourcesType{ + "nvidia.com/gpu": "1", + }, + }, + }, + flavourMap: map[string]interface{}{"name": "flavour2", "cpu": "4", "mem": "8Gi", "scalarResources": map[string]interface{}{"nvidia.com/gpu": "1"}}, + }, + { + name: "flavour with unexpected name", + wantRes: Flavour{}, + wantErr: fmt.Errorf("[name] defined in flavour should be string type"), + flavourMap: map[string]interface{}{"name": 1, "cpu": "4", "mem": "8Gi"}, + }, + { + name: "flavour with unexpected cpu", + wantRes: Flavour{}, + wantErr: fmt.Errorf("[cpu] defined in flavour should be string type"), + flavourMap: map[string]interface{}{"name": "flavour3", "cpu": 4, "mem": "8Gi"}, + }, + { + name: "flavour with unexpected mem", + wantRes: Flavour{}, + wantErr: fmt.Errorf("[memory] defined in flavour should be string type"), + flavourMap: map[string]interface{}{"name": "flavour4", "cpu": "4", "mem": 8}, + }, + { + name: "flavour with unexpected scalarResources", + wantRes:
Flavour{}, + wantErr: fmt.Errorf("scalarResources [nvidia.com/gpu] defined in flavour should be string type"), + flavourMap: map[string]interface{}{"name": "flavour5", "cpu": "4", "mem": "8Gi", "scalarResources": map[string]interface{}{"nvidia.com/gpu": 2}}, + }, + { + name: "flavour with invalid scalarResources", + wantRes: Flavour{}, + wantErr: fmt.Errorf("validate scalar resource failed, error: memory resource cannot be negative"), + flavourMap: map[string]interface{}{"name": "flavour6", "cpu": "4", "mem": "8Gi", "scalarResources": map[string]interface{}{"memory": "-1"}}, + }, + } + + for _, tc := range testCases { + flavour := Flavour{} + t.Run(tc.name, func(t *testing.T) { + err := ParseFlavour(tc.flavourMap, &flavour) + if err != nil { + assert.Equal(t, tc.wantErr, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.wantRes, flavour) + } + }) + } + +} diff --git a/pkg/common/schema/fs.go b/pkg/common/schema/fs.go index bf9631e94..d132f9392 100644 --- a/pkg/common/schema/fs.go +++ b/pkg/common/schema/fs.go @@ -17,6 +17,7 @@ limitations under the License. package schema import ( + "fmt" "path" "strings" ) @@ -78,3 +79,57 @@ func ConcatenatePVName(namespace, fsID string) string { func ConcatenatePVCName(fsID string) string { return strings.Replace(PVCNameTemplate, FSIDFormat, fsID, -1) } + +func ParseFs(fsMap map[string]interface{}, fs *FileSystem) error { + for fsKey, fsValue := range fsMap { + switch fsKey { + case "name": + refValue, ok := fsValue.(string) + if !ok { + return fmt.Errorf("[name] defined in fs should be string type") + } + fs.Name = refValue + case "hostPath": + refValue, ok := fsValue.(string) + if !ok { + return fmt.Errorf("[hostPath] defined in fs should be string type") + } + fs.HostPath = refValue + case "mountPath": + refValue, ok := fsValue.(string) + if !ok { + return fmt.Errorf("[mountPath] defined in fs should be string type") + } + fs.MountPath = refValue + case "subPath": + refValue, ok := fsValue.(string) + if !ok { + return fmt.Errorf("[subPath] defined in fs should be string type") + } + fs.SubPath = refValue + case "readOnly": + refValue, ok := fsValue.(bool) + if !ok { + return fmt.Errorf("[readOnly] defined in fs should be bool type") + } + fs.ReadOnly = refValue + } + } + return nil +} + +func ParseExtraFs(values []interface{}, index int) ([]FileSystem, error) { + extraFsList := make([]FileSystem, 0) + for i, value := range values { + extraMap, ok := value.(map[string]interface{}) + extra := FileSystem{} + if !ok { + return []FileSystem{}, fmt.Errorf("[extra_fs %v] defined in member %v should be map type", i, index) + } + if err := ParseFs(extraMap, &extra); err != nil { + return []FileSystem{}, fmt.Errorf("parse [extra_fs %v] in member %v failed, error: %s", i, index, err.Error()) + } + extraFsList = append(extraFsList, extra) + } + return extraFsList, nil +} diff --git a/pkg/common/schema/job.go b/pkg/common/schema/job.go index 6476f5a2d..64ea824fb 100644 --- a/pkg/common/schema/job.go +++ b/pkg/common/schema/job.go @@ -30,6 +30,7 @@ type MemberRole string const ( EnvJobType = "PF_JOB_TYPE" EnvJobQueueName = "PF_JOB_QUEUE_NAME" + EnvJobPriority = "PF_JOB_PRIORITY" EnvJobNamespace = "PF_JOB_NAMESPACE" EnvJobUserName = "PF_USER_NAME" EnvJobMode = "PF_JOB_MODE" @@ -208,6 +209,7 @@ type PFJobConf interface { GetEnv() map[string]string GetEnvValue(key string) string GetEnvSubset(prefix string) map[string]string + GetCommand() string GetImage() string @@ -477,8 +479,13 @@ func (c *Conf) GetProcessedFileSystem() []FileSystem { } type Member
struct { - ID string `json:"id"` - Replicas int `json:"replicas"` - Role MemberRole `json:"role"` - Conf `json:",inline"` + ID string `yaml:"-" json:"id"` + Replicas int `yaml:"replicas" json:"replicas"` + Role MemberRole `yaml:"role" json:"role"` + Conf `yaml:",inline" json:",inline"` +} + +type DistributedJob struct { + Members []Member `yaml:"members" json:"members,omitempty"` + Framework Framework `yaml:"framework" json:"framework,omitempty"` } diff --git a/pkg/common/schema/parser.go b/pkg/common/schema/parser.go index dd1796386..f2d3ec2aa 100644 --- a/pkg/common/schema/parser.go +++ b/pkg/common/schema/parser.go @@ -20,6 +20,8 @@ import ( "fmt" "strconv" "strings" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) type Parser struct { @@ -79,7 +81,6 @@ func (p *Parser) ParseWorkflowSource(bodyMap map[string]interface{}, wfs *Workfl } wfs.Cache = cache case "parallelism": - value1, ok1 := value.(int64) value2, ok2 := value.(float64) // 这里是为了兼容一个由json.Unmarshal得到的parallelism值 if ok1 { @@ -89,7 +90,6 @@ func (p *Parser) ParseWorkflowSource(bodyMap map[string]interface{}, wfs *Workfl } else { return fmt.Errorf("[parallelism] of workflow should be int type") } - case "disabled": value, ok := value.(string) if !ok { @@ -142,6 +142,7 @@ func (p *Parser) ParseWorkflowSource(bodyMap map[string]interface{}, wfs *Workfl return err } wfs.FsOptions = fsOptions + default: return fmt.Errorf("workflow has no attribute [%s]", key) } @@ -176,6 +177,7 @@ func (p *Parser) ParseComponents(entryPoints map[string]interface{}) (map[string } func (p *Parser) ParseStep(params map[string]interface{}, step *WorkflowSourceStep) error { + for key, value := range params { if value == nil { continue @@ -258,6 +260,41 @@ func (p *Parser) ParseStep(params map[string]interface{}, step *WorkflowSourceSt } } step.Artifacts = artifacts + case "distributed_job": + value, ok := value.(map[string]interface{}) + if !ok { + return fmt.Errorf("[distributed_job] in step should be map type") + } + + distJobs := DistributedJob{} + // parse framework + if value["framework"] != nil { + framework, ok := value["framework"].(string) + if !ok { + return fmt.Errorf("extract framework from [distributed_job] failed") + } + distJobs.Framework = Framework(framework) + } + // parse members + if value["members"] != nil { + members, ok, err := unstructured.NestedSlice(value, "members") + if !ok { + return fmt.Errorf("extract members from [distributed_job] failed because [%v]", err) + } + distJobs.Members = make([]Member, 0) + for index, member := range members { + mem := Member{} + memberMap, ok := member.(map[string]interface{}) + if !ok { + return fmt.Errorf("the member %v defined in [distributed_job] should be map type", index) + } + if err := p.ParseMember(memberMap, &mem, index); err != nil { + return fmt.Errorf("parse [member %v] in [distributed_job] failed, error: %s", index, err.Error()) + } + distJobs.Members = append(distJobs.Members, mem) + } + } + step.DistributedJob = distJobs case "env": value, ok := value.(map[string]interface{}) if !ok { @@ -425,6 +462,7 @@ func (p *Parser) ParseDag(params map[string]interface{}, dagComp *WorkflowSource if err != nil { return err } + dagComp.EntryPoints = entryPoints case "type": value, ok := value.(string) @@ -591,6 +629,139 @@ func (p *Parser) ParseFsMount(fsMap map[string]interface{}, fs *FsMount) error { return nil } +func (p *Parser) ParseMember(memberMap map[string]interface{}, member *Member, index int) error { + for memberKey, memberValue := range memberMap { + switch memberKey { 
+ case "role": + refValue, ok := memberValue.(string) + if !ok { + return fmt.Errorf("[role] defined in member %v should be string type", index) + } + member.Role = MemberRole(refValue) + case "command": + refValue, ok := memberValue.(string) + if !ok { + return fmt.Errorf("[command] defined in member %v should be string type", index) + } + member.Command = refValue + case "replicas": + value1, ok1 := memberValue.(int64) + value2, ok2 := memberValue.(float64) // 这里是为了兼容一个由json.Unmarshal得到的replicas值 + if ok1 { + member.Replicas = int(value1) + } else if ok2 { + member.Replicas = int(value2) + } else { + return fmt.Errorf("[replicas] defined in member %v should be int type", index) + } + case "image": + refValue, ok := memberValue.(string) + if !ok { + return fmt.Errorf("[image] defined in member %v should be string type", index) + } + member.Image = refValue + case "port": + value1, ok1 := memberValue.(int64) + value2, ok2 := memberValue.(float64) // 这里是为了兼容一个由json.Unmarshal得到的port值 + if ok1 { + member.Port = int(value1) + } else if ok2 { + member.Port = int(value2) + } else { + return fmt.Errorf("[port] defined in member %v should be int type", index) + } + case "flavour": + refValue, ok := memberValue.(map[string]interface{}) + flavour := Flavour{} + if !ok { + return fmt.Errorf("[flavour] defined in member %v should be map type", index) + } + + if err := ParseFlavour(refValue, &flavour); err != nil { + return fmt.Errorf("parse [flavour] in member %v failed, error: %s", index, err.Error()) + } + member.Flavour = flavour + case "fs": + refValue, ok := memberValue.(map[string]interface{}) + fs := FileSystem{} + if !ok { + return fmt.Errorf("[fs] defined in member %v should be map type", index) + } + if err := ParseFs(refValue, &fs); err != nil { + return fmt.Errorf("parse [fs] in member %v failed, error: %s", index, err.Error()) + } + member.FileSystem = fs + case "extra_fs": + refValue, ok := memberValue.([]interface{}) + if !ok { + return fmt.Errorf("[extra_fs] defined in member %v should be list type", index) + } + extra, err := ParseExtraFs(refValue, index) + if err != nil { + return fmt.Errorf("parse [extra_fs] in member %v failed, error: %s", index, err.Error()) + } + member.ExtraFileSystem = extra + case "annotations": + if memberValue == nil { + continue + } + refValue, ok := memberValue.(map[string]interface{}) + if !ok { + return fmt.Errorf("[annotations] defined in member %v should be map type", index) + } + for annoKey, annoValue := range refValue { + value, ok := annoValue.(string) + if !ok { + return fmt.Errorf("values in [annotations] should be string type") + } + member.SetAnnotations(annoKey, value) + } + case "labels": + if memberValue == nil { + continue + } + refValue, ok := memberValue.(map[string]interface{}) + if !ok { + return fmt.Errorf("[labels] defined in member %v should be map type", index) + } + for labelKey, labelValue := range refValue { + value, ok := labelValue.(string) + if !ok { + return fmt.Errorf("values in [labels] should be string type") + } + member.SetLabels(labelKey, value) + } + case "env": + refValue, ok := memberValue.(map[string]interface{}) + if !ok { + return fmt.Errorf("[env] defined in member %v should be map type", index) + } + if member.Env == nil { + member.Env = map[string]string{} + } + // 设置在env里的变量优先级最高,通过其他字段设置的env变量,在这里会被覆盖值 + for envKey, envValue := range refValue { + resEnv := "" + switch envValue := envValue.(type) { + case string: + resEnv = envValue + case int64: + resEnv = strconv.FormatInt(envValue, 10) + case float64: + 
resEnv = strings.TrimRight(strconv.FormatFloat(envValue, 'f', 8, 64), "0") + default: + return fmt.Errorf("values in [env] should be string type") + } + member.SetEnv(envKey, resEnv) + } + case "id": + // 该字段不暴露给用户 + continue + } + } + return nil +} + func (p *Parser) IsDag(comp map[string]interface{}) bool { if _, ok := comp["entry_points"]; ok { return true @@ -643,6 +814,12 @@ func (p *Parser) TransJsonMap2Yaml(jsonMap map[string]interface{}) error { } jsonMap["fs_options"] = value delete(jsonMap, "fsOptions") + case "distributedJob": + if err := p.transJsonDistributedJobs2Yaml(value); err != nil { + return err + } + jsonMap["distributed_job"] = value + delete(jsonMap, "distributedJob") } } return nil @@ -750,3 +927,66 @@ func (p *Parser) transJsonFsOptions2Yaml(value interface{}) error { } return nil } + +func (p *Parser) transJsonDistributedJobs2Yaml(value interface{}) error { + if value == nil { + return nil + } + distJobMap, ok := value.(map[string]interface{}) + if !ok { + return fmt.Errorf("[distributedJob] should be map type") + } + for distKey, distValue := range distJobMap { + switch distKey { + case "framework": + distJobMap["framework"] = distValue + case "members": + if err := p.transJsonMembers2Yaml(distValue); err != nil { + return err + } + distJobMap["members"] = distValue + } + } + return nil +} + +func (p *Parser) transJsonMembers2Yaml(value interface{}) error { + if value == nil { + return nil + } + memberList, ok := value.([]interface{}) + if !ok { + return fmt.Errorf("[members] should be list type") + } + + for i, member := range memberList { + if err := p.transJsonMember2Yaml(member); err != nil { + return err + } + memberList[i] = member + } + return nil +} + +func (p *Parser) transJsonMember2Yaml(value interface{}) error { + memberMap, ok := value.(map[string]interface{}) + if !ok { + return fmt.Errorf("[member] should be map type") + } + for memberKey, memberValue := range memberMap { + switch memberKey { + case "id": + // 该字段不暴露给用户 + continue + case "replicas": + memberMap["replicas"] = memberValue + case "role": + memberMap["role"] = memberValue + case "image": + memberMap["image"] = memberValue + case "command": + memberMap["command"] = memberValue + } + } + return nil +} diff --git a/pkg/common/schema/schema.go b/pkg/common/schema/schema.go index 56bc2c684..f48067507 100644 --- a/pkg/common/schema/schema.go +++ b/pkg/common/schema/schema.go @@ -37,27 +37,28 @@ type ComponentView interface { // JobView is view of job info responded to user, while Job is for pipeline and job engine to process type JobView struct { - PK int64 `json:"-"` - JobID string `json:"jobID"` - Name string `json:"name"` - Type string `json:"type"` - StepName string `json:"stepName"` - ParentDagID string `json:"parentDagID"` - LoopSeq int `json:"-"` - Command string `json:"command"` - Parameters map[string]string `json:"parameters"` - Env map[string]string `json:"env"` - ExtraFS []FsMount `json:"extraFS"` - StartTime string `json:"startTime"` - EndTime string `json:"endTime"` - Status JobStatus `json:"status"` - Deps string `json:"deps"` - DockerEnv string `json:"dockerEnv"` - Artifacts Artifacts `json:"artifacts"` - Cache Cache `json:"cache"` - JobMessage string `json:"jobMessage"` - CacheRunID string `json:"cacheRunID"` - CacheJobID string `json:"cacheJobID"` + PK int64 `json:"-"` + JobID string `json:"jobID"` + Name string `json:"name"` + Type string `json:"type"` + StepName string `json:"stepName"` + ParentDagID string `json:"parentDagID"` + LoopSeq int `json:"-"` + Command string
`json:"command"` + Parameters map[string]string `json:"parameters"` + Env map[string]string `json:"env"` + ExtraFS []FsMount `json:"extraFS"` + DistributedJob DistributedJob `json:"distributedJob,omitempty"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + Status JobStatus `json:"status"` + Deps string `json:"deps"` + DockerEnv string `json:"dockerEnv"` + Artifacts Artifacts `json:"artifacts"` + Cache Cache `json:"cache"` + JobMessage string `json:"jobMessage"` + CacheRunID string `json:"cacheRunID"` + CacheJobID string `json:"cacheJobID"` } func (j JobView) GetComponentName() string { diff --git a/pkg/common/schema/workflow.go b/pkg/common/schema/workflow.go index bbfde5b48..f1a8c4ea8 100644 --- a/pkg/common/schema/workflow.go +++ b/pkg/common/schema/workflow.go @@ -23,11 +23,12 @@ import ( "reflect" "strings" - "github.com/PaddlePaddle/PaddleFlow/pkg/common/logger" "gopkg.in/yaml.v2" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" k8syaml "k8s.io/apimachinery/pkg/util/yaml" + + "github.com/PaddlePaddle/PaddleFlow/pkg/common/logger" ) const ( @@ -47,9 +48,10 @@ const ( FsPrefix = "fs-" - CompTypeComponents = "components" - CompTypeEntryPoints = "entryPoints" - CompTypePostProcess = "postProcess" + CompTypeDistributedJob = "distributed_job" + CompTypeComponents = "components" + CompTypeEntryPoints = "entryPoints" + CompTypePostProcess = "postProcess" ) func ID(userName, fsName string) string { @@ -134,18 +136,19 @@ type Component interface { } type WorkflowSourceStep struct { - Name string `yaml:"-" json:"name"` - LoopArgument interface{} `yaml:"loop_argument" json:"loopArgument"` - Condition string `yaml:"condition" json:"condition"` - Parameters map[string]interface{} `yaml:"parameters" json:"parameters"` - Command string `yaml:"command" json:"command"` - Deps string `yaml:"deps" json:"deps"` - Artifacts Artifacts `yaml:"artifacts" json:"artifacts"` - Env map[string]string `yaml:"env" json:"env"` - DockerEnv string `yaml:"docker_env" json:"dockerEnv"` - Cache Cache `yaml:"cache" json:"cache"` - Reference Reference `yaml:"reference" json:"reference"` - ExtraFS []FsMount `yaml:"extra_fs" json:"extraFS"` + Name string `yaml:"-" json:"name"` + LoopArgument interface{} `yaml:"loop_argument" json:"loopArgument"` + Condition string `yaml:"condition" json:"condition"` + Parameters map[string]interface{} `yaml:"parameters" json:"parameters"` + Command string `yaml:"command" json:"command"` + Deps string `yaml:"deps" json:"deps"` + Artifacts Artifacts `yaml:"artifacts" json:"artifacts"` + Env map[string]string `yaml:"env" json:"env"` + DockerEnv string `yaml:"docker_env" json:"dockerEnv"` + Cache Cache `yaml:"cache" json:"cache"` + Reference Reference `yaml:"reference" json:"reference"` + ExtraFS []FsMount `yaml:"extra_fs" json:"extraFS"` + DistributedJob DistributedJob `yaml:"distributed_job" json:"distributedJob"` } func (s *WorkflowSourceStep) GetName() string { @@ -173,6 +176,10 @@ func (s *WorkflowSourceStep) GetParameters() map[string]interface{} { return s.Parameters } +func (s *WorkflowSourceStep) GetDistributedJob() DistributedJob { + return s.DistributedJob +} + func (s *WorkflowSourceStep) GetCondition() string { return s.Condition } @@ -280,18 +287,19 @@ func (s *WorkflowSourceStep) DeepCopy() Component { fsMount := append(s.ExtraFS, []FsMount{}...) 
ns := &WorkflowSourceStep{ - Name: s.Name, - LoopArgument: s.LoopArgument, - Condition: s.Condition, - Parameters: params, - Command: s.Command, - Deps: s.Deps, - Env: env, - Artifacts: *s.Artifacts.DeepCopy(), - DockerEnv: s.DockerEnv, - Cache: s.Cache, - Reference: s.Reference, - ExtraFS: fsMount, + Name: s.Name, + LoopArgument: s.LoopArgument, + Condition: s.Condition, + Parameters: params, + Command: s.Command, + Deps: s.Deps, + Env: env, + Artifacts: *s.Artifacts.DeepCopy(), + DockerEnv: s.DockerEnv, + Cache: s.Cache, + Reference: s.Reference, + ExtraFS: fsMount, + DistributedJob: s.DistributedJob, } return ns @@ -521,7 +529,6 @@ func (wfs *WorkflowSource) UnmarshalJSON(data []byte) error { if err := p.TransJsonMap2Yaml(bodyMap); err != nil { return err } - if err := p.ParseWorkflowSource(bodyMap, wfs); err != nil { return err } @@ -697,10 +704,10 @@ func (wfs *WorkflowSource) ProcessRuntimeComponents(components map[string]Compon if step.Env == nil { step.Env = map[string]string{} } - // Reference节点不用替换 if step.Reference.Component == "" { // DockerEnv字段替换检查 + if step.DockerEnv == "" { step.DockerEnv = wfs.DockerEnv } diff --git a/pkg/pipeline/cache.go b/pkg/pipeline/cache.go index fbb020ac2..8a041687f 100644 --- a/pkg/pipeline/cache.go +++ b/pkg/pipeline/cache.go @@ -40,6 +40,8 @@ type conservativeFirstCacheKey struct { OutputArtifacts map[string]string `json:",omitempty"` MainFS schema.FsMount `json:",omitempty"` ExtraFS []schema.FsMount `json:",omitempty"` + Framework string `json:",omitempty"` + Members []schema.Member `json:",omitempty"` } type PathToModTime struct { @@ -121,6 +123,8 @@ func (cc *conservativeCacheCalculator) generateFirstCacheKey() error { Env: envWithoutSystmeEnv, ExtraFS: cc.extraFS, MainFS: *cc.mainFS, + Framework: string(cc.job.Framework), + Members: cc.job.Members, } logMsg := fmt.Sprintf("FirstCacheKey: \nDockerEnv: %s, Parameters: %s, Command: %s, InputArtifacts: %s, "+ diff --git a/pkg/pipeline/dagRuntime.go b/pkg/pipeline/dagRuntime.go index af180d576..08a626a52 100644 --- a/pkg/pipeline/dagRuntime.go +++ b/pkg/pipeline/dagRuntime.go @@ -191,6 +191,7 @@ func (drt *DagRuntime) getReadyComponent() map[string]schema.Component { } drt.logger.Infof("get ready subStep or subDag[%v] for dag[%s]", readyComponent, drt.name) + return readyComponent } @@ -346,7 +347,7 @@ func (drt *DagRuntime) Start() { view := drt.newView("begin to run") drt.syncToApiServerAndParent(WfEventDagUpdate, &view, "begin to run") - // 监听子节点已经父节点传递过来的事件或者信号 + // 监听子节点以及父节点传递过来的事件或者信号 go drt.Listen() // 开始调度子节点 diff --git a/pkg/pipeline/dagRuntime_test.go b/pkg/pipeline/dagRuntime_test.go index 90e436afe..2700c22fd 100644 --- a/pkg/pipeline/dagRuntime_test.go +++ b/pkg/pipeline/dagRuntime_test.go @@ -19,6 +19,7 @@ package pipeline import ( "context" "fmt" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/config" "reflect" "strings" "testing" @@ -261,10 +262,11 @@ func TestDagRuntimeStart(t *testing.T) { } func TestScheduleSubComponent(t *testing.T) { + config.GlobalServerConfig = &config.ServerConfig{} + config.GlobalServerConfig.Job.SchedulerName = "testSchedulerName" eventChan := make(chan WorkflowEvent) drt, err := mockerDagRuntime(eventChan) assert.Nil(t, err) - drt.getworkflowSouceDag().EntryPoints["square-loop"].UpdateLoopArguemt([]int{1, 2, 3}) stepStarted := false diff --git a/pkg/pipeline/job.go b/pkg/pipeline/job.go index fd0aabedd..22828849a 100644 --- a/pkg/pipeline/job.go +++ b/pkg/pipeline/job.go @@ -19,19 +19,23 @@ package pipeline import ( "errors" "fmt" + "reflect" "time" + 
log "github.com/sirupsen/logrus" + "github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/common" "github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/controller/job" comErrors "github.com/PaddlePaddle/PaddleFlow/pkg/common/errors" "github.com/PaddlePaddle/PaddleFlow/pkg/common/logger" "github.com/PaddlePaddle/PaddleFlow/pkg/common/schema" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/uuid" "github.com/PaddlePaddle/PaddleFlow/pkg/storage" ) type Job interface { Job() BaseJob - Update(cmd string, params map[string]string, envs map[string]string, artifacts *schema.Artifacts) + Update(cmd string, params map[string]string, envs map[string]string, artifacts *schema.Artifacts, distributedJob *schema.DistributedJob) Validate() error Start() (string, error) Stop() error @@ -44,6 +48,8 @@ type Job interface { Skipped() bool NotEnded() bool JobID() string + MemberInfo() []schema.Member + FrameworkInfo() schema.Framework } func NewBaseJob(name string) *BaseJob { @@ -77,9 +83,12 @@ type PaddleFlowJob struct { mainFS *schema.FsMount extraFS []schema.FsMount eventChannel chan<- WorkflowEvent + Members []schema.Member + Framework schema.Framework } -func NewPaddleFlowJob(name, image, userName string, eventChannel chan<- WorkflowEvent, mainFS *schema.FsMount, extraFS []schema.FsMount) *PaddleFlowJob { +func NewPaddleFlowJob(name, image, userName string, eventChannel chan<- WorkflowEvent, mainFS *schema.FsMount, + extraFS []schema.FsMount, framework schema.Framework, members []schema.Member) *PaddleFlowJob { return &PaddleFlowJob{ BaseJob: *NewBaseJob(name), Image: image, @@ -87,11 +96,13 @@ func NewPaddleFlowJob(name, image, userName string, eventChannel chan<- Workflow eventChannel: eventChannel, mainFS: mainFS, extraFS: extraFS, + Members: members, + Framework: framework, } } func NewPaddleFlowJobWithJobView(view *schema.JobView, image string, eventChannel chan<- WorkflowEvent, - mainFS *schema.FsMount, extraFS []schema.FsMount, userName string) *PaddleFlowJob { + mainFS *schema.FsMount, extraFS []schema.FsMount, userName string, framework schema.Framework, members []schema.Member) *PaddleFlowJob { pfj := PaddleFlowJob{ BaseJob: BaseJob{ ID: view.JobID, @@ -110,6 +121,8 @@ func NewPaddleFlowJobWithJobView(view *schema.JobView, image string, eventChanne mainFS: mainFS, extraFS: extraFS, userName: userName, + Framework: framework, + Members: members, } pfj.Status = common.StatusRunRunning @@ -119,7 +132,7 @@ func NewPaddleFlowJobWithJobView(view *schema.JobView, image string, eventChanne // 发起作业接口 func (pfj *PaddleFlowJob) Update(cmd string, params map[string]string, envs map[string]string, - artifacts *schema.Artifacts) { + artifacts *schema.Artifacts, distributedJob *schema.DistributedJob) { if cmd != "" { pfj.Command = cmd } @@ -132,11 +145,183 @@ func (pfj *PaddleFlowJob) Update(cmd string, params map[string]string, envs map[ pfj.Env = envs } + // members和framework添加到PaddleFlowJob中 + if distributedJob != nil { + pfj.Framework = distributedJob.Framework + pfj.Members = distributedJob.Members + } + if artifacts != nil { pfj.Artifacts = *artifacts } } +func generateJobID(param string) string { + return uuid.GenerateID(fmt.Sprintf("%s-%s", schema.JobPrefix, param)) +} + +func (pfj *PaddleFlowJob) generateCreateJobInfo() *job.CreateJobInfo { + mainfs := schema.FileSystem{} + if pfj.mainFS != nil { + mainfs = schema.FileSystem{ + ID: pfj.mainFS.ID, + Name: pfj.mainFS.Name, + SubPath: pfj.mainFS.SubPath, + MountPath: pfj.mainFS.MountPath, + ReadOnly: pfj.mainFS.ReadOnly, + } + } + efs := make([]schema.FileSystem, 
0) + for _, fsMount := range pfj.extraFS { + fs := schema.FileSystem{ + ID: fsMount.ID, + Name: fsMount.Name, + SubPath: fsMount.SubPath, + MountPath: fsMount.MountPath, + ReadOnly: fsMount.ReadOnly, + } + efs = append(efs, fs) + } + + queueName := "" + if _, ok := pfj.Env[schema.EnvJobQueueName]; ok { + queueName = pfj.Env[schema.EnvJobQueueName] + } + priority := "" + if _, ok := pfj.Env[schema.EnvJobPriority]; ok { + priority = pfj.Env[schema.EnvJobPriority] + } + + commonInfo := job.CommonJobInfo{ + ID: generateJobID(pfj.Name), + Name: pfj.Name, + SchedulingPolicy: job.SchedulingPolicy{ + Queue: queueName, + Priority: priority, + }, + UserName: pfj.userName, + } + createJobInfo := &job.CreateJobInfo{ + CommonJobInfo: commonInfo, + } + + // 生成single或distributed job的createJobInfo信息 + if len(pfj.Members) == 0 { + createJobInfo.Type = schema.TypeSingle + createJobInfo.Framework = schema.FrameworkStandalone + createJobInfo.Members = []job.MemberSpec{ + { + CommonJobInfo: createJobInfo.CommonJobInfo, + JobSpec: job.JobSpec{ + Flavour: schema.Flavour{ + Name: pfj.Env[schema.EnvJobFlavour], + }, + LimitFlavour: schema.Flavour{ + Name: pfj.Env[schema.EnvJobLimitFlavour], + }, + FileSystem: mainfs, + ExtraFileSystems: efs, + Image: pfj.Image, + Env: pfj.Env, + Command: pfj.Command, + }, + Role: string(schema.RoleWorker), + Replicas: 1, + }, + } + + } else { + createJobInfo.Type = schema.TypeDistributed + createJobInfo.Framework = pfj.Framework + members := make([]job.MemberSpec, 0) + for _, member := range pfj.Members { + mem := job.MemberSpec{ + CommonJobInfo: createJobInfo.CommonJobInfo, + Role: string(member.Role), + Replicas: member.Replicas, + } + + image := "" + if member.GetImage() != "" { + image = member.GetImage() + } else { + image = pfj.Image + } + + annotations := make(map[string]string) + if member.GetAnnotations() != nil { + for k, v := range member.GetAnnotations() { + annotations[k] = v + } + mem.Annotations = annotations + } + + labels := make(map[string]string) + if member.GetLabels() != nil { + for k, v := range member.GetLabels() { + labels[k] = v + } + mem.Labels = labels + } + + var env map[string]string + if member.GetEnv() != nil { + env = pfj.Env + if env == nil { + env = make(map[string]string) + } + // 设置在member里的环境变量优先级最高 + for k, v := range member.GetEnv() { + env[k] = v + } + } else { + env = pfj.Env + } + + command := "" + if member.GetCommand() != "" { + command = member.GetCommand() + } else { + command = pfj.Command + } + + flavour := schema.Flavour{} + if !reflect.DeepEqual(flavour, member.Flavour) { + flavour = member.Flavour + } else { + flavour.Name = pfj.Env[schema.EnvJobFlavour] + } + + memberFs := schema.FileSystem{} + if !reflect.DeepEqual(memberFs, member.GetFileSystem()) { + memberFs = member.GetFileSystem() + } else { + memberFs = mainfs + } + + if member.GetExtraFS() != nil { + efs = member.GetExtraFS() + } + + jobInfo := job.JobSpec{ + Flavour: flavour, + LimitFlavour: member.LimitFlavour, + FileSystem: memberFs, + ExtraFileSystems: efs, + Env: env, + Command: command, + Image: image, + Port: member.Port, + Args: member.GetArgs(), + } + mem.JobSpec = jobInfo + members = append(members, mem) + } + createJobInfo.Members = members + } + return createJobInfo +} + // 生成job 的conf 信息 func (pfj *PaddleFlowJob) generateJobConf() schema.Conf { fs := schema.FileSystem{} @@ -192,9 +377,9 @@ func (pfj *PaddleFlowJob) Validate() error { var err error // 调用job子系统接口进行校验 - conf := pfj.generateJobConf() + jobInfo := pfj.generateCreateJobInfo() - err = 
job.ValidatePPLJob(&conf) + err = job.ValidatePPLJob(jobInfo) if err != nil { return err } @@ -207,14 +392,19 @@ func (pfj *PaddleFlowJob) Start() (string, error) { // 此函数不更新job.Status,job.startTime,统一通过watch更新 var err error + // 生成CreateJobInfo + createJobInfo := pfj.generateCreateJobInfo() + ctx := &logger.RequestContext{ + UserName: createJobInfo.UserName, + } // 调用job子系统接口发起运行 - conf := pfj.generateJobConf() - - pfj.ID, err = job.CreatePPLJob(&conf) + jobResponse, err := job.CreatePFJob(ctx, createJobInfo) if err != nil { + log.Errorf("create pipeline job failed. err: %s", err) return "", err } + pfj.ID = jobResponse.ID if pfj.ID == "" { err = fmt.Errorf("watch paddleflow job[%s] failed, job not started, id is empty", pfj.Job().Name) return "", err @@ -289,6 +479,10 @@ func (pfj *PaddleFlowJob) Watch() { pfj.eventChannel <- *wfe pfj.Status = jobInstance.Status pfj.Message = jobInstance.Message + // 更新Members状态 + if len(pfj.Members) > 0 { + pfj.Members = jobInstance.Members + } } if pfj.Succeeded() || pfj.Terminated() || pfj.Failed() { @@ -335,6 +529,14 @@ func (pfj *PaddleFlowJob) JobID() string { return pfj.ID } +func (pfj *PaddleFlowJob) MemberInfo() []schema.Member { + return pfj.Members +} + +func (pfj *PaddleFlowJob) FrameworkInfo() schema.Framework { + return pfj.Framework +} + // ---------------------------------------------------------------------------- // Local Process Job // ---------------------------------------------------------------------------- diff --git a/pkg/pipeline/job_test.go b/pkg/pipeline/job_test.go index 81ca35973..c4cbf4c69 100644 --- a/pkg/pipeline/job_test.go +++ b/pkg/pipeline/job_test.go @@ -9,10 +9,11 @@ import ( "github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/controller/job" "github.com/PaddlePaddle/PaddleFlow/pkg/common/logger" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/schema" ) func TestStopJob(t *testing.T) { - pfj := NewPaddleFlowJob("abc", "abc:qe", "root", make(chan<- WorkflowEvent), nil, nil) + pfj := NewPaddleFlowJob("abc", "abc:qe", "root", make(chan<- WorkflowEvent), nil, nil, "paddle", nil) assert.Equal(t, "root", pfj.userName) @@ -29,3 +30,183 @@ func TestStopJob(t *testing.T) { pfj.Stop() } + +func TestGenerateCreateJobInfo(t *testing.T) { + testCases := []struct { + name string + image string + userName string + mainFS *schema.FsMount + extraFs []schema.FsMount + framework string + members []schema.Member + wantRes job.CreateJobInfo + }{ + { + name: "CreateJobInfo for distributed paddle job", + image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + userName: "root", + mainFS: &schema.FsMount{Name: "xd"}, + extraFs: []schema.FsMount{}, + framework: "paddle", + members: []schema.Member{ + { + Replicas: 2, + Role: "pworker", + Conf: schema.Conf{ + Command: "echo worker", + Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + Env: map[string]string{"Worker": "1"}, + Flavour: schema.Flavour{Name: "flavour1"}, + Labels: map[string]string{"Worker": "1"}, + }, + }, + { + Replicas: 2, + Role: "pserver", + Conf: schema.Conf{ + Command: "echo server", + Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + Env: map[string]string{"PS": "1"}, + Flavour: schema.Flavour{Name: "flavour1"}, + Labels: map[string]string{"Worker": "1"}, + }, + }, + }, + wantRes: job.CreateJobInfo{ + CommonJobInfo: job.CommonJobInfo{ + Name: "CreateJobInfo for distributed paddle job", + UserName: "root", + SchedulingPolicy: job.SchedulingPolicy{ + Queue: "default-queue", + }, + }, + Framework: "paddle", + Type: schema.TypeDistributed, + Members: 
[]job.MemberSpec{ + { + CommonJobInfo: job.CommonJobInfo{ + Name: "CreateJobInfo for distributed paddle job", + UserName: "root", + Labels: map[string]string{"Worker": "1"}, + }, + Replicas: 2, + Role: "pworker", + JobSpec: job.JobSpec{Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", Flavour: schema.Flavour{Name: "flavour1"}, Command: "echo worker", + Env: map[string]string{"Worker": "1"}, FileSystem: schema.FileSystem{Name: "xd"}, ExtraFileSystems: []schema.FileSystem{}}, + }, + { + CommonJobInfo: job.CommonJobInfo{ + Name: "CreateJobInfo for distributed paddle job", + UserName: "root", + Labels: map[string]string{"Worker": "1"}, + }, + Replicas: 2, + Role: "pserver", + JobSpec: job.JobSpec{Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", Flavour: schema.Flavour{Name: "flavour1"}, Command: "echo server", + Env: map[string]string{"PS": "1"}, FileSystem: schema.FileSystem{Name: "xd"}, ExtraFileSystems: []schema.FileSystem{}}, + }, + }, + }, + }, + { + name: "CreateJobInfo for distributed job", + image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + userName: "root", + mainFS: &schema.FsMount{Name: "xd"}, + extraFs: []schema.FsMount{}, + framework: "paddle", + members: []schema.Member{ + { + Replicas: 2, + Role: "pworker", + Conf: schema.Conf{ + FileSystem: schema.FileSystem{Name: "xd"}, + Annotations: map[string]string{"Worker": "1"}, + }, + }, + { + Replicas: 2, + Role: "pserver", + Conf: schema.Conf{ + FileSystem: schema.FileSystem{Name: "xd"}, + Annotations: map[string]string{"Worker": "1"}, + }, + }, + }, + wantRes: job.CreateJobInfo{ + CommonJobInfo: job.CommonJobInfo{ + Name: "CreateJobInfo for distributed job", + UserName: "root", + }, + Framework: "paddle", + Type: schema.TypeDistributed, + Members: []job.MemberSpec{ + { + CommonJobInfo: job.CommonJobInfo{ + Name: "CreateJobInfo for distributed job", + UserName: "root", + Annotations: map[string]string{"Worker": "1"}, + }, + Replicas: 2, + Role: "pworker", + JobSpec: job.JobSpec{Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", FileSystem: schema.FileSystem{Name: "xd"}, ExtraFileSystems: []schema.FileSystem{}}, + }, + { + CommonJobInfo: job.CommonJobInfo{ + Name: "CreateJobInfo for distributed job", + UserName: "root", + Annotations: map[string]string{"Worker": "1"}, + }, + Replicas: 2, + Role: "pserver", + JobSpec: job.JobSpec{Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", FileSystem: schema.FileSystem{Name: "xd"}, ExtraFileSystems: []schema.FileSystem{}}, + }, + }, + }, + }, + { + name: "single", + image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", + userName: "root", + mainFS: &schema.FsMount{Name: "xd"}, + extraFs: []schema.FsMount{}, + framework: "paddle", + wantRes: job.CreateJobInfo{ + CommonJobInfo: job.CommonJobInfo{ + Name: "single", + UserName: "root", + SchedulingPolicy: job.SchedulingPolicy{ + Queue: "default-queue", + }, + }, + Framework: schema.FrameworkStandalone, + Type: schema.TypeSingle, + + Members: []job.MemberSpec{ + { + CommonJobInfo: job.CommonJobInfo{ + Name: "single", + UserName: "root", + }, + Replicas: 1, + Role: string(schema.RoleWorker), + JobSpec: job.JobSpec{Image: "paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7", Flavour: schema.Flavour{}, FileSystem: schema.FileSystem{Name: "xd"}, ExtraFileSystems: []schema.FileSystem{}}, + }, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pfj := NewPaddleFlowJob(tc.name, tc.image, tc.userName, make(chan<- WorkflowEvent), tc.mainFS, tc.extraFs, schema.Framework(tc.framework), 
tc.members) + jobInfo := pfj.generateCreateJobInfo() + tc.wantRes.ID = jobInfo.ID + for index, member := range tc.wantRes.Members { + member.ID = tc.wantRes.ID + tc.wantRes.Members[index] = member + } + assert.Equal(t, tc.wantRes.Members, jobInfo.Members) + }) + } +} diff --git a/pkg/pipeline/paramTemplateSolver.go b/pkg/pipeline/paramTemplateSolver.go index 2b5c0645a..24c5fea22 100644 --- a/pkg/pipeline/paramTemplateSolver.go +++ b/pkg/pipeline/paramTemplateSolver.go @@ -253,6 +253,24 @@ func (isv *innerSolver) resolveCommand(forCache bool) error { return nil } +func (isv *innerSolver) resolveDistributedJobCommand(forCache bool) error { + // 调用方需要保证此时的 component 是一个Step + if members := isv.Component.(*schema.WorkflowSourceStep).GetDistributedJob().Members; members != nil { + for index, member := range members { + command := member.Command + newCommand, err := isv.resolveTemplate(command, FieldCommand, forCache) + if err != nil { + return err + } + isv.Component.(*schema.WorkflowSourceStep).GetDistributedJob().Members[index].Command = newCommand.(string) + isv.logger.Infof("after resolve template, the command of member %v [role %s] is: %s", + index, member.Role, newCommand) + } + + } + return nil +} + func (isv *innerSolver) resolveCondition() error { condition := isv.Component.GetCondition() newCondition, err := isv.resolveTemplate(condition, FieldCondition, false) diff --git a/pkg/pipeline/stepRuntime.go b/pkg/pipeline/stepRuntime.go index 1cb25fc81..2dcca2b18 100644 --- a/pkg/pipeline/stepRuntime.go +++ b/pkg/pipeline/stepRuntime.go @@ -59,12 +59,12 @@ func NewStepRuntime(name, fullName string, step *schema.WorkflowSourceStep, seq jobName := generateJobName(config.runID, step.GetName(), seq) job := NewPaddleFlowJob(jobName, srt.getWorkFlowStep().DockerEnv, srt.userName, srt.receiveEventChildren, - srt.runConfig.mainFS, srt.getWorkFlowStep().ExtraFS) + srt.runConfig.mainFS, srt.getWorkFlowStep().ExtraFS, step.DistributedJob.Framework, step.DistributedJob.Members) srt.job = job srt.logger.Infof("step[%s] of runid[%s] before starting job: param[%s], env[%s], command[%s], artifacts[%s], deps[%s], "+ - "extraFS[%v]", srt.getName(), srt.runID, step.Parameters, step.Env, step.Command, - step.Artifacts, step.Deps, step.ExtraFS) + "extraFS[%v], distributedJob[%v]", srt.getName(), srt.runID, step.Parameters, step.Env, step.Command, + step.Artifacts, step.Deps, step.ExtraFS, step.DistributedJob) return srt } @@ -160,7 +160,6 @@ func (srt *StepRuntime) Start() { // TODO: 此时是否需要同步至数据库? 
srt.logger.Infof("begin to run step[%s], and current parallelism is %d", srt.name, srt.parallelismManager.CurrentParallelism()) - err := srt.setSysParams() if err != nil { errMsg := fmt.Sprintf("set the sysparams for dag[%s] failed: %s", srt.name, err.Error()) @@ -228,8 +227,9 @@ func (srt *StepRuntime) Resume(view *schema.JobView) { defer srt.catchPanic() + distributedJobs := srt.getWorkFlowStep().DistributedJob srt.job = NewPaddleFlowJobWithJobView(view, srt.getWorkFlowStep().DockerEnv, - srt.receiveEventChildren, srt.runConfig.mainFS, srt.getWorkFlowStep().ExtraFS, srt.userName) + srt.receiveEventChildren, srt.runConfig.mainFS, srt.getWorkFlowStep().ExtraFS, srt.userName, distributedJobs.Framework, distributedJobs.Members) srt.pk = view.PK err := srt.updateStatus(view.Status) @@ -320,6 +320,12 @@ func (srt *StepRuntime) updateJob(forCacheFingerprint bool) error { params[paramName] = fmt.Sprintf("%v", paramValue) } + // 替换DistributedJob Member的command + if err := srt.innerSolver.resolveDistributedJobCommand(forCacheFingerprint); err != nil { + return err + } + distributedJobs := srt.getWorkFlowStep().DistributedJob + artifacts := schema.Artifacts{Input: map[string]string{}, Output: map[string]string{}} for atfName, atfValue := range srt.GetArtifacts().Input { artifacts.Input[atfName] = atfValue @@ -352,7 +358,7 @@ func (srt *StepRuntime) updateJob(forCacheFingerprint bool) error { } } - srt.job.Update(srt.getWorkFlowStep().Command, params, newEnvs, &artifacts) + srt.job.Update(srt.getWorkFlowStep().Command, params, newEnvs, &artifacts, &distributedJobs) srt.logger.Infof("step[%s] after resolve template: param[%s], artifacts[%s], command[%s], env[%s], FsMount[%v]", srt.name, params, artifacts, srt.getWorkFlowStep().Command, newEnvs, srt.getWorkFlowStep().ExtraFS) return nil @@ -598,6 +604,7 @@ func (srt *StepRuntime) startJob() (err error) { // TODO: 正式运行前,需要将更新后的参数更新到数据库中(通过传递workflow event到runtime即可) srt.logger.Debugf("begin to launch job for step [%s]", srt.name) + _, err = srt.job.Start() if err != nil { err = fmt.Errorf("start job for step[%s] with runid[%s] failed: [%s]", srt.name, srt.runID, err.Error()) @@ -832,27 +839,33 @@ func (srt *StepRuntime) newJobView(msg string) schema.JobView { art := srt.getWorkFlowStep().GetArtifacts() newArt := (&art).DeepCopy() + distJob := schema.DistributedJob{ + Framework: srt.job.FrameworkInfo(), + Members: srt.job.MemberInfo(), + } + view := schema.JobView{ - JobID: job.ID, - Name: job.Name, - Command: job.Command, - Parameters: params, - Env: job.Env, - StartTime: job.StartTime, - EndTime: job.EndTime, - Status: srt.status, - Deps: step.Deps, - DockerEnv: step.DockerEnv, - JobMessage: msg, - ParentDagID: srt.parentDagID, - CacheRunID: srt.CacheRunID, - CacheJobID: srt.CacheJobID, - StepName: srt.getComponent().GetName(), - Cache: srt.getWorkFlowStep().Cache, - PK: srt.pk, - LoopSeq: srt.loopSeq, - Artifacts: *newArt, - ExtraFS: srt.getWorkFlowStep().ExtraFS, + JobID: job.ID, + Name: job.Name, + Command: job.Command, + Parameters: params, + Env: job.Env, + StartTime: job.StartTime, + EndTime: job.EndTime, + Status: srt.status, + Deps: step.Deps, + DockerEnv: step.DockerEnv, + DistributedJob: distJob, + JobMessage: msg, + ParentDagID: srt.parentDagID, + CacheRunID: srt.CacheRunID, + CacheJobID: srt.CacheJobID, + StepName: srt.getComponent().GetName(), + Cache: srt.getWorkFlowStep().Cache, + PK: srt.pk, + LoopSeq: srt.loopSeq, + Artifacts: *newArt, + ExtraFS: srt.getWorkFlowStep().ExtraFS, } return view diff --git 
a/pkg/pipeline/stepRuntime_test.go b/pkg/pipeline/stepRuntime_test.go index c0881d47c..55bf7b58b 100644 --- a/pkg/pipeline/stepRuntime_test.go +++ b/pkg/pipeline/stepRuntime_test.go @@ -15,6 +15,7 @@ import ( apicommon "github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/common" "github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/handler" "github.com/PaddlePaddle/PaddleFlow/pkg/apiserver/models" + "github.com/PaddlePaddle/PaddleFlow/pkg/common/config" "github.com/PaddlePaddle/PaddleFlow/pkg/common/schema" "github.com/PaddlePaddle/PaddleFlow/pkg/pipeline/common" pplcommon "github.com/PaddlePaddle/PaddleFlow/pkg/pipeline/common" @@ -283,6 +284,24 @@ func TestUpdateJob(t *testing.T) { expectedCommand := "python train.py -r 0.1 -d ./data/pre --output ./data/model" assert.Equal(t, expectedCommand, srt.job.Job().Command) } + if stepName == "distributed-step" { + assert.Equal(t, 2, len(srt.job.Job().Parameters)) + assert.Contains(t, srt.job.Job().Parameters, "epoch") + + assert.Equal(t, "5", srt.job.Job().Parameters["epoch"]) + assert.Equal(t, 5+sysNum, len(srt.job.Job().Env)) // 4 env + 6 sys param + 2 artifact + + assert.Contains(t, srt.job.Job().Env, "PF_JOB_QUEUE") + assert.Contains(t, srt.job.Job().Env, "PF_JOB_PRIORITY") + assert.Contains(t, srt.job.Job().Env, "PF_JOB_FLAVOUR") + assert.Contains(t, srt.job.Job().Env, "PF_PS_NUM") + assert.Contains(t, srt.job.Job().Env, "PF_WORKER_NUM") + + expectedCommand := "sleep 30; echo ps 5 100" + assert.Equal(t, expectedCommand, srt.getWorkFlowStep().DistributedJob.Members[0].Command) + expectedCommand = "sleep 30; echo worker 5 100" + assert.Equal(t, expectedCommand, srt.getWorkFlowStep().DistributedJob.Members[1].Command) + } if stepName == "validate" { assert.Equal(t, 4, len(srt.job.Job().Parameters)) assert.Contains(t, srt.job.Job().Parameters, "refSystem") @@ -434,6 +453,8 @@ func mockToListenEvent(ec chan WorkflowEvent, ep *WorkflowEvent) { } func TestNewStepRuntimeWithStatus(t *testing.T) { + config.GlobalServerConfig = &config.ServerConfig{} + config.GlobalServerConfig.Job.SchedulerName = "testSchedulerName" handler.NewFsHandlerWithServer = handler.MockerNewFsHandlerWithServer testCase := loadcase(runYamlPath) wfs, err := schema.GetWorkflowSource([]byte(testCase)) @@ -474,7 +495,8 @@ func TestNewStepRuntimeWithStatus(t *testing.T) { } func TestExecute(t *testing.T) { - + config.GlobalServerConfig = &config.ServerConfig{} + config.GlobalServerConfig.Job.SchedulerName = "testSchedulerName" handler.NewFsHandlerWithServer = handler.MockerNewFsHandlerWithServer testCase := loadcase(runYamlPath) wfs, err := schema.GetWorkflowSource([]byte(testCase)) diff --git a/pkg/pipeline/testcase/run.yaml b/pkg/pipeline/testcase/run.yaml index eb9dc9dfd..f2b93e68c 100644 --- a/pkg/pipeline/testcase/run.yaml +++ b/pkg/pipeline/testcase/run.yaml @@ -40,6 +40,25 @@ entry_points: output: - train_model + distributed-step: + deps: main + parameters: + epoch: 5 + iteration: 100 + env: + PF_JOB_QUEUE: v100-16G + PF_JOB_PRIORITY: high + PF_JOB_FLAVOUR: v100-10 + PF_PS_NUM: "1" + PF_WORKER_NUM: "4" + distributed_job: + framework: "paddle" + members: + - { "role": "pserver", "port": 8080, "command": "sleep 30; echo ps {{epoch}} {{iteration}}","image": paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7,"replicas": 2, + "flavour": { "name": "flavour1", "cpu": "1", "mem": "1G", "scalar_resources": { "nvidia.com/gpu": "1" } },"env": { "role": "ps" } } + - { "role": "pworker", "port": 8080, "command": "sleep 30; echo worker {{epoch}} {{iteration}}","image": 
paddlepaddle/paddle:2.0.2-gpu-cuda10.1-cudnn7,"replicas": 2, + "flavour": { "name": "flavour1", "cpu": "1", "mem": "1G", "scalar_resources": { "nvidia.com/gpu": "1" } },"env": { "role": "worker" } } + validate: deps: main,data-preprocess parameters: diff --git a/pkg/pipeline/testcase/run_dag.yaml b/pkg/pipeline/testcase/run_dag.yaml index e7eb0887d..b4819b1d5 100644 --- a/pkg/pipeline/testcase/run_dag.yaml +++ b/pkg/pipeline/testcase/run_dag.yaml @@ -107,7 +107,11 @@ entry_points: disStep: command: "echo dis" - + distributed_job: + framework: "paddle" + members: + - { "role": "pserver", "command": "","replicas": 2, "flavour": { "name": "flavour1" }} + - { "role": "pworker", "command": "","replicas": 2, "flavour": { "name": "flavour1" }} components: process-negetive: