From ed300c5d221bee45c38f3633be24c588fd917ada Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Sat, 9 Apr 2022 22:42:59 +0800 Subject: [PATCH] [BACKPORT][Ray] Use main pool as owner when autoscale disabled (#2878) (#2903) Co-authored-by: Shawn --- .github/workflows/cancel-prev.yml | 12 ----------- .github/workflows/core-ci.yml | 4 ++++ .github/workflows/docker-cd.yml | 4 ++++ .github/workflows/os-compat-ci.yml | 4 ++++ .github/workflows/platform-ci.yml | 4 ++++ .github/workflows/pypi-cd.yml | 4 ++++ mars/services/storage/core.py | 11 ---------- mars/services/storage/worker/service.py | 28 ++++++++++++++++++++++--- 8 files changed, 45 insertions(+), 26 deletions(-) delete mode 100644 .github/workflows/cancel-prev.yml diff --git a/.github/workflows/cancel-prev.yml b/.github/workflows/cancel-prev.yml deleted file mode 100644 index b542e8ccf8..0000000000 --- a/.github/workflows/cancel-prev.yml +++ /dev/null @@ -1,12 +0,0 @@ -name: Stop Duplicated Flows - -on: [push, pull_request_target] - -jobs: - cancel: - runs-on: ubuntu-latest - steps: - - uses: styfle/cancel-workflow-action@0.9.1 - with: - access_token: ${{ github.token }} - workflow_id: core-ci.yml,os-compat-ci.yml,platform-ci.yml diff --git a/.github/workflows/core-ci.yml b/.github/workflows/core-ci.yml index e7fc1e1998..f521dea984 100644 --- a/.github/workflows/core-ci.yml +++ b/.github/workflows/core-ci.yml @@ -7,6 +7,10 @@ on: pull_request: types: ['opened', 'reopened', 'synchronize'] +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: build: runs-on: ${{ matrix.os }} diff --git a/.github/workflows/docker-cd.yml b/.github/workflows/docker-cd.yml index d8b4cab626..c28f987a9d 100644 --- a/.github/workflows/docker-cd.yml +++ b/.github/workflows/docker-cd.yml @@ -7,6 +7,10 @@ on: tags: - '*' +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: build: runs-on: ubuntu-latest diff --git a/.github/workflows/os-compat-ci.yml 
b/.github/workflows/os-compat-ci.yml index 2e229d438f..7a906e5c66 100644 --- a/.github/workflows/os-compat-ci.yml +++ b/.github/workflows/os-compat-ci.yml @@ -7,6 +7,10 @@ on: pull_request: types: ['opened', 'reopened', 'synchronize'] +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: build: runs-on: ${{ matrix.os }} diff --git a/.github/workflows/platform-ci.yml b/.github/workflows/platform-ci.yml index 320aaa0989..e7a629e72b 100644 --- a/.github/workflows/platform-ci.yml +++ b/.github/workflows/platform-ci.yml @@ -7,6 +7,10 @@ on: pull_request: types: ['opened', 'reopened', 'synchronize'] +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: build: runs-on: ${{ matrix.os }} diff --git a/.github/workflows/pypi-cd.yml b/.github/workflows/pypi-cd.yml index dd7bc31b99..189e1e3c51 100644 --- a/.github/workflows/pypi-cd.yml +++ b/.github/workflows/pypi-cd.yml @@ -5,6 +5,10 @@ on: tags: - '*' +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: build: name: Build wheels on ${{ matrix.os }} for ${{ matrix.arch }} diff --git a/mars/services/storage/core.py b/mars/services/storage/core.py index 49627da05a..ca73943f32 100644 --- a/mars/services/storage/core.py +++ b/mars/services/storage/core.py @@ -550,17 +550,6 @@ async def _setup_storage( ): backend = get_storage_backend(storage_backend) storage_config = storage_config or dict() - - from ..cluster import ClusterAPI - - if backend.name == "ray": - try: - cluster_api = await ClusterAPI.create(self.address) - supervisor_address = (await cluster_api.get_supervisors())[0] - # ray storage backend need to set supervisor as owner to avoid data lost when worker dies. 
- storage_config["owner"] = supervisor_address - except mo.ActorNotExist: - pass init_params, teardown_params = await backend.setup(**storage_config) client = backend(**init_params) self._init_params[band_name][storage_backend] = init_params diff --git a/mars/services/storage/worker/service.py b/mars/services/storage/worker/service.py index f853c98f3a..a91c2c79cc 100644 --- a/mars/services/storage/worker/service.py +++ b/mars/services/storage/worker/service.py @@ -36,9 +36,31 @@ async def start(self): backends = storage_configs.get("backends") options = storage_configs.get("default_config", dict()) transfer_block_size = options.get("transfer_block_size", None) - backend_config = { - backend: storage_configs.get(backend, dict()) for backend in backends - } + backend_config = {} + for backend in backends: + storage_config = storage_configs.get(backend, dict()) + backend_config[backend] = storage_config + if backend == "ray": + # Specifying the supervisor as the ray owner is costly when mars does a shuffle, since m*n objects + # would need the supervisor as their owner, so enable it only for autoscale to avoid data loss when + # scaling in. This limit can be removed once ray supports ownership transfer. + if ( + self._config.get("scheduling", {}) + .get("autoscale", {}) + .get("enabled", False) + ): + try: + from ...cluster.api import ClusterAPI + + cluster_api = await ClusterAPI.create(self._address) + supervisor_address = (await cluster_api.get_supervisors())[0] + # The ray storage backend needs the supervisor set as owner to avoid data loss when a worker dies. + owner = supervisor_address + except mo.ActorNotExist: + owner = self._address + else: + owner = self._address + storage_config["owner"] = owner await mo.create_actor( StorageManagerActor,