Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processor: Fail stuck process events #175

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions services/processor/processor/ftrack_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import collections

import appdirs
import arrow
import requests
import ftrack_api
import ftrack_api.session
Expand All @@ -19,12 +20,16 @@
from weakref import WeakMethod

from ayon_api import (
get_service_addon_name,
get_service_name,
enroll_event_job,
get_event,
get_events,
update_event,
)

# 10 minutes
EVENT_PROCESS_TIMEOUT = 60 * 10


class ProcessEventHub(ftrack_api.event.hub.EventHub):
_server_con = None
Expand All @@ -35,7 +40,7 @@ def get_next_ftrack_event(self):
return enroll_event_job(
source_topic="ftrack.leech",
target_topic="ftrack.proc",
sender=get_service_addon_name(),
sender=get_service_name(),
description="Event processing",
sequential=True,
)
Expand All @@ -50,7 +55,7 @@ def finish_job(self, job):

update_event(
event_id,
sender=get_service_addon_name(),
sender=get_service_name(),
status="finished",
description=description,
)
Expand All @@ -72,6 +77,7 @@ def wait(self, duration=None):
"""

started = time.time()
last_loaded_job = 0
while True:
job = None
empty_queue = False
Expand All @@ -91,7 +97,12 @@ def wait(self, duration=None):
if not self.connected:
break

if not self.load_event_from_jobs():
if self.load_event_from_jobs():
last_loaded_job = time.time()
elif time.time() - last_loaded_job > 5 * 60:
if not self._check_stuck_events():
time.sleep(5)
else:
time.sleep(0.1)
continue

Expand All @@ -116,6 +127,28 @@ def _handle_packet(self, code, packet_identifier, path, data):

return super()._handle_packet(code, packet_identifier, path, data)

def _check_stuck_events(self) -> bool:
"""Check if there are stuck events and mark them as failed"""
now = arrow.utcnow()
changed_status = False
for event in get_events(
topics={"ftrack.proc"},
statuses={"pending"},
):
created_at = arrow.get(event["createdAt"]).to("local")
delta = now - created_at
if delta.seconds > EVENT_PROCESS_TIMEOUT:
event_id = event["id"]
print(f"Failing stuck event '{event_id}'")
changed_status = True
update_event(
event_id,
sender=get_service_name(),
status="failed",
description="Stuck event",
)
return changed_status


class CustomEventHubSession(ftrack_api.session.Session):
"""An isolated session for interaction with an ftrack server."""
Expand Down
Loading