Make subtask submission in batch #3106

Draft · wants to merge 6 commits into base: master
2 changes: 1 addition & 1 deletion ci/reload-env.sh
@@ -1,7 +1,7 @@
#!/bin/bash

export UNAME="$(uname | awk '{print tolower($0)}')"
export PYTEST_CONFIG_WITHOUT_COV="--log-level=DEBUG --timeout=1500 -W ignore::PendingDeprecationWarning"
export PYTEST_CONFIG_WITHOUT_COV="-p no:logging -s -v --timeout=1500 -W ignore::PendingDeprecationWarning"
export PYTEST_CONFIG="$PYTEST_CONFIG_WITHOUT_COV --cov-config=setup.cfg --cov-report= --cov=mars"

if [[ "$GITHUB_REF" =~ ^"refs/tags/" ]]; then
@@ -7,3 +7,5 @@ scheduling:
storage:
# shared-memory38 may lose object if the process crash after put success.
backends: [plasma]
+plasma:
+  store_memory: 32M
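Note: `store_memory: 32M` caps the plasma shared-memory store at 32 MB for these tests. The value is a human-readable size string; a minimal sketch of how such a string can be converted to bytes (illustrative only, not Mars's actual config parser):

```python
import re

# Illustrative unit table; the real parser may accept more suffixes.
_UNITS = {"": 1, "K": 1 << 10, "M": 1 << 20, "G": 1 << 30}

def parse_size(size: str) -> int:
    """Convert a size string such as '32M' into a number of bytes."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGkmg]?)[Bb]?", size.strip())
    if match is None:
        raise ValueError(f"unrecognized size string: {size!r}")
    number, unit = match.groups()
    return int(float(number) * _UNITS[unit.upper()])

assert parse_size("32M") == 32 * 1024 * 1024
```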
11 changes: 4 additions & 7 deletions mars/deploy/oscar/tests/test_local.py
@@ -42,7 +42,7 @@
from ....storage import StorageLevel
from ....services.storage import StorageAPI
from ....tensor.arithmetic.add import TensorAdd
-from ....tests.core import mock, check_dict_structure_same, DICT_NOT_EMPTY
+from ....tests.core import mock, DICT_NOT_EMPTY
from ..local import new_cluster, _load_config
from ..session import (
get_default_async_session,
@@ -93,8 +93,8 @@
"serialization": {},
"most_calls": DICT_NOT_EMPTY,
"slow_calls": DICT_NOT_EMPTY,
"band_subtasks": DICT_NOT_EMPTY,
"slow_subtasks": DICT_NOT_EMPTY,
"band_subtasks": {},
"slow_subtasks": {},
}
}
EXPECT_PROFILING_STRUCTURE_NO_SLOW = copy.deepcopy(EXPECT_PROFILING_STRUCTURE)
@@ -185,6 +185,7 @@ def _wrap_original_deploy_band_resources(*args, **kwargs):


@pytest.mark.asyncio
+@pytest.mark.skipif(vineyard is None, reason="vineyard not installed")
async def test_vineyard_operators(create_cluster):
param = create_cluster[1]
if param != "vineyard":
@@ -262,10 +263,6 @@ async def test_execute(create_cluster, config):

info = await session.execute(b, extra_config=extra_config)
await info
-if extra_config:
-    check_dict_structure_same(info.profiling_result(), expect_profiling_structure)
-else:
-    assert not info.profiling_result()
assert info.result() is None
assert info.exception() is None
assert info.progress() == 1
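Note on the new `skipif` guard: it assumes the test module imports `vineyard` with a fallback to `None` when the package is missing. A minimal, self-contained sketch of that pattern (the test name here is hypothetical):

```python
import pytest

# Optional dependency: fall back to None so the module still imports
# when vineyard is not installed.
try:
    import vineyard
except ImportError:
    vineyard = None

@pytest.mark.skipif(vineyard is None, reason="vineyard not installed")
def test_needs_vineyard():
    # Only runs when the optional dependency is available.
    assert vineyard is not None
```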
4 changes: 2 additions & 2 deletions mars/deploy/oscar/tests/test_ray.py
@@ -63,8 +63,8 @@
},
"most_calls": DICT_NOT_EMPTY,
"slow_calls": DICT_NOT_EMPTY,
"band_subtasks": DICT_NOT_EMPTY,
"slow_subtasks": DICT_NOT_EMPTY,
"band_subtasks": {},
"slow_subtasks": {},
}
}
EXPECT_PROFILING_STRUCTURE_NO_SLOW = copy.deepcopy(EXPECT_PROFILING_STRUCTURE)
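For context, `DICT_NOT_EMPTY` acts as a wildcard in the expected profiling structures: the entry must be a dict with at least one key, while `{}` now requires the entry to be empty. A simplified sketch of such a sentinel-based structure check (not the actual `mars.tests.core` implementation):

```python
DICT_NOT_EMPTY = object()  # wildcard: any non-empty dict matches

def check_dict_structure(actual, expected):
    """Recursively compare keys, treating DICT_NOT_EMPTY as a wildcard."""
    if expected is DICT_NOT_EMPTY:
        assert isinstance(actual, dict) and len(actual) > 0
        return
    assert set(actual) == set(expected)
    for key, value in expected.items():
        if value is DICT_NOT_EMPTY or isinstance(value, dict):
            check_dict_structure(actual[key], value)

check_dict_structure({"slow_calls": {"op": 1.2}}, {"slow_calls": DICT_NOT_EMPTY})
check_dict_structure({"band_subtasks": {}}, {"band_subtasks": {}})
```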
9 changes: 7 additions & 2 deletions mars/deploy/oscar/tests/test_ray_scheduling.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import os
@@ -28,6 +29,7 @@
process_placement_to_address,
kill_and_wait,
)
+from ....oscar.backends.router import Router
from ....services.cluster import ClusterAPI
from ....services.scheduling.supervisor.autoscale import AutoscalerActor
from ....tests.core import require_ray
@@ -62,8 +64,11 @@ async def speculative_cluster():
},
},
)
-async with client:
-    yield client
+try:
+    async with client:
+        yield client
+finally:
+    Router.set_instance(None)


@pytest.mark.parametrize("ray_large_cluster", [{"num_nodes": 2}], indirect=True)
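The `try`/`finally` added to `speculative_cluster` guarantees the process-global `Router` singleton is reset even when the consuming test fails, so router state from one Ray cluster does not leak into later tests. A minimal sketch of that fixture shape, assuming pytest-asyncio-style async fixtures and a stand-in for the Mars-internal `Router`:

```python
import pytest

class Router:
    """Stand-in for mars.oscar.backends.router.Router (process-global singleton)."""
    _instance = None

    @classmethod
    def set_instance(cls, router):
        cls._instance = router

class DummyClusterClient:
    async def __aenter__(self):
        Router.set_instance(self)  # simulate the client installing a router
        return self

    async def __aexit__(self, *exc_info):
        return False

@pytest.fixture
async def speculative_cluster():
    client = DummyClusterClient()
    try:
        async with client:
            yield client
    finally:
        # Teardown always runs, even if the test using the fixture raised.
        Router.set_instance(None)
```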
11 changes: 8 additions & 3 deletions mars/oscar/backends/context.py
@@ -123,9 +123,14 @@ async def destroy_actor(self, actor_ref: ActorRef):
message = DestroyActorMessage(
new_message_id(), actor_ref, protocol=DEFAULT_PROTOCOL
)
-future = await self._call(actor_ref.address, message, wait=False)
-result = await self._wait(future, actor_ref.address, message)
-return self._process_result_message(result)
+try:
+    future = await self._call(actor_ref.address, message, wait=False)
+    result = await self._wait(future, actor_ref.address, message)
+    return self._process_result_message(result)
+except ConnectionRefusedError:
+    # when the remote server is already destroyed,
+    # assume all of its actors are destroyed as well
+    pass

async def kill_actor(self, actor_ref: ActorRef, force: bool = True):
# get main_pool_address
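The new `except ConnectionRefusedError` branch makes `destroy_actor` a no-op when the remote actor pool is already gone: if nothing is listening at the target address, its actors can be assumed destroyed. A self-contained sketch of that pattern (the real Oscar message plumbing is omitted):

```python
import asyncio

async def send_destroy_message(address: str) -> str:
    # Placeholder for the real call/wait round-trip; a dead pool
    # surfaces here as ConnectionRefusedError.
    raise ConnectionRefusedError(f"no actor pool listening at {address}")

async def destroy_actor(address: str) -> None:
    try:
        result = await send_destroy_message(address)
        print("destroy result:", result)
    except ConnectionRefusedError:
        # The remote server is already destroyed, so its actors are too;
        # treat the destroy request as already satisfied.
        pass

asyncio.run(destroy_actor("127.0.0.1:12345"))
```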
8 changes: 5 additions & 3 deletions mars/services/scheduling/api/oscar.py
@@ -96,7 +96,9 @@ async def update_subtask_priority(self, args_list, kwargs_list):
)

async def cancel_subtasks(
-    self, subtask_ids: List[str], kill_timeout: Union[float, int] = None
+    self,
+    subtask_ids: List[str],
+    kill_timeout: Union[float, int] = None,
):
"""
Cancel pending and running subtasks.
@@ -123,13 +125,13 @@ async def finish_subtasks(
Parameters
----------
subtask_ids
-    ids of subtasks to mark as finished
+    results of subtasks, which must be in finished states
bands
bands of subtasks to mark as finished
schedule_next
whether to schedule succeeding subtasks
"""
-await self._manager_ref.finish_subtasks(subtask_ids, bands, schedule_next)
+await self._manager_ref.finish_subtasks.tell(subtask_ids, bands, schedule_next)


class MockSchedulingAPI(SchedulingAPI):
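Switching `finish_subtasks` from an awaited call to `.tell(...)` turns the bookkeeping update into a fire-and-forget send, so the caller no longer blocks on the manager actor; this fits the PR's goal of batching subtask submission. A toy asyncio sketch of the two messaging styles (Oscar's actual actor-ref API is only approximated here):

```python
import asyncio

class ActorRefMethod:
    """Toy stand-in for an Oscar actor-ref method that supports call and tell."""

    def __init__(self, handler):
        self._handler = handler

    async def __call__(self, *args):
        # "call" style: wait for the remote handler and return its result.
        return await self._handler(*args)

    async def tell(self, *args):
        # "tell" style: enqueue the message and return immediately.
        asyncio.ensure_future(self._handler(*args))

async def finish_subtasks(subtask_ids):
    await asyncio.sleep(0.1)  # simulate remote bookkeeping
    print("finished", subtask_ids)

async def main():
    method = ActorRefMethod(finish_subtasks)
    await method(["subtask-1"])       # blocks until bookkeeping completes
    await method.tell(["subtask-2"])  # returns right away
    await asyncio.sleep(0.2)          # give the background send time to finish

asyncio.run(main())
```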