diff --git a/README.md b/README.md index 9fe7c07..90a2abf 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,10 @@ Backend for executing MLFlow projects on Slurm batch system ## Usage Install this package in the environment from which you will be submitting jobs. -If you are submitting jobs from inside jobs, make sure you have this package +If you are submitting jobs from inside jobs, make sure you have this package listed in your conda or pip environment. -Just list this as your `--backend` in the job run. You should include a json +Just list this as your `--backend` in the job run. You should include a json config file to control how the batch script is constructed: ```shell mlflow run --backend slurm \ @@ -34,7 +34,20 @@ properties in this file are: | time | Max CPU time job may run | | sbatch-script-file | Name of batch file to be produced. Leave blank to have service generate a script file name based on the run ID | +## Sequential Worker Jobs +There are occasions where you have a job that can't finish in the maximum +allowable wall time. If you are able to write out a checkpoint file, you can +use sequential worker jobs to continue the job where it left off. This is +useful for training deep learning models or other long running jobs. + +To use this, you just need to provide a parameter to the `mlflow run` command +```shell + mlflow run --backend slurm -c ../../slurm_config.json -P sequential_workers=3 . +``` +This will then submit the job as normal, but also submit 3 additional jobs that +each depend on the previous job. As soon as the first job terminates, the next +job will start. This will continue until all jobs have completed. + ## Development The slurm docker deployment is handy for testing and development. 
You can start up a slurm environment with the included docker-compose file - diff --git a/examples/sequential/MLProject b/examples/sequential/MLProject new file mode 100644 index 0000000..bce21d1 --- /dev/null +++ b/examples/sequential/MLProject @@ -0,0 +1,5 @@ +python_env: python_env.yaml + +entry_points: + main: + command: "./demo.sh" diff --git a/examples/sequential/README.md b/examples/sequential/README.md new file mode 100644 index 0000000..be7f05e --- /dev/null +++ b/examples/sequential/README.md @@ -0,0 +1,11 @@ +# Example of Sequential Worker Jobs +This simple example shows how to use the `sequential_workers` parameter to submit a job that will be split into multiple jobs that depend on each other. + +```shell +mlflow run --backend slurm -c ../../slurm_config.json -P sequential_workers=3 . +``` + +Each job appends to a file named `restart.log` with the time the job is run. +MLFlow will submit three jobs that depend on each other. As soon as the first job terminates, the next job will start. This will continue until all jobs have completed. + +When the jobs are complete, you can check the `restart.log` file to see the order in which the jobs were run. diff --git a/examples/sequential/demo.sh b/examples/sequential/demo.sh new file mode 100755 index 0000000..645dfca --- /dev/null +++ b/examples/sequential/demo.sh @@ -0,0 +1,5 @@ +#!/bin/bash +echo "Hello World" +sleep 2 +date >> restart.log +exit 0 diff --git a/examples/sequential/python_env.yaml b/examples/sequential/python_env.yaml new file mode 100644 index 0000000..5106e59 --- /dev/null +++ b/examples/sequential/python_env.yaml @@ -0,0 +1,5 @@ +# Python version required to run the project. +python: "3.10" +# Dependencies required to run the project. 
+dependencies: + - mlflow diff --git a/mlflow_slurm/slurm_backend.py b/mlflow_slurm/slurm_backend.py index 6bb85a2..83a965c 100644 --- a/mlflow_slurm/slurm_backend.py +++ b/mlflow_slurm/slurm_backend.py @@ -44,10 +44,10 @@ class SlurmSubmittedRun(SubmittedRun): :param mlflow_run_id: ID of the MLflow project run. """ - def __init__(self, mlflow_run_id: str, slurm_job_id: str) -> None: + def __init__(self, mlflow_run_id: str, slurm_job_ids: List[str]) -> None: super().__init__() self._mlflow_run_id = mlflow_run_id - self.slurm_job_id = slurm_job_id + self.slurm_job_ids = slurm_job_ids self._status = RunStatus.SCHEDULED self._status_lock = RLock() @@ -58,6 +58,13 @@ def __init__(self, mlflow_run_id: str, slurm_job_id: str) -> None: def run_id(self) -> str: return self._mlflow_run_id + @property + def job_id(self) -> str: + """ + :return: The final Slurm Job ID of the submitted job list. + """ + return self.slurm_job_ids[-1] + def is_terminated_or_gone(self): self._update_status() return not self._status or RunStatus.is_terminated(self._status) @@ -119,14 +126,20 @@ def _update_status(self) -> RunStatus: class SlurmProjectBackend(AbstractBackend): @staticmethod - def sbatch(script: str) -> str: + def sbatch(script: str, previous_job: str="") -> str: """ Submit a script to the slurm batch manager :param script: The filename of the script :return: The Slurm Job ID or None of the submit fails """ job_re = "Submitted batch job (\\d+)" - with subprocess.Popen(f"sbatch {script}", + + # If there is a previous job in a set of sequential workers, we want to + # wait for it to finish before starting this one + sbatch_command = f"sbatch {script}" if not previous_job \ + else f"sbatch --dependency=afterok:{previous_job} {script}" + + with subprocess.Popen(sbatch_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as p: return_code = p.wait() @@ -148,6 +161,7 @@ def run(self, project_uri: str, entry_point: str, params: dict, version: str, backend_config: dict, 
tracking_uri: str, experiment_id: str) -> SlurmSubmittedRun: + print(f"Ready to submit with params: {params}") work_dir = fetch_and_validate_project(project_uri, version, entry_point, params) active_run = get_or_create_run(None, project_uri, experiment_id, work_dir, version, @@ -204,11 +218,17 @@ def run(self, project_uri: str, entry_point: str, params: dict, version: str, generate_sbatch_script(command_str, backend_config, active_run.info.run_id, sbatch_file) - job_id = SlurmProjectBackend.sbatch(sbatch_file) - MlflowClient().set_tag(active_run.info.run_id, "slurm_job_id", job_id) + previous_job = "" + job_ids = [] + for worker in range(int(params.get("sequential_workers", 1))): + job_id = SlurmProjectBackend.sbatch(sbatch_file, previous_job) + job_ids.append(job_id) + previous_job = job_id + + MlflowClient().set_tag(active_run.info.run_id, "slurm_job_id", job_ids) _logger.info(f"slurm job id={job_id}") - return SlurmSubmittedRun(active_run.info.run_id, job_id) + return SlurmSubmittedRun(active_run.info.run_id, job_ids) def __init__(self): pass