Skip to content

Commit

Permalink
TRY fix fault inject
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Feb 16, 2022
1 parent f0e8730 commit e25ff2e
Show file tree
Hide file tree
Showing 16 changed files with 265 additions and 110 deletions.
68 changes: 34 additions & 34 deletions mars/deploy/oscar/tests/test_cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,38 +111,6 @@ def _get_labelled_port(label=None, create=True):

supervisor_cmd_start = [sys.executable, "-m", "mars.deploy.oscar.supervisor"]
worker_cmd_start = [sys.executable, "-m", "mars.deploy.oscar.worker"]
start_params = {
"bare_start": [
supervisor_cmd_start,
worker_cmd_start
+ [
"--config-file",
os.path.join(os.path.dirname(__file__), "local_test_config.yml"),
],
False,
],
"with_supervisors": [
supervisor_cmd_start
+ [
"-e",
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"-w",
lambda: str(_get_labelled_port("web")),
"--n-process",
"2",
],
worker_cmd_start
+ [
"-e",
lambda: f"127.0.0.1:{get_next_port(occupy=True)}",
"-s",
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"--config-file",
os.path.join(os.path.dirname(__file__), "local_test_config.yml"),
],
True,
],
}


def _reload_args(args):
Expand All @@ -159,8 +127,40 @@ def _reload_args(args):

@pytest.mark.parametrize(
"supervisor_args,worker_args,use_web_addr",
list(start_params.values()),
ids=list(start_params.keys()),
[
pytest.param(
supervisor_cmd_start,
worker_cmd_start
+ [
"--config-file",
os.path.join(os.path.dirname(__file__), "local_test_config.yml"),
],
False,
id="bare_start",
),
pytest.param(
supervisor_cmd_start
+ [
"-e",
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"-w",
lambda: str(_get_labelled_port("web")),
"--n-process",
"2",
],
worker_cmd_start
+ [
"-e",
lambda: f"127.0.0.1:{get_next_port(occupy=True)}",
"-s",
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"--config-file",
os.path.join(os.path.dirname(__file__), "local_test_config.yml"),
],
True,
id="with_supervisors",
),
],
)
@flaky(max_runs=10, rerun_filter=lambda err, *_: issubclass(err[0], _rerun_errors))
def test_cmdline_run(supervisor_args, worker_args, use_web_addr):
Expand Down
14 changes: 0 additions & 14 deletions mars/services/scheduling/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,6 @@ async def cancel_subtasks(
"""
await self._manager_ref.cancel_subtasks(subtask_ids, kill_timeout=kill_timeout)

async def finish_subtasks(self, subtask_ids: List[str], schedule_next: bool = True):
"""
Mark subtasks as finished, letting the scheduling service schedule
the next tasks in the ready queue.

The call is forwarded to ``finish_subtasks`` on ``self._manager_ref``.

Parameters
----------
subtask_ids
ids of subtasks to mark as finished
schedule_next
whether to schedule succeeding subtasks
"""
await self._manager_ref.finish_subtasks(subtask_ids, schedule_next)


class MockSchedulingAPI(SchedulingAPI):
@classmethod
Expand Down
47 changes: 36 additions & 11 deletions mars/services/scheduling/supervisor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ async def __post_create__(self):
AssignerActor.gen_uid(self._session_id), address=self.address
)

@alru_cache
async def _get_task_api(self):
async def _get_task_api(self) -> TaskAPI:
return await TaskAPI.create(self._session_id, self.address)

def _put_subtask_with_priority(self, subtask: Subtask, priority: Tuple = None):
Expand Down Expand Up @@ -272,21 +271,47 @@ async def update_subtask_priorities(

@alru_cache(maxsize=10000)
async def _get_execution_ref(self, address: str):
from ..worker.exec import SubtaskExecutionActor
from ..worker.execution import SubtaskExecutionActor

return await mo.actor_ref(SubtaskExecutionActor.default_uid(), address=address)

async def finish_subtasks(self, subtask_ids: List[str], schedule_next: bool = True):
band_tasks = defaultdict(lambda: 0)
for subtask_id in subtask_ids:
subtask_info = self._subtask_infos.pop(subtask_id, None)
async def set_subtask_results(
self, subtask_results: List[SubtaskResult], source_bands: List[BandType]
):
delays = []
task_api = await self._get_task_api()
for result, band in zip(subtask_results, source_bands):
if result.status == SubtaskStatus.errored:
subtask_info = self._subtask_infos.get(result.subtask_id)
if (
subtask_info is not None
and subtask_info.subtask.retryable
and subtask_info.num_reschedules < subtask_info.max_reschedules
and isinstance(result.error, (MarsError, OSError))
):
subtask_info.num_reschedules += 1
logger.warning(
"Resubmit subtask %s at attempt %d",
subtask_info.subtask.subtask_id,
subtask_info.num_reschedules,
)
execution_ref = await self._get_execution_ref(band[0])
await execution_ref.submit_subtasks.tell(
[subtask_info.subtask],
[subtask_info.priority],
self.address,
band[1],
)
continue

subtask_info = self._subtask_infos.pop(result.subtask_id, None)
if subtask_info is not None:
self._subtask_summaries[subtask_id] = subtask_info.to_summary(
self._subtask_summaries[result.subtask_id] = subtask_info.to_summary(
is_finished=True
)
if schedule_next:
for band in subtask_info.submitted_bands:
band_tasks[band] += 1
delays.append(task_api.set_subtask_result.delay(result))

await task_api.set_subtask_result.batch(*delays)

def _get_subtasks_by_ids(self, subtask_ids: List[str]) -> List[Optional[Subtask]]:
subtasks = []
Expand Down
3 changes: 0 additions & 3 deletions mars/services/scheduling/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ async def test_schedule_success(actor_pools):
subtask.expect_bands = [(worker_pool.external_address, "numa-0")]
await scheduling_api.add_subtasks([subtask], [(0,)])
await task_manager_ref.wait_subtask_result(subtask.subtask_id)
await scheduling_api.finish_subtasks([subtask.subtask_id])

result_key = next(subtask.chunk_graph.iter_indep(reverse=True)).key
result = await storage_api.get(result_key)
Expand All @@ -197,7 +196,6 @@ def _remote_fun(secs):

async def _waiter_fun(subtask_id):
await task_manager_ref.wait_subtask_result(subtask_id)
await scheduling_api.finish_subtasks([subtask_id])
finish_ids.append(subtask_id)
finish_time.append(time.time())

Expand Down Expand Up @@ -245,7 +243,6 @@ def _remote_fun(secs):

async def _waiter_fun(subtask_id):
await task_manager_ref.wait_subtask_result(subtask_id)
await scheduling_api.finish_subtasks([subtask_id])

subtasks = []
wait_tasks = []
Expand Down
2 changes: 1 addition & 1 deletion mars/services/scheduling/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from .exec import SubtaskExecutionActor
from .execution import SubtaskExecutionActor
from .queues import SubtaskExecutionQueueActor, SubtaskPrepareQueueActor
from .quota import QuotaActor, MemQuotaActor, WorkerQuotaManagerActor
from .service import SchedulingWorkerService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
# limitations under the License.

from .actor import SubtaskExecutionActor
from .core import SubtaskExecutionInfo
from .prepare import SubtaskPreparer
Loading

0 comments on commit e25ff2e

Please sign in to comment.