Skip to content

Commit

Permalink
TRY fix fault inject
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Feb 10, 2022
1 parent 1cbbde3 commit d3b74b4
Show file tree
Hide file tree
Showing 17 changed files with 139 additions and 66 deletions.
36 changes: 34 additions & 2 deletions mars/deploy/oscar/tests/test_cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,40 @@ def _reload_args(args):

@pytest.mark.parametrize(
"supervisor_args,worker_args,use_web_addr",
list(start_params.values()),
ids=list(start_params.keys()),
[
pytest.param(
supervisor_cmd_start,
worker_cmd_start
+ [
"--config-file",
os.path.join(os.path.dirname(__file__), "local_test_config.yml"),
],
False,
id="bare_start",
),
pytest.param(
supervisor_cmd_start
+ [
"-e",
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"-w",
lambda: str(_get_labelled_port("web")),
"--n-process",
"2",
],
worker_cmd_start
+ [
"-e",
lambda: f"127.0.0.1:{get_next_port(occupy=True)}",
"-s",
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"--config-file",
os.path.join(os.path.dirname(__file__), "local_test_config.yml"),
],
True,
id="with_supervisors",
),
]
)
@flaky(max_runs=10, rerun_filter=lambda err, *_: issubclass(err[0], _rerun_errors))
def test_cmdline_run(supervisor_args, worker_args, use_web_addr):
Expand Down
20 changes: 10 additions & 10 deletions mars/deploy/oscar/tests/test_fault_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,16 @@ async def test_fault_inject_subtask_processor(fault_cluster, fault_and_exception
@pytest.mark.parametrize(
"fault_config",
[
[
FaultType.Exception,
{FaultPosition.ON_EXECUTE_OPERAND: 1},
pytest.raises(FaultInjectionError, match="Fault Injection"),
],
[
FaultType.ProcessExit,
{FaultPosition.ON_EXECUTE_OPERAND: 1},
pytest.raises(ServerClosed),
],
# [
# FaultType.Exception,
# {FaultPosition.ON_EXECUTE_OPERAND: 1},
# pytest.raises(FaultInjectionError, match="Fault Injection"),
# ],
# [
# FaultType.ProcessExit,
# {FaultPosition.ON_EXECUTE_OPERAND: 1},
# pytest.raises(ServerClosed),
# ],
[
FaultType.Exception,
{FaultPosition.ON_RUN_SUBTASK: 1},
Expand Down
14 changes: 0 additions & 14 deletions mars/services/scheduling/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,6 @@ async def cancel_subtasks(
"""
await self._manager_ref.cancel_subtasks(subtask_ids, kill_timeout=kill_timeout)

async def finish_subtasks(self, subtask_ids: List[str], schedule_next: bool = True):
    """
    Mark subtasks as finished, letting the scheduling service schedule
    the next tasks in the ready queue.

    Parameters
    ----------
    subtask_ids
        ids of subtasks to mark as finished
    schedule_next
        whether to schedule succeeding subtasks
    """
    # Pure delegation: the manager actor owns the bookkeeping.
    manager_ref = self._manager_ref
    await manager_ref.finish_subtasks(subtask_ids, schedule_next)


class MockSchedulingAPI(SchedulingAPI):
@classmethod
Expand Down
41 changes: 30 additions & 11 deletions mars/services/scheduling/supervisor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ async def __post_create__(self):
AssignerActor.gen_uid(self._session_id), address=self.address
)

@alru_cache
async def _get_task_api(self):
async def _get_task_api(self) -> TaskAPI:
return await TaskAPI.create(self._session_id, self.address)

def _put_subtask_with_priority(self, subtask: Subtask, priority: Tuple = None):
Expand Down Expand Up @@ -272,21 +271,41 @@ async def update_subtask_priorities(

@alru_cache(maxsize=10000)
async def _get_execution_ref(self, address: str):
from ..worker.exec import SubtaskExecutionActor
from ..worker.execution import SubtaskExecutionActor

return await mo.actor_ref(SubtaskExecutionActor.default_uid(), address=address)

async def finish_subtasks(self, subtask_ids: List[str], schedule_next: bool = True):
band_tasks = defaultdict(lambda: 0)
for subtask_id in subtask_ids:
subtask_info = self._subtask_infos.pop(subtask_id, None)
async def set_subtask_results(
self, subtask_results: List[SubtaskResult], source_bands: List[BandType]
):
delays = []
task_api = await self._get_task_api()
for result, band in zip(subtask_results, source_bands):
if result.status == SubtaskStatus.errored:
subtask_info = self._subtask_infos.get(result.subtask_id)
if (
subtask_info is not None
and subtask_info.num_reschedules < subtask_info.max_reschedules
and isinstance(result.error, MarsError)
):
subtask_info.num_reschedules += 1
execution_ref = await self._get_execution_ref(band[0])
await execution_ref.submit_subtasks.tell(
[subtask_info.subtask],
[subtask_info.priority],
self.address,
band[1],
)
continue

subtask_info = self._subtask_infos.pop(result.subtask_id, None)
if subtask_info is not None:
self._subtask_summaries[subtask_id] = subtask_info.to_summary(
self._subtask_summaries[result.subtask_id] = subtask_info.to_summary(
is_finished=True
)
if schedule_next:
for band in subtask_info.submitted_bands:
band_tasks[band] += 1
delays.append(task_api.set_subtask_result.delay(result))

await task_api.set_subtask_result.batch(*delays)

def _get_subtasks_by_ids(self, subtask_ids: List[str]) -> List[Optional[Subtask]]:
subtasks = []
Expand Down
3 changes: 0 additions & 3 deletions mars/services/scheduling/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ async def test_schedule_success(actor_pools):
subtask.expect_bands = [(worker_pool.external_address, "numa-0")]
await scheduling_api.add_subtasks([subtask], [(0,)])
await task_manager_ref.wait_subtask_result(subtask.subtask_id)
await scheduling_api.finish_subtasks([subtask.subtask_id])

result_key = next(subtask.chunk_graph.iter_indep(reverse=True)).key
result = await storage_api.get(result_key)
Expand All @@ -197,7 +196,6 @@ def _remote_fun(secs):

async def _waiter_fun(subtask_id):
await task_manager_ref.wait_subtask_result(subtask_id)
await scheduling_api.finish_subtasks([subtask_id])
finish_ids.append(subtask_id)
finish_time.append(time.time())

Expand Down Expand Up @@ -245,7 +243,6 @@ def _remote_fun(secs):

async def _waiter_fun(subtask_id):
await task_manager_ref.wait_subtask_result(subtask_id)
await scheduling_api.finish_subtasks([subtask_id])

subtasks = []
wait_tasks = []
Expand Down
2 changes: 1 addition & 1 deletion mars/services/scheduling/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .exec import SubtaskExecutionActor
from .execution import SubtaskExecutionActor
from .queues import SubtaskExecutionQueueActor, SubtaskPrepareQueueActor
from .quota import QuotaActor, MemQuotaActor, WorkerQuotaManagerActor
from .service import SchedulingWorkerService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
# limitations under the License.

from .actor import SubtaskExecutionActor
from .core import SubtaskExecutionInfo
from .prepare import SubtaskPreparer
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from ....cluster import ClusterAPI
from ....core import ActorCallback
from ....subtask import Subtask, SubtaskAPI, SubtaskResult, SubtaskStatus
from ....task import TaskAPI
from ..queues import SubtaskPrepareQueueActor, SubtaskExecutionQueueActor
from ..quota import QuotaActor
from ..slotmanager import SlotManagerActor
Expand Down Expand Up @@ -102,19 +101,37 @@ async def _get_band_quota_ref(
) -> Union[mo.ActorRef, QuotaActor]:
return await mo.actor_ref(QuotaActor.gen_uid(band_name), address=self.address)

@staticmethod
@alru_cache(cache_exceptions=False)
async def _get_manager_ref(session_id: str, supervisor_address: str):
    """
    Look up the SubtaskManagerActor of a session on a supervisor.

    The actor uid is derived from ``session_id`` and resolved at
    ``supervisor_address``. Results are memoized by ``alru_cache``;
    ``cache_exceptions=False`` is passed, presumably so that failed
    lookups are retried instead of being cached (confirm against the
    async-lru version in use).
    """
    # Imported inside the function rather than at module level
    # (presumably to avoid a circular import -- confirm).
    from ...supervisor.manager import SubtaskManagerActor

    manager_uid = SubtaskManagerActor.gen_uid(session_id)
    return await mo.actor_ref(uid=manager_uid, address=supervisor_address)

def _build_subtask_info(
self,
subtask: Subtask,
priority: Tuple,
supervisor_address: str,
band_name: str,
) -> SubtaskExecutionInfo:
subtask_max_retries = (
subtask.extra_config.get("subtask_max_retries")
if subtask.extra_config
else None
)
if subtask_max_retries is None:
subtask_max_retries = self._subtask_max_retries

subtask_info = SubtaskExecutionInfo(
subtask,
priority,
supervisor_address=supervisor_address,
band_name=band_name,
max_retries=self._subtask_max_retries,
max_retries=subtask_max_retries,
)
subtask_info.result = SubtaskResult(
subtask_id=subtask.subtask_id,
Expand Down Expand Up @@ -252,18 +269,19 @@ async def _dequeue_subtask_ids(self, queue_ref, subtask_ids: List[str]):
infos_to_report.append(subtask_info)
await self._report_subtask_results(infos_to_report)

@staticmethod
async def _report_subtask_results(subtask_infos: List[SubtaskExecutionInfo]):
async def _report_subtask_results(self, subtask_infos: List[SubtaskExecutionInfo]):
if not subtask_infos:
return
task_api = await TaskAPI.create(
subtask_infos[0].result.session_id, subtask_infos[0].supervisor_address
try:
manager_ref = await self._get_manager_ref(
subtask_infos[0].result.session_id, subtask_infos[0].supervisor_address
)
except mo.ActorNotExist:
return
await manager_ref.set_subtask_results(
[info.result for info in subtask_infos],
[(self.address, info.band_name) for info in subtask_infos],
)
batch = [
task_api.set_subtask_result.delay(subtask_info.result)
for subtask_info in subtask_infos
]
await task_api.set_subtask_result.batch(*batch)

async def cancel_subtasks(
self, subtask_ids: List[str], kill_timeout: Optional[int] = 5
Expand All @@ -289,13 +307,16 @@ async def cancel_subtasks(

self.uncache_subtasks(subtask_ids)

infos_to_report = []
for subtask_id in subtask_ids:
try:
subtask_info = self._subtask_executions[subtask_id]
except KeyError:
continue
if not subtask_info.result.status.is_done:
self._fill_result_with_exc(subtask_info, exc_cls=asyncio.CancelledError)
infos_to_report.append(subtask_info)
await self._report_subtask_results(infos_to_report)

async def wait_subtasks(self, subtask_ids: List[str]):
infos = [
Expand All @@ -307,6 +328,25 @@ async def wait_subtasks(self, subtask_ids: List[str]):
yield asyncio.wait([info.finish_future for info in infos])
raise mo.Return([info.result for info in infos])

def _create_subtask_with_exception(self, subtask_id, coro):
info = self._subtask_executions[subtask_id]

async def _run_with_exception_handling():
try:
return await coro
except: # noqa: E722 # nosec # pylint: disable=bare-except
self._fill_result_with_exc(info)
await self._report_subtask_results([info])
await self._prepare_queue_ref.release_slot(
info.subtask.subtask_id, errors="ignore"
)
await self._execution_queue_ref.release_slot(
info.subtask.subtask_id, errors="ignore"
)

task = asyncio.create_task(_run_with_exception_handling())
info.aio_tasks.append(task)

async def handle_prepare_queue(self, band_name: str):
while True:
try:
Expand All @@ -322,8 +362,8 @@ async def handle_prepare_queue(self, band_name: str):
continue

logger.debug(f"Obtained subtask {subtask_id} from prepare queue")
subtask_info.aio_tasks.append(
asyncio.create_task(self._prepare_subtask_with_retry(subtask_info))
self._create_subtask_with_exception(
subtask_id, self._prepare_subtask_with_retry(subtask_info)
)

async def handle_execute_queue(self, band_name: str):
Expand Down Expand Up @@ -355,8 +395,8 @@ async def handle_execute_queue(self, band_name: str):
c.key in self._pred_key_mapping_dag
for c in subtask_info.subtask.chunk_graph.result_chunks
)
subtask_info.aio_tasks.append(
asyncio.create_task(self._execute_subtask_with_retry(subtask_info))
self._create_subtask_with_exception(
subtask_id, self._execute_subtask_with_retry(subtask_info)
)

async def _prepare_subtask_once(self, subtask_info: SubtaskExecutionInfo):
Expand Down
2 changes: 1 addition & 1 deletion mars/services/scheduling/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
from .... import oscar as mo
from ....utils import calc_size_by_str
from ...core import AbstractService
from .execution import SubtaskExecutionActor
from .slotmanager import SlotManagerActor
from .queues import SubtaskPrepareQueueActor, SubtaskExecutionQueueActor
from .quota import WorkerQuotaManagerActor
from .exec import SubtaskExecutionActor


class SchedulingWorkerService(AbstractService):
Expand Down
4 changes: 0 additions & 4 deletions mars/services/task/supervisor/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ async def set_subtask_result(self, result: SubtaskResult):
await self._update_chunks_meta(self.chunk_graph)

# tell scheduling to finish subtasks
await self._scheduling_api.finish_subtasks(
[result.subtask_id], schedule_next=not error_or_cancelled
)
if self.result.status != TaskStatus.terminated:
self.result = TaskResult(
self.task.task_id,
Expand Down Expand Up @@ -196,7 +193,6 @@ async def set_subtask_result(self, result: SubtaskResult):
# all predecessors finished
to_schedule_subtasks.append(succ_subtask)
await self._schedule_subtasks(to_schedule_subtasks)
await self._scheduling_api.finish_subtasks([result.subtask_id])

async def run(self):
if len(self.subtask_graph) == 0:
Expand Down
3 changes: 3 additions & 0 deletions mars/services/tests/fault_injection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
# limitations under the License.

import enum
import logging
import os
import uuid
from abc import ABC, abstractmethod

from ...core.base import MarsError
from ..session import SessionAPI

logger = logging.getLogger(__name__)


class ExtraConfigKey:
FAULT_INJECTION_MANAGER_NAME = "fault_injection_manager_name"
Expand Down
Loading

0 comments on commit d3b74b4

Please sign in to comment.