Skip to content

Commit

Permalink
Fix coiled compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
luis11011 committed Oct 4, 2021
1 parent a5cf287 commit e5c8448
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 17 deletions.
19 changes: 9 additions & 10 deletions optimus/engines/dask/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,15 @@ class DaskEngine(BaseEngine):

# Using procces or threads https://stackoverflow.com/questions/51099685/best-practices-in-setting-number-of-dask-workers
def __init__(self, session=None, address=None, n_workers=None, threads_per_worker=None, processes=False,
memory_limit='4GB', verbose=False, coiled_token=None, *args, **kwargs):
memory_limit=None, verbose=False, coiled_token=None, *args, **kwargs):

if n_workers is None:
if n_workers is None and not coiled_token:
import psutil
threads_per_worker = psutil.cpu_count() * 4

self.verbose(verbose)

use_remote = kwargs.get("use_remote", coiled_token is not None)

if kwargs.get("use_remote", None):
del kwargs["use_remote"]
use_remote = kwargs.pop("use_remote", False)

if coiled_token:
import coiled
Expand All @@ -39,14 +36,15 @@ def __init__(self, session=None, address=None, n_workers=None, threads_per_worke

idle_timeout = kwargs.get("idle_timeout", None)

memory_limit = memory_limit or '16 GiB'

cluster = coiled.Cluster(
name=kwargs.get("name"),
worker_options={
**({"nthreads": threads_per_worker} if threads_per_worker else {}),
**({"memory_limit": memory_limit} if memory_limit else {})
**({"nthreads": threads_per_worker} if threads_per_worker else {})
},
n_workers=n_workers,
worker_memory='15GiB',
n_workers=n_workers if n_workers else 4,
worker_memory=memory_limit,
scheduler_options={
**({"idle_timeout": idle_timeout} if idle_timeout else {})
},
Expand All @@ -67,6 +65,7 @@ def __init__(self, session=None, address=None, n_workers=None, threads_per_worke
try:
self.client = get_client()
except ValueError:
memory_limit = memory_limit or '4GB'
self.client = Client(address=address, n_workers=n_workers, threads_per_worker=threads_per_worker,
processes=processes, memory_limit=memory_limit, *args, **kwargs)

Expand Down
14 changes: 8 additions & 6 deletions optimus/engines/dask_cudf/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

class DaskCUDFEngine(BaseEngine):
def __init__(self, session=None, address=None, n_workers=1, threads_per_worker=8, processes=False,
memory_limit='4GB', verbose=False, coiled_token=None, *args, **kwargs):
memory_limit=None, verbose=False, coiled_token=None, *args, **kwargs):

"""
Expand Down Expand Up @@ -42,16 +42,17 @@ def __init__(self, session=None, address=None, n_workers=1, threads_per_worker=8

idle_timeout = kwargs.get("idle_timeout", None)

memory_limit = memory_limit or '16 GiB'

cluster = coiled.Cluster(
name=kwargs.get("name"),
worker_options={
**({"nthreads": threads_per_worker} if threads_per_worker else {}),
**({"memory_limit": memory_limit} if memory_limit else {})
**({"nthreads": threads_per_worker} if threads_per_worker else {})
},
worker_gpu=1,
worker_class='dask_cuda.CUDAWorker',
n_workers=n_workers,
worker_memory='15GiB',
n_workers=n_workers if n_workers else 4,
worker_memory=memory_limit,
backend_options={
"region": kwargs.get("backend_region", "us-east-1")
},
Expand Down Expand Up @@ -91,7 +92,8 @@ def __init__(self, session=None, address=None, n_workers=1, threads_per_worker=8
device_memory_limit=memoryTotal * 0.8
# Spill to RAM when 80% memory is full
)
self.client = Client(cluster, *args, **kwargs)
memory_limit = memory_limit or '4GB'
self.client = Client(cluster, memory_limit=memory_limit, *args, **kwargs)

if use_remote:
self.remote = RemoteOptimusInterface(self.client, Engine.DASK_CUDF.value)
Expand Down
2 changes: 1 addition & 1 deletion requirements/dask-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ dask[complete]==2021.9.0
distributed==2021.9.0
dask-ml>=1.9.0
pyarrow==1.0.1
coiled>=0.0.30
coiled>=0.0.52

0 comments on commit e5c8448

Please sign in to comment.