Commit d18ba43 (parent d0f95e4): some notes on adaptive mode and errors
tgalvin committed Nov 23, 2023

Showing 1 changed file with 38 additions and 0 deletions: docs/source/start.rst
@@ -223,3 +223,41 @@ To set up a Prefect Server, first install Prefect with `pip`. You will also need
then
prefect server start --host 0.0.0.0
fi

Tips on adaptive scaling
========================

There can be strange failure modes when a Prefect-based workflow is executed with a Dask task runner on a `dask_jobqueue.SLURMCluster` object with adaptive scaling enabled. Commonly, this presents as a previously completed task run restarting. Depending on the workflow, the restarted task may fail outright (e.g. if an expected data product has been removed) or may run perfectly well (e.g. wsclean clobbering existing files and reimaging). Naturally, this is not behaviour that should be encouraged.

It appears as though the issue is related to work stealing among Dask workers established in an adaptive scaling mode. The error mode is not entirely clear, but as workers are started or shut down (whether in a controlled manner or by SLURM itself restarting the node) the Dask scheduler will attempt to rebalance work. For whatever reason, keys representing completed tasks are marked as needing to be repeated (perhaps because Dask believes it needs to recover data that was not persisted?) and are rescheduled.
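
For context, the kind of workflow in question is typically constructed along the following lines. This is a minimal sketch only, assuming the `prefect-dask` and `dask-jobqueue` packages are installed; the flow and task names (`imaging_flow`, `run_wsclean`) and the resource values are hypothetical placeholders, not this repository's actual code.

.. code-block:: python

    # Hypothetical sketch: a Prefect flow executed by a Dask task runner
    # backed by an adaptively scaled dask_jobqueue.SLURMCluster.
    from prefect import flow, task
    from prefect_dask import DaskTaskRunner

    task_runner = DaskTaskRunner(
        cluster_class="dask_jobqueue.SLURMCluster",
        cluster_kwargs={"cores": 4, "memory": "16GB", "walltime": "02:00:00"},
        adapt_kwargs={"minimum": 2, "maximum": 36},
    )

    @task
    def run_wsclean(ms_path: str) -> str:
        # Image the measurement set (details elided)
        return ms_path

    @flow(task_runner=task_runner)
    def imaging_flow(ms_paths: list[str]) -> None:
        # Each submitted task becomes a key on the Dask scheduler; these are
        # the keys that may be erroneously rescheduled under adaptive scaling.
        futures = [run_wsclean.submit(ms_path) for ms_path in ms_paths]
        for future in futures:
            future.wait()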

The Dask environment variables below are intended to limit these failure modes. They should be exported in the `sbatch` launch script before the Python Prefect / Dask entry point is invoked; an example launch script is sketched after the variable list.

.. code-block:: bash

    # See https://docs.dask.org/en/latest/configuration.html#distributed-scheduler
    # for more information on these variables
    export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=0.01
    export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=True
    export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING_INTERVAL="120s"
    export DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="3600s"
    export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=100
    export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL="10000ms"
    export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE="1000000ms"
    export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="300s"
    export DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG="16384"
    export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="300s"
    export DASK_DISTRIBUTED__COMM__RETRY__COUNT=12
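
For example, a launch script might be structured as follows. This is a sketch only: the `#SBATCH` resource requests and the `workflow.py` entry point are illustrative placeholders.

.. code-block:: bash

    #!/bin/bash
    #SBATCH --job-name=prefect-imaging
    #SBATCH --time=24:00:00
    #SBATCH --mem=16G

    # Export the Dask tuning variables before the entry point starts
    export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=0.01
    export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=True
    # ... remaining DASK_DISTRIBUTED__* variables as listed above ...

    # Only then invoke the Prefect / Dask entry point
    python workflow.py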
Additionally, the following parameters should be provided to the `.adapt` call that automatically scales the Dask cluster (whether in code directly or through an appropriate cluster YAML definition file); a corresponding `.adapt` call is sketched after the parameter listing.

.. code-block:: python

    {
        "minimum": 2,
        "maximum": 36,
        "wait_count": 20,
        "target_duration": "300s",
        "interval": "30s",
    }
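
In code, this amounts to passing the parameters to `.adapt` on the cluster object. A sketch, assuming a `dask_jobqueue.SLURMCluster` instance has already been constructed as `cluster`:

.. code-block:: python

    # Apply the adaptive scaling parameters above to an existing cluster;
    # construction of the SLURMCluster itself is elided.
    cluster.adapt(
        minimum=2,
        maximum=36,
        wait_count=20,
        target_duration="300s",
        interval="30s",
    )

A higher `wait_count` together with a longer `interval` makes the adaptive controller slower to retire workers, which reduces the churn that appears to trigger the spurious rescheduling described above.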
