Add ability to submit multiple sequential worker jobs.
By setting a Run parameter of sequential_workers we will submit a series of
worker jobs where each depends on the previous job's completion. This is
only really useful if each job persists the current state of the overall
workflow so the next job can pick up where the previous job left off.
BenGalewsky committed Jun 6, 2024
1 parent ca3ded0 commit b0aaec3
Showing 6 changed files with 69 additions and 10 deletions.
19 changes: 16 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -3,10 +3,10 @@ Backend for executing MLFlow projects on Slurm batch system

## Usage
Install this package in the environment from which you will be submitting jobs.
If you are submitting jobs from inside jobs, make sure you have this package
listed in your conda or pip environment.

Just list this as your `--backend` in the job run. You should include a json
config file to control how the batch script is constructed:
```shell
mlflow run --backend slurm \
@@ -34,7 +34,20 @@ properties in this file are:
| time | Max CPU time job may run |
| sbatch-script-file | Name of batch file to be produced. Leave blank to have service generate a script file name based on the run ID |
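For reference, a minimal config file using the properties above might look like the following. This is a sketch: the `time` and `sbatch-script-file` keys come from the table, while the `partition` key is an assumption included for illustration and may differ in your cluster's configuration.

```json
{
  "partition": "normal",
  "time": "24:00:00",
  "sbatch-script-file": "my_job.sh"
}
```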

## Sequential Worker Jobs
There are occasions where you have a job that can't finish in the maximum
allowable wall time. If you are able to write out a checkpoint file, you can
use sequential worker jobs to continue the job where it left off. This is
useful for training deep learning models or other long-running jobs.

To use this, pass the `sequential_workers` parameter to the `mlflow run` command:
```shell
mlflow run --backend slurm -c ../../slurm_config.json -P sequential_workers=3 .
```
This submits the job as normal, with each subsequent job depending on the
previous one. As soon as the first job terminates, the next job starts,
continuing until all jobs have completed.
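Under the hood, the backend chains the jobs together with Slurm's `--dependency=afterok` flag. The helper below is a minimal sketch (not the backend's actual code) of the command sequence it builds; the hard-coded job IDs stand in for the IDs `sbatch` would return at submit time.

```python
def build_sbatch_commands(script: str, n_workers: int) -> list:
    """Sketch of the submission loop: each job after the first
    waits on the previous job via --dependency=afterok."""
    commands = []
    previous_job = ""
    for i in range(n_workers):
        if previous_job:
            commands.append(f"sbatch --dependency=afterok:{previous_job} {script}")
        else:
            commands.append(f"sbatch {script}")
        previous_job = str(1000 + i)  # stand-in for the real Slurm job ID
    return commands

for cmd in build_sbatch_commands("train.sh", 3):
    print(cmd)
```

With `--dependency=afterok`, each job in the chain starts only if the previous one exits successfully, which is what lets a checkpoint-and-resume workflow span several wall-time windows.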

## Development
The slurm docker deployment is handy for testing and development. You can start
up a slurm environment with the included docker-compose file

5 changes: 5 additions & 0 deletions examples/sequential/MLProject
@@ -0,0 +1,5 @@
python_env: python_env.yaml

entry_points:
main:
command: "./demo.sh"
11 changes: 11 additions & 0 deletions examples/sequential/README.md
@@ -0,0 +1,11 @@
# Example of Sequential Worker Jobs
This simple example shows how to use the `sequential_workers` parameter to submit a job that will be split into multiple jobs that depend on each other.

```shell
mlflow run --backend slurm -c ../../slurm_config.json -P sequential_workers=3 .
```

Each job appends the time it runs to a file named `restart.log`.
MLflow will submit three jobs that depend on each other. As soon as the first job terminates, the next job will start. This will continue until all jobs have completed.

When the jobs are complete, you can check the `restart.log` file to see the order in which the jobs were run.
5 changes: 5 additions & 0 deletions examples/sequential/demo.sh
@@ -0,0 +1,5 @@
#!/bin/bash
echo "Hello World"
sleep 2
date >> restart.log
exit 0
5 changes: 5 additions & 0 deletions examples/sequential/python_env.yaml
@@ -0,0 +1,5 @@
# Python version required to run the project.
python: "3.10"
# Dependencies required to run the project.
dependencies:
- mlflow
34 changes: 27 additions & 7 deletions mlflow_slurm/slurm_backend.py
@@ -44,10 +44,10 @@ class SlurmSubmittedRun(SubmittedRun):
:param mlflow_run_id: ID of the MLflow project run.
"""

def __init__(self, mlflow_run_id: str, slurm_job_id: str) -> None:
def __init__(self, mlflow_run_id: str, slurm_job_ids: List[str]) -> None:
super().__init__()
self._mlflow_run_id = mlflow_run_id
self.slurm_job_id = slurm_job_id
self.slurm_job_ids = slurm_job_ids
self._status = RunStatus.SCHEDULED
self._status_lock = RLock()

@@ -58,6 +58,13 @@ def __init__(self, mlflow_run_id: str, slurm_job_id: str) -> None:
def run_id(self) -> str:
return self._mlflow_run_id

@property
def job_id(self) -> str:
"""
:return: The final Slurm Job ID of the submitted job list.
"""
return self.slurm_job_ids[-1]

def is_terminated_or_gone(self):
self._update_status()
return not self._status or RunStatus.is_terminated(self._status)
@@ -119,14 +126,20 @@ def _update_status(self) -> RunStatus:

class SlurmProjectBackend(AbstractBackend):
@staticmethod
def sbatch(script: str) -> str:
def sbatch(script: str, previous_job: str = "") -> str:
"""
Submit a script to the slurm batch manager
:param script: The filename of the script
:param previous_job: ID of a job this submission should wait on, or "" for none
:return: The Slurm Job ID, or None if the submit fails
job_re = "Submitted batch job (\\d+)"
with subprocess.Popen(f"sbatch {script}",

# If there is a previous job in a set of sequential workers, we want to
# wait for it to finish before starting this one
sbatch_command = f"sbatch {script}" if not previous_job \
else f"sbatch --dependency=afterok:{previous_job} {script}"

with subprocess.Popen(sbatch_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True) as p:
return_code = p.wait()
@@ -148,6 +161,7 @@ def run(self, project_uri: str, entry_point: str, params: dict, version: str,
backend_config: dict, tracking_uri: str,
experiment_id: str) -> SlurmSubmittedRun:

print(f"Ready to submit with params: {params}")
work_dir = fetch_and_validate_project(project_uri, version, entry_point, params)
active_run = get_or_create_run(None, project_uri, experiment_id, work_dir,
version,
@@ -204,11 +218,17 @@ def run(self, project_uri: str, entry_point: str, params: dict, version: str,
generate_sbatch_script(command_str, backend_config, active_run.info.run_id,
sbatch_file)

job_id = SlurmProjectBackend.sbatch(sbatch_file)
MlflowClient().set_tag(active_run.info.run_id, "slurm_job_id", job_id)
previous_job = ""
job_ids = []
for worker in range(int(params.get("sequential_workers", 1))):
job_id = SlurmProjectBackend.sbatch(sbatch_file, previous_job)
job_ids.append(job_id)
previous_job = job_id

MlflowClient().set_tag(active_run.info.run_id, "slurm_job_id", job_ids)
_logger.info(f"slurm job ids={job_ids}")

return SlurmSubmittedRun(active_run.info.run_id, job_id)
return SlurmSubmittedRun(active_run.info.run_id, job_ids)

def __init__(self):
pass
