Skip to content

Commit

Permalink
fix invalid event subscription pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
kyujin-cho committed Nov 30, 2023
1 parent 0e810ba commit eaaefdf
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
17 changes: 12 additions & 5 deletions src/ai/backend/manager/api/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
KernelLifecycleEventReason,
ModelServiceStatusEvent,
SessionCancelledEvent,
SessionPreparingEvent,
SessionStartedEvent,
SessionTerminatedEvent,
)
Expand Down Expand Up @@ -549,10 +550,16 @@ async def _handle_event(
forced=True,
)

session_event_matcher = lambda args: args[0] == result["sessionId"]
model_service_event_matcher = lambda args: args[1] == result["sessionId"]
session_event_matcher = lambda args: args[0] == str(result["sessionId"])
model_service_event_matcher = lambda args: args[1] == str(result["sessionId"])

handlers: list[EventHandler] = [
root_ctx.event_dispatcher.subscribe(
SessionPreparingEvent,
None,
_handle_event,
args_matcher=session_event_matcher,
),
root_ctx.event_dispatcher.subscribe(
SessionStartedEvent,
None,
Expand All @@ -565,13 +572,13 @@ async def _handle_event(
_handle_event,
args_matcher=session_event_matcher,
),
root_ctx.event_dispatcher.consume(
root_ctx.event_dispatcher.subscribe(
SessionTerminatedEvent,
None,
_handle_event,
args_matcher=session_event_matcher,
),
root_ctx.event_dispatcher.consume(
root_ctx.event_dispatcher.subscribe(
ModelServiceStatusEvent,
None,
_handle_event,
Expand All @@ -586,7 +593,7 @@ async def _handle_event(
root_ctx.event_dispatcher.unsubscribe(handler)

task_id = await background_task_manager.start(_task)
return web.json_response({"task_id": task_id})
return web.json_response({"task_id": str(task_id)})


@auth_required
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/manager/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3407,7 +3407,9 @@ async def handle_model_service_status_update(
kernel_loading_strategy=KernelLoadingStrategy.MAIN_KERNEL_ONLY,
)
route = await RoutingRow.get_by_session(db_sess, session.id, load_endpoint=True)
except SessionNotFound | NoResultFound:
except SessionNotFound:
return
except NoResultFound:
return

async def _update():
Expand Down

0 comments on commit eaaefdf

Please sign in to comment.