From b0aaec3ac1fdf02b155b190087282d881fe02b97 Mon Sep 17 00:00:00 2001 From: Ben Galewsky Date: Thu, 6 Jun 2024 17:26:34 -0500 Subject: [PATCH] Add ability to submit multiple sequential worker jobs. By setting a Run parameter of sequential_workers we will submit a series of worker jobs where each depends on the previous job's completion. This is only really useful if each job persists the current state of the overall workflow so the next job can pick up where the prevous job left off. --- README.md | 19 +++++++++++++--- examples/sequential/MLProject | 5 +++++ examples/sequential/README.md | 11 ++++++++++ examples/sequential/demo.sh | 5 +++++ examples/sequential/python_env.yaml | 5 +++++ mlflow_slurm/slurm_backend.py | 34 +++++++++++++++++++++++------ 6 files changed, 69 insertions(+), 10 deletions(-) create mode 100644 examples/sequential/MLProject create mode 100644 examples/sequential/README.md create mode 100755 examples/sequential/demo.sh create mode 100644 examples/sequential/python_env.yaml diff --git a/README.md b/README.md index 9fe7c07..90a2abf 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,10 @@ Backend for executing MLFlow projects on Slurm batch system ## Usage Install this package in the environment from which you will be submitting jobs. -If you are submitting jobs from inside jobs, make sure you have this package +If you are submitting jobs from inside jobs, make sure you have this package listed in your conda or pip environment. -Just list this as your `--backend` in the job run. You should include a json +Just list this as your `--backend` in the job run. You should include a json config file to control how the batch script is constructed: ```shell mlflow run --backend slurm \ @@ -34,7 +34,20 @@ properties in this file are: | time | Max CPU time job may run | | sbatch-script-file | Name of batch file to be produced. Leave blank to have service generate a script file name based on the run ID | +## Sequential Worker Jobs +There are occaisions where you have a job that can't finish in the maxiumum +allowable wall time. If you are able to write out a checkpoint file, you can +use sequential worker jobs to continue the job where it left off. This is +useful for training deep learning models or other long running jobs. + +To use this, you just need to provide a parameter to the `mlflow run` command +```shell + mlflow run --backend slurm -c ../../slurm_config.json -P sequential_workers=3 . +``` +This will the submit the job as normal, but also submit 3 additional jobs that +each depend on the previous job. As soon as the first job terminates, the next +job will start. This will continue until all jobs have completed. + ## Development The slurm docker deployment is handy for testing and development. You can start up a slurm environment with the included docker-compose file - diff --git a/examples/sequential/MLProject b/examples/sequential/MLProject new file mode 100644 index 0000000..bce21d1 --- /dev/null +++ b/examples/sequential/MLProject @@ -0,0 +1,5 @@ +python_env: python_env.yaml + +entry_points: + main: + command: "./demo.sh" diff --git a/examples/sequential/README.md b/examples/sequential/README.md new file mode 100644 index 0000000..be7f05e --- /dev/null +++ b/examples/sequential/README.md @@ -0,0 +1,11 @@ +# Example of Sequential Worker Jobs +This simple example shows how to use the `sequential_workers` parameter to submit a job that will be split into multiple jobs that depend on each other. + +```shell +mlflow run --backend slurm -c ../../slurm_config.json -P sequential_workers=3 . +``` + +Each job appends to a file named `restart.log` with the time the job is run. +MLFlow will submit three jobs that depend on each other. As soon as the first job terminates, the next job will start. This will continue until all jobs have completed. + +When the jobs are complete, you can check the `restart.log` file to see the order in which the jobs were run. diff --git a/examples/sequential/demo.sh b/examples/sequential/demo.sh new file mode 100755 index 0000000..645dfca --- /dev/null +++ b/examples/sequential/demo.sh @@ -0,0 +1,5 @@ +#!/bin/bash +echo "Hello World" +sleep 2 +date >> restart.log +exit 0 diff --git a/examples/sequential/python_env.yaml b/examples/sequential/python_env.yaml new file mode 100644 index 0000000..5106e59 --- /dev/null +++ b/examples/sequential/python_env.yaml @@ -0,0 +1,5 @@ +# Python version required to run the project. +python: "3.10" +# Dependencies required to run the project. +dependencies: + - mlflow diff --git a/mlflow_slurm/slurm_backend.py b/mlflow_slurm/slurm_backend.py index 6bb85a2..83a965c 100644 --- a/mlflow_slurm/slurm_backend.py +++ b/mlflow_slurm/slurm_backend.py @@ -44,10 +44,10 @@ class SlurmSubmittedRun(SubmittedRun): :param mlflow_run_id: ID of the MLflow project run. """ - def __init__(self, mlflow_run_id: str, slurm_job_id: str) -> None: + def __init__(self, mlflow_run_id: str, slurm_job_ids: List[str]) -> None: super().__init__() self._mlflow_run_id = mlflow_run_id - self.slurm_job_id = slurm_job_id + self.slurm_job_ids = slurm_job_ids self._status = RunStatus.SCHEDULED self._status_lock = RLock() @@ -58,6 +58,13 @@ def __init__(self, mlflow_run_id: str, slurm_job_id: str) -> None: def run_id(self) -> str: return self._mlflow_run_id + @property + def job_id(self) -> str: + """ + :return: The final Slurm Job ID of the submitted job list. + """ + return self.slurm_job_ids[-1] + def is_terminated_or_gone(self): self._update_status() return not self._status or RunStatus.is_terminated(self._status) @@ -119,14 +126,20 @@ def _update_status(self) -> RunStatus: class SlurmProjectBackend(AbstractBackend): @staticmethod - def sbatch(script: str) -> str: + def sbatch(script: str, previous_job: str="") -> str: """ Submit a script to the slurm batch manager :param script: The filename of the script :return: The Slurm Job ID or None of the submit fails """ job_re = "Submitted batch job (\\d+)" - with subprocess.Popen(f"sbatch {script}", + + # If there is a previous job in a set of sequential workers, we want to + # wait for it to finish before starting this one + sbatch_command = f"sbatch {script}" if not previous_job \ + else f"sbatch --dependency=afterok:{previous_job} {script}" + + with subprocess.Popen(sbatch_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as p: return_code = p.wait() @@ -148,6 +161,7 @@ def run(self, project_uri: str, entry_point: str, params: dict, version: str, backend_config: dict, tracking_uri: str, experiment_id: str) -> SlurmSubmittedRun: + print(f"Ready to submit with params: {params}") work_dir = fetch_and_validate_project(project_uri, version, entry_point, params) active_run = get_or_create_run(None, project_uri, experiment_id, work_dir, version, @@ -204,11 +218,17 @@ def run(self, project_uri: str, entry_point: str, params: dict, version: str, generate_sbatch_script(command_str, backend_config, active_run.info.run_id, sbatch_file) - job_id = SlurmProjectBackend.sbatch(sbatch_file) - MlflowClient().set_tag(active_run.info.run_id, "slurm_job_id", job_id) + previous_job = "" + job_ids = [] + for worker in range(int(params.get("sequential_workers", 1))): + job_id = SlurmProjectBackend.sbatch(sbatch_file, previous_job) + job_ids.append(job_id) + previous_job = job_id + + MlflowClient().set_tag(active_run.info.run_id, "slurm_job_id", job_ids) _logger.info(f"slurm job id={job_id}") - return SlurmSubmittedRun(active_run.info.run_id, job_id) + return SlurmSubmittedRun(active_run.info.run_id, job_ids) def __init__(self): pass