Fix pre-commit
AlecThomson committed Nov 28, 2023
1 parent 7f4b94f commit 41b26e5
Showing 3 changed files with 25 additions and 25 deletions.
18 changes: 9 additions & 9 deletions docs/source/start.rst
@@ -104,7 +104,7 @@ The pipeline is run using the `Prefect <https://docs.prefect.io/core/>`_ workflo
To set up a Prefect Server, first install Prefect with `pip`. You will also need Postgres installed on the server to store the Prefect data. We recommend using Singularity to install and run it. Below we provide two scripts for starting the Postgres and Prefect servers on a remote machine:
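For reference, a minimal sketch of that installation step, assuming Singularity is used to pull the official Postgres image from Docker Hub (the image tag and ``.sif`` name are illustrative, not part of the pipeline itself):

.. code-block:: bash

   # Install Prefect into the pipeline's Python environment
   pip install prefect

   # Build a Postgres container image to run under Singularity (tag is illustrative)
   singularity pull postgres.sif docker://postgres:14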

.. tip::
The default connection settings of the Prefect and Postgres services may not be appropriate if you expect to run many workers concurrently. If the services are being overwhelmed you will see API and database timeouts in the logs of these services. To avoid this, it is recommended to update the `max_connections` and `shared_buffers` properties of Postgres, and to set `WEB_CONCURRENCY` to enable multiple `uvicorn` workers.
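As a minimal sketch, assuming a ``postgres.sif`` image as above and the environment variables defined in the scripts below (the specific values are illustrative, and both Postgres settings only take effect after the database is restarted):

.. code-block:: bash

   # Raise Postgres connection and buffer limits (illustrative values)
   PGPASSWORD="$POSTGRES_PASS" singularity exec postgres.sif \
       psql -h "$POSTGRES_ADDR" -U "$POSTGRES_USER" -d "$POSTGRES_DB" \
       -c "ALTER SYSTEM SET max_connections = 500;" \
       -c "ALTER SYSTEM SET shared_buffers = '4GB';"

   # Allow the Prefect API server to run multiple uvicorn workers
   export WEB_CONCURRENCY=16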

.. tip::
In each of the scripts below, you will need to set the password for the Postgres database. You can do this by setting the environment variable ``POSTGRES_PASS``. You will also need to set the hostname of the machine running the database with ``POSTGRES_ADDR``.
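For example (placeholder values; substitute your own password and the hostname of the database machine):

.. code-block:: bash

   export POSTGRES_PASS='a-strong-password'    # placeholder -- use your own secret
   export POSTGRES_ADDR='node01.example.com'   # placeholder -- host running Postgres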
@@ -148,7 +148,7 @@ To set up a Prefect Server, first install Prefect with `pip`. You will also need
export POSTGRES_USER='postgres'
export POSTGRES_DB=orion
export POSTGRES_SCRATCH=$(realpath $(pwd))
if [[ $START_POSTGRES -eq 0 ]]
then
# Need Singularity, and to remove the badness of Pawsey
@@ -205,12 +205,12 @@ To set up a Prefect Server, first install Prefect with `pip`. You will also need
export POSTGRES_USER='postgres'
export POSTGRES_DB=orion
export POSTGRES_SCRATCH=$(pwd)
export PREFECT_API_URL="http://${POSTGRES_ADDR}:4200/api"
export PREFECT_SERVER_API_HOST="127.0.0.1"
export PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_ADDR:5432/$POSTGRES_DB"
# This establishes a larger number of workers for prefect on the webserver (uvicorn under the hood)
export WEB_CONCURRENCY=16
# These can be tweaked to allow for more persistent data connections
@@ -228,11 +228,11 @@ To set up a Prefect Server, first install Prefect with `pip`. You will also need
Tips on adaptive scaling:
==========================

There can be strange failure modes when a Prefect-based workflow is executed by a Dask task runner on a `dask_jobqueue.SLURMCluster` object with adaptive scaling enabled. Commonly, this presents as a previously completed task run restarting. Depending on the actual workflow, this may fail outright (e.g. if an expected data product has been removed), or may run perfectly fine (e.g. wsclean clobbering existing files and reimaging). Naturally, this is not behaviour that should be encouraged.

It appears that the issue is related to job stealing among the Dask workers established in adaptive scaling mode. The error mode is not entirely clear, but as workers are started (or shut down, whether in a controlled manner or by SLURM itself restarting the node) the Dask scheduler will attempt to rebalance work. For whatever reason, keys representing tasks are marked as needing to be repeated (perhaps because Dask believes it needs to recover data that was not persisted?) and are rescheduled.

The Dask environment variables below are intended to limit these failure modes. They should be exported in the `sbatch` launch script before the Python Prefect / Dask entry point.

.. code-block:: bash
# See https://docs.dask.org/en/latest/configuration.html#distributed-scheduler
@@ -250,10 +250,10 @@ The dask environment variables below are intended to try to limit these failure
export DASK_DISTRIBUTED__COMM__RETRY__COUNT=12
Additionally, these should be provided to the `.adapt` call that automatically scales the Dask cluster (whether in code directly or through an appropriate cluster YAML definition file).

.. code-block:: python
{
    "minimum": 2,
    "maximum": 36,
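For instance, a sketch of passing such options through ``.adapt`` in code; the ``SLURMCluster`` arguments and the specific ``wait_count`` / ``target_duration`` / ``interval`` values are illustrative, not the pipeline's actual configuration:

.. code-block:: python

   from dask_jobqueue import SLURMCluster

   # Illustrative cluster settings -- adjust for your own SLURM partition
   cluster = SLURMCluster(cores=16, memory="64GB", walltime="24:00:00")

   # Adaptive scaling bounds matching the snippet above, with more conservative
   # rebalancing behaviour passed through to distributed's Adaptive class
   cluster.adapt(
       minimum=2,
       maximum=36,
       wait_count=20,           # consecutive checks suggesting removal before a worker is retired
       target_duration="300s",  # completion time the scheduler aims for when scaling up
       interval="30s",          # how often the adaptive logic runs
   )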
2 changes: 1 addition & 1 deletion licenses/LICENSE_distributed.txt
@@ -26,4 +26,4 @@ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 changes: 15 additions & 15 deletions submit/test_image.py
@@ -1,24 +1,24 @@
#!/usr/bin/env python3
-#SBATCH --output=/scratch2/tho822/spiceracs/pipe_test/test_image_%j.log
-#SBATCH --error=/scratch2/tho822/spiceracs/pipe_test/test_image_%j.log
-#SBATCH --time=1-00:00:00
-#SBATCH --tasks=1
-#SBATCH --cpus-per-task=1
-#SBATCH --account=OD-217087
-#SBATCH --qos=express
+# SBATCH --output=/scratch2/tho822/spiceracs/pipe_test/test_image_%j.log
+# SBATCH --error=/scratch2/tho822/spiceracs/pipe_test/test_image_%j.log
+# SBATCH --time=1-00:00:00
+# SBATCH --tasks=1
+# SBATCH --cpus-per-task=1
+# SBATCH --account=OD-217087
+# SBATCH --qos=express

import logging
+from pathlib import Path

import yaml
+from astropy import units as u
from dask.distributed import Client, LocalCluster
from dask_jobqueue import SLURMCluster
from IPython import embed
-from astropy import units as u
-from pathlib import Path

from spiceracs import imager
-from spiceracs.utils import port_forward
from spiceracs.logger import logger
+from spiceracs.utils import port_forward

logger.setLevel(logging.INFO)


@@ -41,7 +41,6 @@ def main():
# cluster = LocalCluster(n_workers=10, threads_per_worker=1)
# cluster.adapt(minimum=1, maximum=36)


client = Client(cluster)

port = client.scheduler_info()["services"]["dashboard"]
@@ -60,7 +59,7 @@ def main():
local_rms_window=60,
auto_threshold=1,
size=6144,
-scale=2.5*u.arcsec,
+scale=2.5 * u.arcsec,
robust=-0.5,
pols="IQU",
gridder="wgridder",
@@ -69,9 +68,10 @@ def main():
reimage=True,
multiscale=False,
# parallel_deconvolution=6144,
-absmem=float(config["memory"].replace("GB", "").replace("GiB", ""))
+absmem=float(config["memory"].replace("GB", "").replace("GiB", "")),
)
logs = client.get_worker_logs()


if __name__ == "__main__":
main()
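For reference, a script like this is typically handed straight to SLURM; a sketch of the invocation, assuming the repository layout above:

.. code-block:: bash

   # Submit the imaging test script to the SLURM queue
   sbatch submit/test_image.py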
