Skip to content

Commit

Permalink
PARTIAL add worker scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Feb 10, 2022
1 parent dab1fa5 commit 1cbbde3
Show file tree
Hide file tree
Showing 45 changed files with 2,527 additions and 2,945 deletions.
2 changes: 2 additions & 0 deletions mars/oscar/backends/ray/tests/test_ray_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@


class TestActor(mo.Actor):
__test__ = False

async def kill(self, address, uid):
actor_ref = await mo.actor_ref(address, uid)
task = asyncio.create_task(actor_ref.crash())
Expand Down
2 changes: 2 additions & 0 deletions mars/oscar/backends/test/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@


class TestMainActorPool(MainActorPool):
__test__ = False

@classmethod
def get_external_addresses(
cls, address: str, n_process: int = None, ports: List[int] = None
Expand Down
4 changes: 2 additions & 2 deletions mars/oscar/core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ cdef class ActorRefMethod:
"""
Wrapper for an Actor method at client
"""
cdef ActorRef ref
cdef object method_name
cdef public ActorRef ref
cdef public object method_name
cdef object _options

def __init__(self, ref, method_name, options=None):
Expand Down
2 changes: 2 additions & 0 deletions mars/remote/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def __init__(
n_output=None,
**kw,
):
function_args = function_args or []
function_kwargs = function_kwargs or {}
super().__init__(
_function=function,
_function_args=function_args,
Expand Down
2 changes: 1 addition & 1 deletion mars/services/cluster/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async def get_mars_versions(self) -> List[str]:
node_info_ref = await self._get_node_info_ref()
return await node_info_ref.get_mars_versions()

async def get_bands(self) -> Dict:
async def get_bands(self) -> Dict[BandType, int]:
"""
Get bands that can be used for computation on current node.
Expand Down
39 changes: 39 additions & 0 deletions mars/services/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@
import warnings
from typing import Dict, Iterable, List, Union

from .. import oscar as mo
from ..lib.aio import alru_cache
from ..serialization.serializables import (
Serializable,
BytesField,
StringField,
TupleField,
DictField,
)

_ModulesType = Union[List, str, None]


Expand All @@ -28,6 +38,35 @@ class NodeRole(enum.Enum):
WORKER = 1


class ActorCallback(Serializable):
actor_uid: bytes = BytesField("actor_uid")
actor_address: str = StringField("actor_address")
actor_method: str = StringField("actor_method")
args: tuple = TupleField("args")
kwargs: dict = DictField("kwargs")

def __init__(self, ref_method=None, **kw):
if ref_method is not None:
kw["actor_uid"] = ref_method.ref.uid
kw["actor_address"] = ref_method.ref.address
kw["actor_method"] = ref_method.method_name
kw["args"] = kw.get("args") or ()
kw["kwargs"] = kw.get("kwargs") or {}
super().__init__(**kw)

@classmethod
@alru_cache(cache_exceptions=False)
async def _get_ref(cls, actor_uid: bytes, actor_address: str):
return await mo.actor_ref(actor_uid, address=actor_address)

async def __call__(self, *args, **kwargs):
ref = await self._get_ref(self.actor_uid, self.actor_address)
args = self.args + args
kw = self.kwargs.copy()
kw.update(kwargs)
return await getattr(ref, self.actor_method)(*args, **kw)


class AbstractService(abc.ABC):
_instances = dict()

Expand Down
77 changes: 45 additions & 32 deletions mars/services/scheduling/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@


class SchedulingAPI(AbstractSchedulingAPI):
def __init__(
self, session_id: str, address: str, manager_ref=None, queueing_ref=None
):
def __init__(self, session_id: str, address: str, manager_ref=None):
self._session_id = session_id
self._address = address

self._manager_ref = manager_ref
self._queueing_ref = queueing_ref

@classmethod
@alru_cache
Expand All @@ -41,20 +38,30 @@ async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
manager_ref = await mo.actor_ref(
SubtaskManagerActor.gen_uid(session_id), address=address
)
from ..supervisor.queueing import SubtaskQueueingActor

queueing_ref = await mo.actor_ref(
SubtaskQueueingActor.gen_uid(session_id), address=address
)

scheduling_api = SchedulingAPI(session_id, address, manager_ref, queueing_ref)
scheduling_api = SchedulingAPI(session_id, address, manager_ref)
return scheduling_api

async def get_subtask_schedule_summaries(
self, task_id: Optional[str] = None
) -> List[SubtaskScheduleSummary]:
return await self._manager_ref.get_schedule_summaries(task_id)

async def cache_subtasks(
self, subtasks: List[Subtask], priorities: Optional[List[Tuple]] = None
):
"""
Add subtask graph to cache for fast forwarding
Parameters
----------
subtasks
list of subtasks to be submitted to service
priorities
list of priorities of subtasks
"""
await self._manager_ref.cache_subtasks(subtasks, priorities)

async def add_subtasks(
self, subtasks: List[Subtask], priorities: Optional[List[Tuple]] = None
):
Expand Down Expand Up @@ -88,12 +95,12 @@ async def update_subtask_priority(self, subtask_id: str, priority: Tuple):

@update_subtask_priority.batch
async def update_subtask_priority(self, args_list, kwargs_list):
await self._queueing_ref.update_subtask_priority.batch(
*(
self._queueing_ref.update_subtask_priority.delay(*args, **kwargs)
for args, kwargs in zip(args_list, kwargs_list)
)
)
subtask_ids, priorities = [], []
for args, kwargs in zip(args_list, kwargs_list):
subtask_id, priority = self.update_subtask_priority.bind(*args, **kwargs)
subtask_ids.append(subtask_id)
priorities.append(priority)
await self._manager_ref.update_subtask_priorities(subtask_ids, priorities)

async def cancel_subtasks(
self, subtask_ids: List[str], kill_timeout: Union[float, int] = 5
Expand Down Expand Up @@ -128,33 +135,39 @@ async def finish_subtasks(self, subtask_ids: List[str], schedule_next: bool = Tr
class MockSchedulingAPI(SchedulingAPI):
@classmethod
async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
from ..supervisor import GlobalSlotManagerActor, AutoscalerActor

await mo.create_actor(
GlobalSlotManagerActor,
uid=GlobalSlotManagerActor.default_uid(),
address=address,
)
await mo.create_actor(
AutoscalerActor, {}, uid=AutoscalerActor.default_uid(), address=address
)
# from ..supervisor import AutoscalerActor
# await mo.create_actor(
# AutoscalerActor, {}, uid=AutoscalerActor.default_uid(), address=address
# )

from .... import resource as mars_resource
from ..worker import (
SubtaskExecutionActor,
WorkerSlotManagerActor,
SubtaskPrepareQueueActor,
SubtaskExecutionQueueActor,
WorkerQuotaManagerActor,
SlotManagerActor,
)

await mo.create_actor(
SubtaskExecutionActor,
subtask_max_retries=0,
uid=SubtaskExecutionActor.default_uid(),
SlotManagerActor,
uid=SlotManagerActor.default_uid(),
address=address,
)
await mo.create_actor(
WorkerSlotManagerActor,
uid=WorkerSlotManagerActor.default_uid(),
SubtaskPrepareQueueActor,
uid=SubtaskPrepareQueueActor.default_uid(),
address=address,
)
await mo.create_actor(
SubtaskExecutionQueueActor,
uid=SubtaskExecutionQueueActor.default_uid(),
address=address,
)
await mo.create_actor(
SubtaskExecutionActor,
subtask_max_retries=0,
uid=SubtaskExecutionActor.default_uid(),
address=address,
)
await mo.create_actor(
Expand Down
2 changes: 0 additions & 2 deletions mars/services/scheduling/supervisor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,5 @@

from .assigner import AssignerActor
from .autoscale import AutoscalerActor
from .globalslot import GlobalSlotManagerActor
from .manager import SubtaskManagerActor
from .queueing import SubtaskQueueingActor
from .service import SchedulingSupervisorService
21 changes: 11 additions & 10 deletions mars/services/scheduling/supervisor/autoscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ async def __post_create__(self):
strategy_cls = getattr(importlib.import_module(module), name)
else:
strategy_cls = PendingTaskBacklogStrategy
from ..supervisor import GlobalSlotManagerActor

self.global_slot_ref = await mo.actor_ref(
GlobalSlotManagerActor.default_uid(), address=self.address
)
# from ..supervisor import GlobalSlotManagerActor
#
# self.global_slot_ref = await mo.actor_ref(
# GlobalSlotManagerActor.default_uid(), address=self.address
# )
self._cluster_api = await ClusterAPI.create(self.address)
self._strategy = await strategy_cls.create(self._autoscale_conf, self)
if self._enabled:
Expand All @@ -62,11 +62,12 @@ async def __pre_destroy__(self):
await self._strategy.stop()

async def register_session(self, session_id: str, address: str):
from .queueing import SubtaskQueueingActor

self.queueing_refs[session_id] = await mo.actor_ref(
SubtaskQueueingActor.gen_uid(session_id), address=address
)
pass
# from .queueing import SubtaskQueueingActor
#
# self.queueing_refs[session_id] = await mo.actor_ref(
# SubtaskQueueingActor.gen_uid(session_id), address=address
# )

async def unregister_session(self, session_id: str):
self.queueing_refs.pop(session_id, None)
Expand Down
Loading

0 comments on commit 1cbbde3

Please sign in to comment.