From 765f990b8629cfaf566ba7c10e2028c5e9c1ebf6 Mon Sep 17 00:00:00 2001
From: Wenjun Si
Date: Thu, 17 Feb 2022 16:25:39 +0800
Subject: [PATCH] Try resolve missing items in cache

---
NOTE(review): this patch was re-flowed from a whitespace-collapsed copy, so
indentation and the position of blank context lines are reconstructed and
should be confirmed against the target tree. In particular, the
cancel_subtasks hunk is an indentation-only change; it is assumed here that
the append moved inside the `if` block.
ResultCache differs from the recorded commit: the expiry test in
_del_expired_items is no longer inverted and __setitem__ re-queues
overwritten keys, so the utils.py blob hash on the index line is stale.

 mars/services/scheduling/utils.py             | 68 +++++++++++++++++++
 .../scheduling/worker/execution/actor.py      | 12 ++--
 .../subtask/worker/tests/subtask_processor.py | 13 +++-
 3 files changed, 87 insertions(+), 6 deletions(-)

diff --git a/mars/services/scheduling/utils.py b/mars/services/scheduling/utils.py
index 56490313e0..4c602e928c 100644
--- a/mars/services/scheduling/utils.py
+++ b/mars/services/scheduling/utils.py
@@ -15,6 +15,9 @@
 import asyncio
 import contextlib
 import sys
+import time
+from collections import OrderedDict
+from typing import Dict, Mapping, Optional, TypeVar, Iterator
 
 from ... import oscar as mo
 from ...lib.aio import alru_cache
@@ -58,3 +61,68 @@ async def redirect_subtask_errors(actor: mo.Actor, subtasks):
             )
         await asyncio.wait(coros)
         raise
+
+
+ResultType = TypeVar("ResultType")
+
+
+class ResultCache(Mapping[str, ResultType]):
+    """Mapping whose entries expire ``duration`` seconds after being stored.
+
+    Expired entries are dropped lazily, each time the cache is accessed.
+    """
+
+    _cache: Dict[str, ResultType]
+    _cache_time: Dict[str, float]
+    _duration: float
+
+    def __init__(self, duration: float = 120):
+        self._cache = dict()
+        self._cache_time = OrderedDict()
+        self._duration = duration
+
+    def __getitem__(self, item: str):
+        self._del_expired_items()
+        return self._cache[item]
+
+    def get(
+        self, key: str, default: Optional[ResultType] = None
+    ) -> Optional[ResultType]:
+        self._del_expired_items()
+        return self._cache.get(key, default)
+
+    def _del_expired_items(self):
+        # _cache_time runs from oldest to newest store time, so the scan
+        # can stop at the first entry that has not expired yet
+        keys = []
+        expire_time = time.time() - self._duration
+        for key, store_time in self._cache_time.items():
+            if store_time >= expire_time:
+                break
+            keys.append(key)
+        for key in keys:
+            self._delitem(key)
+
+    def __setitem__(self, key: str, value):
+        self._del_expired_items()
+        self._cache[key] = value
+        # drop any old slot first so an overwritten key becomes the newest
+        self._cache_time.pop(key, None)
+        self._cache_time[key] = time.time()
+
+    def _delitem(self, key: str):
+        del self._cache[key]
+        self._cache_time.pop(key, None)
+
+    def __delitem__(self, key: str):
+        self._delitem(key)
+        self._del_expired_items()
+
+    def __len__(self) -> int:
+        self._del_expired_items()
+        return len(self._cache)
+
+    def __iter__(self) -> Iterator[str]:
+        self._del_expired_items()
+        return iter(self._cache)
+
diff --git a/mars/services/scheduling/worker/execution/actor.py b/mars/services/scheduling/worker/execution/actor.py
index 4d54d56fe9..a127547d30 100644
--- a/mars/services/scheduling/worker/execution/actor.py
+++ b/mars/services/scheduling/worker/execution/actor.py
@@ -26,6 +26,7 @@
 from ....cluster import ClusterAPI
 from ....core import ActorCallback
 from ....subtask import Subtask, SubtaskAPI, SubtaskResult, SubtaskStatus
+from ...utils import ResultCache
 from ..queues import SubtaskPrepareQueueActor, SubtaskExecutionQueueActor
 from ..quota import QuotaActor
 from ..slotmanager import SlotManagerActor
@@ -49,6 +50,7 @@ class SubtaskExecutionActor(mo.Actor):
 
     _subtask_api: SubtaskAPI
     _subtask_preparer: SubtaskPreparer
+    _subtask_result_cache: ResultCache[SubtaskResult]
 
     def __init__(
         self,
@@ -56,11 +58,13 @@ def __init__(
         enable_kill_slot: bool = True,
     ):
         self._pred_key_mapping_dag = DAG()
-        self._subtask_caches = dict()
-        self._subtask_executions = dict()
         self._prepare_queue_ref = None
         self._execution_queue_ref = None
 
+        self._subtask_caches = dict()
+        self._subtask_executions = dict()
+        self._subtask_result_cache = ResultCache()
+
         self._subtask_max_retries = subtask_max_retries or DEFAULT_SUBTASK_MAX_RETRIES
         self._enable_kill_slot = enable_kill_slot
 
@@ -222,6 +226,7 @@ async def submit_subtasks(
         priorities: List[Tuple],
         supervisor_address: str,
         band_name: str,
+        rerun_when_fail: bool = False,
     ):
         assert len(subtasks) == len(priorities)
         logger.debug("%d subtasks submitted to SubtaskExecutionActor", len(subtasks))
@@ -249,7 +254,6 @@ async def submit_subtasks(
                 supervisor_address=supervisor_address,
                 band_name=band_name,
             )
-            self._subtask_caches.pop(subtask.subtask_id, None)
             self._subtask_executions[subtask.subtask_id] = subtask_info
             put_delays.append(
                 self._prepare_queue_ref.put.delay(
@@ -322,7 +326,7 @@ async def cancel_subtasks(
                 continue
             if not subtask_info.result.status.is_done:
                 self._fill_result_with_exc(subtask_info, exc_cls=asyncio.CancelledError)
-            infos_to_report.append(subtask_info)
+                infos_to_report.append(subtask_info)
         await self._report_subtask_results(infos_to_report)
 
     async def wait_subtasks(self, subtask_ids: List[str]):
diff --git a/mars/services/subtask/worker/tests/subtask_processor.py b/mars/services/subtask/worker/tests/subtask_processor.py
index 89a1170740..9729cde3a5 100644
--- a/mars/services/subtask/worker/tests/subtask_processor.py
+++ b/mars/services/subtask/worker/tests/subtask_processor.py
@@ -50,10 +50,19 @@ def _execute_operand(self, ctx: Dict[str, Any], op: OperandType):
                     continue
                 self.assert_object_consistent(out, ctx[out.key])
 
-    async def done(self):
-        await super().done()
+    def _unregister_executors(self):
         for op in self._operand_executors:
             try:
                 op.unregister_executor()
             except KeyError:
                 pass
+
+    async def _release_scheduling(self):
+        # once the operand stops running, the slot may be reused immediately
+        # thus executors must be cleaned in time
+        self._unregister_executors()
+        await super()._release_scheduling()
+
+    async def done(self):
+        await super().done()
+        self._unregister_executors()