Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): support distributed job in pipeline #1164

Open
wants to merge 131 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
131 commits
Select commit Hold shift + click to select a range
f07800f
feat(job): support distributed job in pipeline
wanziyu May 30, 2023
c466edc
feat(job): support distributed job in pipeline
wanziyu Jun 2, 2023
91028c6
Merge branch 'develop' into distributedjob
wanziyu Jun 2, 2023
dd1aa17
feat(job): support distributed job in pipeline
wanziyu Jun 2, 2023
6e9a892
Merge remote-tracking branch 'origin/distributedjob' into distributedjob
wanziyu Jun 2, 2023
a482b3e
feat(job): support distributed job in pipeline
wanziyu Jun 2, 2023
47a7e99
update create ppl
wanziyu Jun 3, 2023
1974710
feat(pipeline): support distributed job
wanziyu Jun 4, 2023
132f7f8
feat(pipeline): support distributed job
wanziyu Jun 5, 2023
e917726
Merge branch 'develop' into distributedjob
wanziyu Jun 5, 2023
edf67b7
feat(pipeline): support distributed job
wanziyu Jun 5, 2023
357b82d
Merge remote-tracking branch 'origin/distributedjob' into distributedjob
wanziyu Jun 5, 2023
7fa5c81
feat(pipeline): support distributed job
wanziyu Jun 5, 2023
274fa26
feat(pipeline): support distributed job
wanziyu Jun 5, 2023
5acaad2
feat(pipeline): support distributed job
wanziyu Jun 5, 2023
f445ea5
feat(pipeline): support distributed job
wanziyu Jun 5, 2023
995f1d9
feat(pipeline): support distributed job
wanziyu Jun 5, 2023
fa05f5d
feat(pipeline): support distributed job
wanziyu Jun 5, 2023
5648beb
feat(pipeline): support distributed job
wanziyu Jun 6, 2023
653772b
feat(pipeline): support distributed job
wanziyu Jun 6, 2023
ce29014
feat(pipeline): support distributed job
wanziyu Jun 6, 2023
998400f
feat(pipeline): support distributed job
wanziyu Jun 6, 2023
614d3e2
feat(pipeline): support distributed job
wanziyu Jun 6, 2023
58184a9
feat(pipeline): support distributed job
wanziyu Jun 6, 2023
160ef4e
feat(pipeline): support distributed job
wanziyu Jun 6, 2023
800d73e
feat(pipeline): support distributed job
wanziyu Jun 7, 2023
e73a814
Merge branch 'develop' into distributedjob
wanziyu Jun 7, 2023
e9c5463
[pipeline] add test for supporting distributed job
wanziyu Jun 7, 2023
793b499
Merge remote-tracking branch 'origin/distributedjob' into distributedjob
wanziyu Jun 7, 2023
3ca4ad5
[pipeline] add test for supporting distributed job
wanziyu Jun 7, 2023
1a76e33
fix panic for test: initialize sever config
wanziyu Jun 8, 2023
15cb168
feat(pipeline): support distributed job
wanziyu Jun 8, 2023
8e4c8ed
fix panic for test: initialize sever config
wanziyu Jun 8, 2023
74b029c
feat(pipeline): support distributed job
wanziyu Jun 9, 2023
0070930
feat(pipeline): support distributed job
wanziyu Jun 9, 2023
922ec5e
feat(pipeline): support distributed job
wanziyu Jun 9, 2023
2c8c61a
Merge branch 'PaddlePaddle:develop' into distributedjob
wanziyu Jun 17, 2023
37c1226
feat(pipeline): support distributed job
wanziyu Jun 25, 2023
3e6346e
Merge remote-tracking branch 'origin/distributedjob' into distributedjob
wanziyu Jun 25, 2023
3ea33d6
Merge branch 'PaddlePaddle:develop' into distributedjob
wanziyu Jun 29, 2023
2e98f75
feat(pipeline): support distributed job in dsl
wanziyu Jun 29, 2023
97bc2fc
feat(pipeline): support distributed job in pipeline
wanziyu Jun 29, 2023
a023b99
feat(pipeline): support distributed job in pipeline
wanziyu Jun 30, 2023
59cbb50
feat(pipeline): support distributed job in pipeline
wanziyu Jun 30, 2023
d437a24
feat(pipeline): support distributed job in pipeline
wanziyu Jun 30, 2023
363749b
feat(pipeline): support distributed job in cache
wanziyu Jul 3, 2023
676a785
feat(pipeline): add tests for distributed job
wanziyu Jul 3, 2023
6cf8fa3
feat(pipeline): update test for distributed job
wanziyu Jul 3, 2023
60beaa9
feat(pipeline): update dsl for distributed job
wanziyu Jul 3, 2023
8f37bb6
feat(pipeline): update dsl for distributed job
wanziyu Jul 4, 2023
413f3d6
feat(pipeline): update cache key for distributed job
wanziyu Jul 4, 2023
db7ad7b
feat(pipeline): update cache key for distributed job
wanziyu Jul 4, 2023
69e0249
feat(pipeline): add dsl tests for distributed job
wanziyu Jul 4, 2023
1e29637
feat(pipeline): update json parser for distributed job
wanziyu Jul 5, 2023
d366d7f
feat(pipeline): update json parser for distributed job
wanziyu Jul 5, 2023
bf0c37d
feat(pipeline): update json parser for distributed job
wanziyu Jul 5, 2023
5002a0f
feat(pipeline): update json parser for distributed job
wanziyu Jul 5, 2023
6e9754a
feat(pipeline): update json parser for distributed job
wanziyu Jul 6, 2023
2345948
feat(pipeline): update json parser for distributed job
wanziyu Jul 6, 2023
01ac977
feat(pipeline): update parser for distributed job
wanziyu Jul 6, 2023
9b7b248
feat(pipeline): update parser for distributed job
wanziyu Jul 7, 2023
32c952b
feat(pipeline): update parser for distributed job
wanziyu Jul 7, 2023
89db8f1
feat(pipeline): update parser for distributed job
wanziyu Jul 7, 2023
e7d14f0
feat(pipeline): update docs and example for distributed job
wanziyu Jul 10, 2023
0f4367b
Merge branch 'PaddlePaddle:develop' into distributedjob
wanziyu Jul 10, 2023
88d8373
feat(pipeline): update doc example for distributed job
wanziyu Jul 14, 2023
0136f7b
Merge remote-tracking branch 'origin/distributedjob' into distributedjob
wanziyu Jul 14, 2023
80d074c
feat(job): support distributed job in pipeline
wanziyu Jul 19, 2023
3955b51
feat(job): support distributed job in pipeline
wanziyu Jul 24, 2023
a82332b
feat(job): support distributed job in pipeline
wanziyu Jul 24, 2023
798d1c1
feat(job): support distributed job in pipeline
wanziyu Jul 24, 2023
e3a697b
feat(job): support distributed job in pipeline
wanziyu Jul 24, 2023
d2f4df4
feat(job): support distributed job in pipeline
wanziyu Jul 24, 2023
b0e1694
feat(job): support distributed job in pipeline
wanziyu Jul 24, 2023
1c3d7c7
feat(job): support distributed job in pipeline
wanziyu Jul 24, 2023
0f3bd6d
feat(job): support distributed job in pipeline
wanziyu Jul 25, 2023
3ec0252
feat(job): support distributed job in pipeline
wanziyu Jul 25, 2023
54581b7
feat(job): support distributed job in pipeline
wanziyu Jul 25, 2023
726f8e3
feat(job): support distributed job in pipeline
wanziyu Jul 25, 2023
48573bf
feat(job): support distributed job in pipeline
wanziyu Jul 25, 2023
327a94a
feat(job): support distributed job in pipeline
wanziyu Jul 25, 2023
9c09fe6
feat(job): support distributed job in pipeline
wanziyu Jul 25, 2023
879bf7f
feat(job): support distributed job in pipeline
wanziyu Jul 26, 2023
ac2a576
feat(job): support distributed job in pipeline
wanziyu Jul 26, 2023
45a811a
feat(job): support distributed job in pipeline
wanziyu Jul 26, 2023
e5ff3f1
feat(job): support distributed job in pipeline
wanziyu Jul 26, 2023
b7c05e1
feat(job): support distributed job in pipeline
wanziyu Jul 27, 2023
b2ab34b
feat(job): support distributed job in pipeline
wanziyu Jul 27, 2023
bdeb464
feat(job): support distributed job in pipeline
wanziyu Jul 27, 2023
e25effb
feat(job): support distributed job in pipeline
wanziyu Jul 27, 2023
8b12d39
feat(job): support distributed job in pipeline
wanziyu Jul 29, 2023
4b3d31e
feat(job): support distributed job in pipeline
wanziyu Jul 29, 2023
a66e52c
feat(job): support distributed job in pipeline
wanziyu Jul 30, 2023
fd0bce6
feat(job): support distributed job in pipeline
wanziyu Jul 31, 2023
ad8d36c
feat(job): support distributed job in pipeline
wanziyu Jul 31, 2023
9470fea
feat(job): support distributed job in pipeline
wanziyu Jul 31, 2023
2b54e5c
Merge branch 'PaddlePaddle:develop' into distributedjob
wanziyu Jul 31, 2023
b5c9c1f
feat(job): support distributed job in pipeline
wanziyu Aug 1, 2023
ed39c24
Merge remote-tracking branch 'origin/distributedjob' into distributedjob
wanziyu Aug 1, 2023
7d49998
feat(job): support distributed job in pipeline
wanziyu Aug 1, 2023
5ea44f3
feat(job): support distributed job in pipeline
wanziyu Aug 4, 2023
379c600
feat(job): support distributed job in pipeline
wanziyu Aug 4, 2023
9f8878e
feat(job): support distributed job in pipeline
wanziyu Aug 4, 2023
38d0a17
feat(job): support distributed job in pipeline
wanziyu Aug 5, 2023
291c851
feat(job): support distributed job in pipeline
wanziyu Aug 9, 2023
bef287e
feat(job): support distributed job in pipeline
wanziyu Aug 9, 2023
833569f
feat(job): support distributed job in pipeline
wanziyu Aug 10, 2023
ac52b44
feat(job): support distributed job in pipeline
wanziyu Aug 10, 2023
90ae32c
feat(job): support distributed job in pipeline
wanziyu Aug 10, 2023
b532602
feat(job): support distributed job in pipeline
wanziyu Aug 10, 2023
b9e2274
feat(job): support distributed job in pipeline
wanziyu Aug 10, 2023
4d107ff
feat(job): support distributed job in pipeline
wanziyu Aug 10, 2023
a7bc385
feat(job): support distributed job in pipeline
wanziyu Aug 10, 2023
3729d62
feat(job): support distributed job in pipeline
wanziyu Aug 10, 2023
5ede93b
feat(job): support distributed job in pipeline
wanziyu Aug 10, 2023
b2964c0
feat(job): support distributed job in pipeline
wanziyu Aug 10, 2023
08024a4
Merge branch 'develop' into distributedjob
wanziyu Aug 10, 2023
f7d6883
feat(job): support distributed job in pipeline
wanziyu Aug 16, 2023
679c4fa
Merge branch 'develop' into distributedjob
wanziyu Nov 2, 2023
05d7e5b
Merge branch 'develop' into distributedjob
qiaoshuangshuang Jan 23, 2024
a84be99
feat(job): support distributed job in pipeline
wanziyu Jan 23, 2024
1f3ac1f
feat(job): support distributed job in pipeline
wanziyu Jan 23, 2024
d4a38c9
feat(job): support distributed job in pipeline
wanziyu Feb 18, 2024
7ac066e
Merge branch 'develop' into distributedjob
wanziyu Feb 18, 2024
7841c7d
Merge branch 'develop' into distributedjob
wanziyu Mar 6, 2024
872ff46
feat(job): support distributed job in pipeline
wanziyu Mar 6, 2024
e71cb9a
feat(job): support distributed job in pipeline
wanziyu Mar 6, 2024
f54cb0c
feat(job): support distributed job in pipeline
wanziyu Mar 7, 2024
751cb30
Merge branch 'develop' into distributedjob
wanziyu Mar 12, 2024
effe949
Merge branch 'develop' into distributedjob
wanziyu Mar 25, 2024
b8336b3
Merge branch 'develop' into distributedjob
wanziyu Apr 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions client/paddleflow/job/job_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
limitations under the License.
"""

#!/usr/bin/env python3
# !/usr/bin/env python3
# -*- coding:utf8 -*-
from paddleflow.common.exception import PaddleFlowSDKException


class JobInfo(object):
Expand Down Expand Up @@ -134,8 +135,10 @@ class Member(object):
Members
"""

def __init__(self, role, replicas, job_id=None, job_name=None, queue=None, labels=None, annotations=None, priority=None,
flavour=None, fs=None, extra_fs_list=None, image=None, env=None, command=None, args_list=None, port=None,
def __init__(self, role, replicas, job_id=None, job_name=None, queue=None, labels=None, annotations=None,
priority=None,
flavour=None, fs=None, extra_fs_list=None, image=None, env=None, command=None, args_list=None,
port=None,
extension_template=None):
"""

Expand Down Expand Up @@ -175,11 +178,55 @@ def __init__(self, role, replicas, job_id=None, job_name=None, queue=None, label
self.port = port
self.extension_template = extension_template

def compile(self):

result = {}

if not self.role:
raise PaddleFlowSDKException(f"the role of distributed member cannot be empty")

result["role"] = self.role

if not self.replicas:
raise PaddleFlowSDKException(f"the replicas of distributed member cannot be empty")

result["replicas"] = self.replicas

if self.image:
result["image"] = self.image

if self.command:
result["command"] = self.command

if self.flavour:
result["flavour"] = self.flavour

if self.fs:
result["fs"] = self.fs

if self.extra_fs_list:
result["extra_fs"] = self.extra_fs_list

if self.port:
result["port"] = self.port

if self.env:
result["env"] = self.env

if self.annotations:
result["annotations"] = self.annotations

if self.labels:
result["labels"] = self.labels

return result


class Flavour(object):
"""
Flavour
"""

def __init__(self, name, cpu, memory, scalar_resources):
"""

Expand All @@ -198,12 +245,9 @@ class FileSystem(object):
"""
FileSystem
"""

def __init__(self, name, mount_path, sub_path, read_only):
self.name = name
self.mount_path = mount_path
self.sub_path = sub_path
self.read_only = read_only




1 change: 1 addition & 0 deletions client/paddleflow/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from .dsl import Artifact
from .dsl import Parameter
from .dsl import ContainerStep
from .dsl import DistributedJob
from .dsl import DAG
from .dsl import Pipeline
from .dsl import FAIL_CONTINUE
Expand Down
2 changes: 1 addition & 1 deletion client/paddleflow/pipeline/dsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .options import FSOptions
from .options import ExtraFS
from .options import MainFS

from .options import DistributedJob

# import from io_types
from .io_types import Artifact
Expand Down
18 changes: 14 additions & 4 deletions client/paddleflow/pipeline/dsl/compiler/step_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .component_compiler import ComponentCompiler

from paddleflow.pipeline.dsl.options import ExtraFS
from paddleflow.pipeline.dsl.options import DistributedJob
from paddleflow.pipeline.dsl.utils.consts import PipelineDSLError
from paddleflow.common.exception.paddleflow_sdk_exception import PaddleFlowSDKException

Expand All @@ -28,7 +29,7 @@ def __init__(self, component):
super().__init__(component)

def compile(self):
""" trans step to dicts'
""" trans step to dicts
"""
super().compile()

Expand All @@ -39,9 +40,12 @@ def compile(self):

# compile extra_fs
self._compile_extra_fs()

# compile distributed_job
self._compile_distributed_job()

return self._dict

def _compile_base_info(self):
""" compiler base info such as command, docker_env, env
"""
Expand Down Expand Up @@ -70,7 +74,7 @@ def _compile_cache_options(self):
"""
if self._component.cache_options:
self._dict["cache"] = self._component.cache_options.compile()

def _compile_extra_fs(self):
""" compile extra_fs
"""
Expand All @@ -85,4 +89,10 @@ def _compile_extra_fs(self):
raise PaddleFlowSDKException(PipelineDSLError,
self._generate_error_msg("Step's extra_fs attribute should be a list of ExtraFS instance"))

self._dict["extra_fs"].append(extra.compile())
self._dict["extra_fs"].append(extra.compile())

def _compile_distributed_job(self):
""" compile distributed_job
"""
if self._component.distributed_job:
self._dict["distributed_job"] = self._component.distributed_job.compile()
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from paddleflow.pipeline.dsl.io_types import EnvDict
from paddleflow.pipeline.dsl.options import CacheOptions
from paddleflow.pipeline.dsl.options import ExtraFS
from paddleflow.pipeline.dsl.options import DistributedJob
from paddleflow.pipeline.dsl.utils.util import validate_string_by_regex
from paddleflow.pipeline.dsl.utils.consts import COMPONENT_NAME_REGEX
from paddleflow.pipeline.dsl.utils.consts import PipelineDSLError
Expand All @@ -49,6 +50,7 @@ def __init__(
cache_options: CacheOptions=None,
condition: str=None,
loop_argument: Union[List, Parameter, Artifact, str]=None,
distributed_job: DistributedJob=None,
extra_fs: List[ExtraFS] = None
):
""" create a new instance of ContainerStep
Expand All @@ -62,7 +64,7 @@ def __init__(
parameters (str, Any): Parameter of step, the key is the name of this parameter, and the value could be int, string, Paramter, or upstream Step's Parameter
env (Dict[str, str]): enviroment varible for Step runtime
cache_options (CacheOptions): the cache options of step

distributed_job (DistributedJob): the distributed jobs defined in step
Raises:
PaddleFlowSDKException: if some args is illegal
"""
Expand All @@ -72,6 +74,7 @@ def __init__(
outputs=outputs,
parameters=parameters,
cache_options=cache_options,
distributed_job=distributed_job,
condition=condition,
loop_argument=loop_argument,
extra_fs=extra_fs
Expand Down
42 changes: 25 additions & 17 deletions client/paddleflow/pipeline/dsl/component/steps/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,37 @@
import copy
from pathlib import Path
from typing import Dict
from typing import Any
from typing import List
from typing import Any
from typing import List
from typing import Union

from paddleflow.pipeline.dsl.component import Component
from paddleflow.pipeline.dsl.io_types import Artifact
from paddleflow.pipeline.dsl.io_types import Parameter
from paddleflow.pipeline.dsl.options import CacheOptions
from paddleflow.pipeline.dsl.options import ExtraFS
from paddleflow.pipeline.dsl.options import DistributedJob
from paddleflow.pipeline.dsl.utils.consts import PipelineDSLError
from paddleflow.common.exception.paddleflow_sdk_exception import PaddleFlowSDKException
from paddleflow.job import Member


class Step(Component):
""" Step is the basic scheduling unit in pipeline
"""

def __init__(
self,
name: str,
inputs: Dict[str, Artifact]=None,
outputs: Dict[str, Artifact]=None,
parameters: Dict[str, Any]=None,
cache_options: CacheOptions=None,
condition: str=None,
loop_argument: Union[List, Parameter, Artifact, str]=None,
extra_fs: List[ExtraFS] = None
):
self,
name: str,
inputs: Dict[str, Artifact] = None,
outputs: Dict[str, Artifact] = None,
parameters: Dict[str, Any] = None,
cache_options: CacheOptions = None,
condition: str = None,
loop_argument: Union[List, Parameter, Artifact, str] = None,
extra_fs: List[ExtraFS] = None,
distributed_job: DistributedJob = None,
):
""" create a new instance of Step

Args:
Expand All @@ -54,25 +58,29 @@ def __init__(
condition (str): the condition. when schedule component, would be calculate condition, if the result of it is False, then the status of this component would be set "skipped"
loop_argument (Union[List, Parameter, Artifact, str]): the loop arugment, when schedule this component, would be be traversed, and the runtime will be created once for each item in it
extra_fs (List[ExtraFS]): the paddleflow filesystem used by Step
distributed_job (DistributedJob): the distributed jobs of step
Raises:
PaddleFlowSDKException: if some args is illegal
"""
super().__init__(name, inputs, outputs, parameters, condition, loop_argument)
self._type = "step"

self.cache_options = cache_options

self.distributed_job = distributed_job

if extra_fs and not isinstance(extra_fs, list):
extra_fs = [extra_fs]

if isinstance(extra_fs, list):
self.extra_fs = []
for extra in extra_fs:
if not isinstance(extra, ExtraFS):
raise PaddleFlowSDKException(PipelineDSLError,
self._generate_error_msg("Step's extra_fs attribute should be a list of ExtraFS instance"))
raise PaddleFlowSDKException(PipelineDSLError,
self._generate_error_msg(
"Step's extra_fs attribute should be a list of ExtraFS instance"))

self.extra_fs.append(extra)
else:
self.extra_fs = []
self.extra_fs = []


1 change: 1 addition & 0 deletions client/paddleflow/pipeline/dsl/options/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@
from .fs_options import FSOptions
from .fs_options import ExtraFS
from .fs_options import MainFS
from .distributed_job import DistributedJob

61 changes: 61 additions & 0 deletions client/paddleflow/pipeline/dsl/options/distributed_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from paddleflow.job import Member

from .options import Options
from typing import List

from paddleflow.common.exception import PaddleFlowSDKException
from paddleflow.pipeline.dsl.utils.consts import PipelineDSLError


class DistributedJob(object):
""" distributed job which used by step
"""

def __init__(
self,
framework: str,
members: List[Member] = None,
):
""" create a new instance for DistributedJob

Args:
framework (str): the framework of distributed job
members List(Member): the members defined in distributed job
"""
self.framework = framework

if members:
if not isinstance(members, list):
members = [members]

for member in members:
if not isinstance(member, Member):
raise PaddleFlowSDKException(PipelineDSLError,
"DistributedJob's members attribute should be a list of Member instance")

self.members = members
else:
self.members = None

def compile(self):
""" trans to dict
"""
result = {}

if not self.framework:
raise PaddleFlowSDKException(PipelineDSLError, "DistributedJob's framework attribute cannot empty")

result["framework"] = self.framework
result["members"] = []
if self.members:

if not isinstance(self.members, list):
self.members = [self.members]

for member in self.members:
if not isinstance(member, Member):
raise PaddleFlowSDKException(PipelineDSLError,
"DistributedJob's members attribute should be a list of Member instance")
result["members"].append(member.compile())

return result
2 changes: 1 addition & 1 deletion client/paddleflow/pipeline/dsl/options/fs_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(
mount_path: str=None,
read_only: bool=False
):
""" create an new instance for ExtraFS
""" create a new instance for ExtraFS

Args:
name (str): the name of paddleflow filesystem
Expand Down
2 changes: 1 addition & 1 deletion client/paddleflow/pipeline/dsl/options/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def compile(self):
"""
self._validate()

result = {}
result = {}
for attr, key in self.COMPILE_ATTR_MAP.items():
value = getattr(self, attr, None)
if value is not None:
Expand Down
Loading
Loading