Skip to content

Commit

Permalink
Add restart condition
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-piles committed Oct 25, 2024
1 parent 716ddf0 commit be8ef9c
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions src/queue_processor/QueueProcessor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from collections.abc import Callable
from time import sleep

import redis
Expand Down Expand Up @@ -45,23 +46,31 @@ def create_queues(self):
self.queue_processor_logger.info(f"Creating queue {queue_name}")
self.get_queue(queue_name).createQueue().vt(120).exceptions(False).execute()

def start(self, process: callable, run_once: bool = False):
def start(self, process: callable, restart_condition: Callable = None):
self.queue_processor_logger.info("QueueProcessor running")
while True:
executed_once = False
restart = False
for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names):
try:
self.create_queues()
task_queue = self.get_queue(task_queue_name)
message = task_queue.receiveMessage().execute()
task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute()
results = process(utils.decode_message(message["message"]))
executed_once = True
if results:
self.get_queue(results_queue_name).sendMessage(delay=self.delay_time_for_results).message(
results
).execute()
break
message = utils.decode_message(message["message"])
results = process(message)

if not results:
continue

try:
restart = True if restart else restart_condition(message)
except:
restart = False

self.get_queue(results_queue_name).sendMessage(delay=self.delay_time_for_results).message(
results
).execute()
break

except NoMessageInQueue:
sleep(2)
Expand All @@ -74,5 +83,6 @@ def start(self, process: callable, run_once: bool = False):
self.queue_processor_logger.error(f"Error: {e}", exc_info=True)
sleep(60)

if run_once and executed_once:
if restart:
sleep(self.delay_time_for_results + 5)
break

0 comments on commit be8ef9c

Please sign in to comment.