
Implement parallel execution for DAG tasks #4128

Open
andylizf wants to merge 56 commits into base branch advanced-dag

Conversation

andylizf (Contributor):

Closes #4055

This PR implements parallel execution for DAG tasks in the jobs controller, addressing issue #4055. The changes allow for efficient execution of complex DAGs with independent tasks running concurrently, significantly improving performance for workflows with parallel components.

Changes

  • Modified JobsController to identify and execute parallel task groups (a sketch of the scheduling idea follows this list)
  • Implemented thread-safe task execution and monitoring
  • Added concurrent resource management and cleanup
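
A minimal, simplified sketch of the scheduling idea above (not the PR's actual code; `deps` and `launch_task` are placeholder names): repeatedly collect the tasks whose upstream tasks have all completed and submit them to a thread pool, so independent tasks run concurrently.

```python
import concurrent.futures
from typing import Callable, Dict, List, Set


def run_dag_in_parallel(deps: Dict[int, List[int]],
                        launch_task: Callable[[int], None]) -> None:
    """deps maps each task id to the ids of its upstream tasks."""
    completed: Set[int] = set()
    remaining: Set[int] = set(deps)
    with concurrent.futures.ThreadPoolExecutor() as pool:
        while remaining:
            # Tasks whose upstreams are all done can start now, in parallel.
            ready = [t for t in remaining if set(deps[t]) <= completed]
            if not ready:  # should not happen for a valid DAG
                raise RuntimeError('Cycle or unsatisfiable dependency detected')
            futures = {pool.submit(launch_task, t): t for t in ready}
            for fut in concurrent.futures.as_completed(futures):
                fut.result()  # re-raise any exception from the task thread
                completed.add(futures[fut])
            remaining -= set(ready)


# Diamond example as in examples/dag/diamond.yml: 0 -> {1, 2} -> 3.
run_dag_in_parallel({0: [], 1: [0], 2: [0], 3: [1, 2]},
                    lambda t: print(f'launching task {t}'))
```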

Tested (run the relevant ones):

  • Code formatting: bash format.sh
  • Any manual or new tests for this PR (please specify below)
  • All smoke tests: pytest tests/test_smoke.py
  • Relevant individual smoke tests: pytest tests/test_smoke.py::test_fill_in_the_name
  • Backward compatibility tests: conda deactivate; bash -i tests/backward_compatibility_tests.sh

@andylizf (Contributor Author):

@cblmemo I'm currently working on implementing a cancellation mechanism for tasks that have already started or are queued for execution (similar to your setup with replicas preparing to launch). I'm currently using future.cancel(), but it doesn't seem to fully address cancellation of tasks that are already in progress. I haven't switched to using threading.Event yet, which might improve this.

That said, I noticed you used Process for managing the launch and termination of replicas. I don't see any clear advantages to using Process over Thread, especially since Thread should handle task cancellation just as well without the overhead of creating separate processes. Could you clarify your reasoning for choosing Process here? Is there a specific limitation you're addressing with this approach?
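
For reference, a minimal sketch (not this PR's code; `run_task` is a placeholder) of the distinction mentioned above: `Future.cancel()` only prevents work that has not started yet, while an already-running task needs a cooperative signal such as a `threading.Event` that it checks periodically.

```python
import concurrent.futures
import threading
import time

cancel_event = threading.Event()


def run_task(task_id: int) -> bool:
    """Placeholder task that checks the event so it can stop cooperatively."""
    for _ in range(100):
        if cancel_event.is_set():
            print(f'task {task_id} cancelled while running')
            return False
        time.sleep(0.1)  # stands in for real work
    return True


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(run_task, i) for i in range(4)]
    time.sleep(0.3)
    for fut in futures:
        fut.cancel()        # only cancels tasks still waiting in the queue
    cancel_event.set()      # asks already-running tasks to stop cooperatively
```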

@cblmemo (Collaborator) commented Oct 19, 2024:

> @cblmemo I'm currently working on implementing a cancellation mechanism for tasks that have already started or are queued for execution (similar to your setup with replicas preparing to launch). I'm currently using future.cancel(), but it doesn't seem to fully address cancellation of tasks that are already in progress. I haven't switched to using threading.Event yet, which might improve this.
>
> That said, I noticed you used Process for managing the launch and termination of replicas. I don't see any clear advantages to using Process over Thread, especially since Thread should handle task cancellation just as well without the overhead of creating separate processes. Could you clarify your reasoning for choosing Process here? Is there a specific limitation you're addressing with this approach?

This is mainly due to logging. Threads share the same sys.stdout, which makes the redirection in the following code infeasible: reassigning sys.stdout in one thread would affect every thread in the process.

I couldn't find a way to do this kind of logging redirection back then. If you figure out a way, please let me know ;)

```python
import sys
import traceback
from typing import Callable

from sky import sky_logging
from sky.utils import common_utils
from sky.utils import ux_utils


class RedirectOutputForProcess:
    """Redirects stdout and stderr to a file.

    This class enables output redirection for multiprocessing.Process.
    Example usage:

        p = multiprocessing.Process(
            target=RedirectOutputForProcess(func, file_name).run, args=...)

    This is equal to:

        p = multiprocessing.Process(target=func, args=...)

    plus redirecting all stdout/stderr to file_name.
    """

    def __init__(self, func: Callable, file: str, mode: str = 'w') -> None:
        self.func = func
        self.file = file
        self.mode = mode

    def run(self, *args, **kwargs):
        with open(self.file, self.mode, encoding='utf-8') as f:
            sys.stdout = f
            sys.stderr = f
            # Reconfigure the logger since the logger was initialized before
            # with the previous stdout/stderr.
            sky_logging.reload_logger()
            logger = sky_logging.init_logger(__name__)
            # The subprocess_util.run('sky status') inside
            # sky.execution::_execute cannot be redirected, since we cannot
            # directly operate on the stdout/stderr of the subprocess. This
            # is because some code in SkyPilot specifies the stdout/stderr
            # of the subprocess.
            try:
                self.func(*args, **kwargs)
            except Exception as e:  # pylint: disable=broad-except
                logger.error(f'Failed to run {self.func.__name__}. '
                             f'Details: {common_utils.format_exception(e)}')
                with ux_utils.enable_traceback():
                    logger.error(f'  Traceback:\n{traceback.format_exc()}')
                raise
```
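
For context, a usage sketch along the lines of the docstring above: running a function in a separate process with its output redirected to a per-task log file. The function and log path are placeholders, and the import assumes the class lives in sky.utils.ux_utils (as in the serve reference snippet later in this thread).

```python
import multiprocessing

from sky.utils import ux_utils  # assumed location of RedirectOutputForProcess


def launch_task(task_name: str) -> None:
    # Everything printed (or logged) here ends up in the redirected file.
    print(f'launching {task_name}')


if __name__ == '__main__':
    log_file = 'task_0_launch.log'  # placeholder path
    p = multiprocessing.Process(
        target=ux_utils.RedirectOutputForProcess(launch_task, log_file).run,
        args=('data-processing',))
    p.start()
    p.join()
```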

@andylizf andylizf changed the base branch from master to advanced-dag October 23, 2024 21:27
@andylizf (Contributor Author):

Left some design considerations for review @cblmemo:

  1. User feedback: Should we show "submitting to launch pool" or similar messages in console/controller logs to improve visibility?

  2. Log accessibility: After separating thread logs, how should we expose them to users? Consider adding "View logs at: ..." messages for each subtask.

  3. Task cancellation propagation: Need to define the behavior of downstream tasks when their upstream tasks are cancelled (which happens when the upstream's own upstream tasks fail). The current pending state might keep the job running indefinitely; see the sketch after this list for one possible propagation rule.
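
One possible propagation rule for point 3, as a minimal sketch (hypothetical data structures, not this PR's code): when a task fails or is cancelled, every transitive downstream task is marked cancelled as well, so the job does not sit in a pending state forever.

```python
from typing import Dict, List, Set


def propagate_cancellation(failed_task: int,
                           downstream: Dict[int, List[int]],
                           cancelled: Set[int]) -> None:
    """Mark all tasks reachable from `failed_task` as cancelled."""
    stack = list(downstream.get(failed_task, []))
    while stack:
        task = stack.pop()
        if task in cancelled:
            continue
        cancelled.add(task)
        stack.extend(downstream.get(task, []))


# Diamond DAG 0 -> {1, 2} -> 3: if task 1 fails, task 3 must be cancelled.
downstream = {0: [1, 2], 1: [3], 2: [3], 3: []}
cancelled: Set[int] = set()
propagate_cancellation(1, downstream, cancelled)
assert cancelled == {3}
```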

@cblmemo (Collaborator) commented Oct 24, 2024:

> Left some design considerations for review @cblmemo:
>
> 1. User feedback: Should we show "submitting to launch pool" or similar messages in console/controller logs to improve visibility?
> 2. Log accessibility: After separating thread logs, how should we expose them to users? Consider adding "View logs at: ..." messages for each subtask.
> 3. Task cancellation propagation: Need to define the behavior of downstream tasks when their upstream tasks are cancelled (which happens when the upstream's own upstream tasks fail). The current pending state might keep the job running indefinitely.

  1. I think running workloads in parallel is expected, and the technical terms would confuse users. Maybe let's leave them out for now.
  2. Yes, that would be great. For the previous pipeline, I believe we just tail the logs one by one, as they are sequential. I think we can start with only printing the file name (and the command to tail the logs) in the main controller log. We should also think about supporting commands to tail a specific task's log, e.g. by adding a --task-name argument to sky jobs logs (and printing it in the controller log); a sketch of such a flag follows this list.
  3. Could you elaborate on what case we would encounter this problem? I think we only start a job once all of its upstream tasks have finished (and thus it cannot be cancelled)? IIUC, if we want to cancel a job, we only need to stop all currently running jobs.
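
A minimal sketch of the --task-name idea from point 2 above, using click. The option names, entry point, and behavior here are assumptions for illustration, not the actual SkyPilot CLI code.

```python
import click


@click.command('logs')
@click.argument('job_id', type=int)
@click.option('--controller', is_flag=True, default=False,
              help='Stream the controller logs instead of the job logs.')
@click.option('--task-name', default=None, type=str,
              help='Only stream logs of the task with this name.')
def jobs_logs(job_id: int, controller: bool, task_name: str) -> None:
    """Stream logs of a managed job, optionally filtered to one task."""
    # Hypothetical core call; the real entry point and signature may differ.
    print(f'Would stream logs for job {job_id}, '
          f'controller={controller}, task_name={task_name}')


if __name__ == '__main__':
    jobs_logs()
```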

@cblmemo (Collaborator) left a comment:

Thanks for adding this feature @andylizf ! It is awesome. Left some comments to discuss ;)

Comment on lines 62 to 64
self._completed_tasks: Set[int] = set()
self._failed_tasks: Set[int] = set()
self._block_tasks: Set[int] = set()
cblmemo (Collaborator):

Can we use a map from task id to its status? Maintaining three separate sets feels a little redundant to me.
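
A minimal sketch of the suggestion (names are illustrative): a single mapping from task id to an explicit status, instead of three separate sets.

```python
import enum
from typing import Dict


class TaskStatus(enum.Enum):
    PENDING = 'PENDING'
    COMPLETED = 'COMPLETED'
    FAILED = 'FAILED'
    CANCELLED = 'CANCELLED'  # see the naming discussion below


# One dict replaces _completed_tasks / _failed_tasks / _block_tasks.
task_status: Dict[int, TaskStatus] = {}
task_status[0] = TaskStatus.COMPLETED
task_status[1] = TaskStatus.FAILED

failed = {tid for tid, s in task_status.items() if s is TaskStatus.FAILED}
```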

cblmemo (Collaborator):

Also, is it possible to use its name as the identifier? And, related to the previous PR, maybe instead of the rather meaningless timestamps we could use its index in a topological order as its default name (if the user does not specify one), e.g. task_1.

cblmemo (Collaborator):

The name block_tasks feels a little strange to me, as it might suggest the task will be executed at some point, which is not true (it is permanently cancelled). Should we rename it to CANCELLED?

andylizf (Contributor Author):

> Also, is it possible to use its name as the identifier? And, related to the previous PR, maybe instead of the rather meaningless timestamps we could use its index in a topological order as its default name (if the user does not specify one), e.g. task_1.

@cblmemo Could you clarify the second point? Not sure how using names relates to tasks here. Thanks!

cblmemo (Collaborator):

Currently we generate a task name if the user does not specify one. The name is something like f'task_{current_timestamp}', IIRC. Maybe we could change the naming convention to f'task_{id_in_topo_order}'.
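
A minimal sketch of the proposed convention (illustrative only): fall back to the task's index in a topological order when the user does not provide a name.

```python
from typing import Optional


def default_task_name(topo_index: int, user_name: Optional[str] = None) -> str:
    """Use the user-provided name, else fall back to the topo-order index."""
    return user_name if user_name else f'task_{topo_index}'


# e.g. the tasks of the diamond DAG become task_0 ... task_3 by default.
assert default_task_name(1) == 'task_1'
assert default_task_name(0, 'data-processing') == 'data-processing'
```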

Collapsed review threads on:
  • sky/jobs/controller.py
  • sky/jobs/recovery_strategy.py
  • sky/jobs/state.py
  • sky/serve/serve_utils.py
  • sky/utils/ux_utils.py
@andylizf (Contributor Author) commented Oct 27, 2024:

TODOs:

  • Update sky.launch to direct thread output to separate files, preventing interleaved logs in the controller's run.log.
  • Adjust the logic in stream_logs_by_id or sky jobs logs 1 --controller to avoid relying on get_latest_task_id_status, ensuring that each task’s run.log outputs fully and independently.

@andylizf (Contributor Author) commented Oct 28, 2024:

Finished implementing the initial TODOs:

> TODOs:
>
>   • Update sky.launch to direct thread output to separate files, preventing interleaved logs in the controller's run.log.
>   • Adjust the logic in stream_logs_by_id or sky jobs logs 1 --controller to avoid relying on get_latest_task_id_status, ensuring that each task’s run.log outputs fully and independently.

However, there are two remaining issues to be addressed:

  • Controller-side cluster's launch logs accessibility: Currently, logs like ~/sky_logs/sky-2024-10-28-04-14-45-846061/task_3_launch.log are only available on the controller machine and can only be accessed remotely.
  • Cluster-side task run log persistence: As demonstrated in the example where task 0's logs are no longer available after completion, we need to implement proper log retention for completed tasks.

The current behavior is demonstrated in the attached logs, showing:

Output
andyl@DESKTOP-7FP6SMO ~/skypilot (dag-execute)> sky jobs launch ./examples/dag/diamond.yml  -y --cloud gcp    (skypilot) 
Task from YAML spec: ./examples/dag/diamond.yml
WARNING: override params {'cloud': GCP} are ignored, since the yaml file contains multiple tasks.
Managed job 'pipeline' will be launched on (estimated):
Task 'data-processing' requires AWS which is not enabled. To enable access, change the task cloud requirement or run: sky check aws
Task 'infer_A' requires AWS which is not enabled. To enable access, change the task cloud requirement or run: sky check aws
Best plan: 
----------------------------------------------------------------------------------------------------------------------------
 TASK              #NODES   CLOUD        INSTANCE              vCPUs   Mem(GB)   ACCELERATORS   REGION/ZONE                 
----------------------------------------------------------------------------------------------------------------------------
 data-processing   1        GCP          n2-standard-2[Spot]   2       8         -              northamerica-northeast2-c   
 infer_A           1        GCP          n2-standard-2         2       8         -              us-central1-a               
 infer_B           1        Kubernetes   2CPU--2GB             2       2         -              kind-skypilot               
 eval_A_B          1        Kubernetes   2CPU--2GB             2       2         -              kind-skypilot               
----------------------------------------------------------------------------------------------------------------------------
Considered resources for task 'data-processing' (1 node):
----------------------------------------------------------------------------------------------------------------
 CLOUD   INSTANCE              vCPUs   Mem(GB)   ACCELERATORS   REGION/ZONE                 COST ($)   CHOSEN   
----------------------------------------------------------------------------------------------------------------
 GCP     n2-standard-2[Spot]   2       8         -              northamerica-northeast2-c   0.01          ✔     
----------------------------------------------------------------------------------------------------------------
Considered resources for task 'infer_A' (1 node):
----------------------------------------------------------------------------------------------
 CLOUD   INSTANCE        vCPUs   Mem(GB)   ACCELERATORS   REGION/ZONE     COST ($)   CHOSEN   
----------------------------------------------------------------------------------------------
 GCP     n2-standard-2   2       8         -              us-central1-a   0.10          ✔     
----------------------------------------------------------------------------------------------
Considered resources for task 'infer_B' (1 node):
---------------------------------------------------------------------------------------------------
 CLOUD        INSTANCE        vCPUs   Mem(GB)   ACCELERATORS   REGION/ZONE     COST ($)   CHOSEN   
---------------------------------------------------------------------------------------------------
 Kubernetes   2CPU--2GB       2       2         -              kind-skypilot   0.00          ✔     
 GCP          n2-standard-2   2       8         -              us-central1-a   0.10                
---------------------------------------------------------------------------------------------------
Considered resources for task 'eval_A_B' (1 node):
---------------------------------------------------------------------------------------------------
 CLOUD        INSTANCE        vCPUs   Mem(GB)   ACCELERATORS   REGION/ZONE     COST ($)   CHOSEN   
---------------------------------------------------------------------------------------------------
 Kubernetes   2CPU--2GB       2       2         -              kind-skypilot   0.00          ✔     
 GCP          n2-standard-2   2       8         -              us-central1-a   0.10                
---------------------------------------------------------------------------------------------------
Launching managed job 'pipeline' from jobs controller...
⚙︎ Launching managed jobs controller on Kubernetes.
└── Pod is up.
✓ Cluster launched: sky-jobs-controller-6eabc0cb.  View logs at: ~/sky_logs/sky-2024-10-27-21-14-08-796591/provision.log
⚙︎ Mounting files.
  Syncing (to 1 node): /tmp/managed-dag-pipeline-81wrv4dy -> ~/.sky/managed_jobs/pipeline-ba6f.yaml
  Syncing (to 1 node): /tmp/tmpnk_zyq_5 -> ~/.sky/managed_jobs/pipeline-ba6f.config_yaml
✓ Files synced.  View logs at: ~/sky_logs/sky-2024-10-27-21-14-08-796591/file_mounts.log
⚙︎ Running setup on managed jobs controller.
  Check & install cloud dependencies on controller: done.                   
✓ Setup completed.  View logs at: ~/sky_logs/sky-2024-10-27-21-14-08-796591/setup-*.log
Auto-stop is not supported for Kubernetes and RunPod clusters. Skipping.
⚙︎ Job submitted, ID: 4
├── To stream job logs:                 sky jobs logs 4 --controller or sky jobs logs 4 --task-id task_id

📋 Useful Commands
Managed Job ID: 4
├── To cancel the job: sky jobs cancel 4
├── To stream job logs: sky jobs logs 4
├── To stream controller logs: sky jobs logs --controller 4
├── To view all managed jobs: sky jobs queue
└── To view managed job dashboard: sky jobs dashboard

Controller Logs
andyl@DESKTOP-7FP6SMO ~/skypilot (dag-execute)> sky jobs logs 4 --controller    (skypilot) 
├── Waiting for task resources on 1 node.
└── Job started. Streaming logs... (Ctrl-C to exit log streaming; job will not be killed)
(pipeline, pid=2474641) I 10-28 04:14:45 controller.py:63] DAG(pipeline: data-processing(infer_B,infer_A) infer_A(eval_A_B) infer_B(eval_A_B) eval_A_B(-))
(pipeline, pid=2474641) I 10-28 04:14:45 controller.py:442] Task 0 is submitted to run. To prevent from interleaving, the launch logs are redirected to ~/sky_logs/sky-2024-10-28-04-14-45-846061/task_0_launch.log
(pipeline, pid=2474641) I 10-28 04:17:33 controller.py:457] Task 0 completed.
(pipeline, pid=2474641) I 10-28 04:17:33 controller.py:400] Task 0 completed with result: True
(pipeline, pid=2474641) I 10-28 04:17:33 controller.py:442] Task 2 is submitted to run. To prevent from interleaving, the launch logs are redirected to ~/sky_logs/sky-2024-10-28-04-14-45-846061/task_2_launch.log
(pipeline, pid=2474641) I 10-28 04:17:33 controller.py:442] Task 1 is submitted to run. To prevent from interleaving, the launch logs are redirected to ~/sky_logs/sky-2024-10-28-04-14-45-846061/task_1_launch.log
(pipeline, pid=2474641) I 10-28 04:19:35 controller.py:457] Task 2 completed.
(pipeline, pid=2474641) I 10-28 04:19:35 controller.py:400] Task 2 completed with result: True
(pipeline, pid=2474641) I 10-28 04:20:06 controller.py:457] Task 1 completed.
(pipeline, pid=2474641) I 10-28 04:20:06 controller.py:400] Task 1 completed with result: True
(pipeline, pid=2474641) I 10-28 04:20:06 controller.py:442] Task 3 is submitted to run. To prevent from interleaving, the launch logs are redirected to ~/sky_logs/sky-2024-10-28-04-14-45-846061/task_3_launch.log
(pipeline, pid=2474641) I 10-28 04:22:06 controller.py:457] Task 3 completed.
(pipeline, pid=2474641) I 10-28 04:22:06 controller.py:400] Task 3 completed with result: True
(pipeline, pid=2474641) I 10-28 04:22:07 controller.py:570] Killing controller process 2511381.
(pipeline, pid=2474641) I 10-28 04:22:07 controller.py:578] Controller process 2511381 killed.
(pipeline, pid=2474641) I 10-28 04:22:07 controller.py:580] Cleaning up any cluster for job 4.
(pipeline, pid=2474641) I 10-28 04:22:07 controller.py:589] Cluster of managed job 4 has been cleaned up.
✓ Job finished (status: SUCCEEDED).

But for now, files like ~/sky_logs/sky-2024-10-28-04-14-45-846061/task_3_launch.log are all unreachable; they exist only on the controller machine.

Cluster's Run Log
andyl@DESKTOP-7FP6SMO ~/skypilot (dag-execute)> sky jobs logs 4 --task-id 1                                                               (skypilot) 
├── Waiting for task resources on 1 node.
└── Job started. Streaming logs... (Ctrl-C to exit log streaming; job will not be killed)
(setup pid=2321) setup for infer_A
(infer_A, pid=2321) infer_A starts
(infer_A, pid=2321) infer_A ends
✓ Job finished (status: SUCCEEDED).
Shared connection to 34.27.213.54 closed.
✓ Managed job finished: 4 (status: PENDING).

andyl@DESKTOP-7FP6SMO ~/skypilot (dag-execute)> sky jobs logs 4 --task-id 0 (skypilot)
✓ Managed job finished: 4 (status: STARTING).

From here we can see the necessity of the second TODO: after task 0 completed, its log was no longer retained.

@cblmemo PTAL at these remaining issues.

@cblmemo (Collaborator) commented Oct 28, 2024:

For logs like ~/sky_logs/sky-2024-10-28-04-14-45-846061/task_3_launch.log, we can add a command hint to the controller log, e.g.:

andyl@DESKTOP-7FP6SMO ~/skypilot (dag-execute)> sky jobs logs 4 --controller    (skypilot) 
├── Waiting for task resources on 1 node.
└── Job started. Streaming logs... (Ctrl-C to exit log streaming; job will not be killed)
(pipeline, pid=2474641) I 10-28 04:14:45 controller.py:63] DAG(pipeline: data-processing(infer_B,infer_A) infer_A(eval_A_B) infer_B(eval_A_B) eval_A_B(-))
(pipeline, pid=2474641) I 10-28 04:14:45 controller.py:442] Task 0 is submitted to run. To see logs: sky jobs logs 4 --task-id 0 <<<<<<<<< NOTICE HERE
(pipeline, pid=2474641) I 10-28 04:17:33 controller.py:457] Task 0 completed.
(pipeline, pid=2474641) I 10-28 04:17:33 controller.py:400] Task 0 completed with result: True
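
Concretely, the submission log line could carry the command hint along these lines (a sketch with assumed names, not the actual controller code):

```python
import logging

logger = logging.getLogger(__name__)


def log_task_submission(task_id: int, managed_job_id: int) -> None:
    # Point users at the per-task log command instead of a controller-only path.
    logger.info(f'Task {task_id} is submitted to run. '
                f'To see logs: sky jobs logs {managed_job_id} --task-id {task_id}')
```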

@cblmemo (Collaborator) commented Oct 28, 2024:

> From here we can see the necessity of the second TODO: after task 0 completed, its log was no longer retained.

We can download logs before terminating the cluster. Reference:

```python
if sync_down_logs:
    _download_and_stream_logs(info)
logger.info(f'preempted: {info.status_property.preempted}, '
            f'replica_id: {replica_id}')
p = multiprocessing.Process(
    target=ux_utils.RedirectOutputForProcess(terminate_cluster,
                                             log_file_name, 'a').run,
    args=(info.cluster_name, replica_drain_delay_seconds),
)
```
