Skip to content

Commit

Permalink
Respect queues priority
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-piles committed Sep 17, 2024
1 parent bad0241 commit 69e3fa1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 24 deletions.
47 changes: 28 additions & 19 deletions src/queue_processor/QueueProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def __init__(self, redis_host: str, redis_port: int, queues_names_by_priority: l
self.task_queues_names: list[str] = [queue_name + "_tasks" for queue_name in queues_names_by_priority]
self.results_queues_names: list[str] = [queue_name + "_results" for queue_name in queues_names_by_priority]

self.exists_queues = False
if logger:
self.queue_processor_logger = logger
return
Expand All @@ -25,21 +26,24 @@ def __init__(self, redis_host: str, redis_port: int, queues_names_by_priority: l
def get_queue(self, queue_name):
return RedisSMQ(host=self.redis_host, port=self.redis_port, qname=queue_name)

def create_queue(self, queue_name: str):
try:
self.get_queue(queue_name).getQueueAttributes().exec_command()
except cmd.exceptions.QueueDoesNotExist:
self.queue_processor_logger.info(f"Creating queue")
self.get_queue(queue_name).createQueue().vt(120).exceptions(False).execute()
def create_queues(self):
if self.exists_queues:
return

for queue_name in self.task_queues_names + self.results_queues_names:
try:
self.get_queue(queue_name).getQueueAttributes().exec_command()
except cmd.exceptions.QueueDoesNotExist:
self.queue_processor_logger.info(f"Creating queue {queue_name}")
self.get_queue(queue_name).createQueue().vt(120).exceptions(False).execute()

def start(self, process: callable):
self.queue_processor_logger.info("QueueProcessor running")
while True:
try:
for queue_name in self.task_queues_names + self.results_queues_names:
self.create_queue(queue_name)

for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names):
for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names):
try:
self.create_queues()
self.queue_processor_logger.info(f"Processing queue: {task_queue_name}")
task_queue = self.get_queue(task_queue_name)
message = task_queue.receiveMessage().execute()
task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute()
Expand All @@ -48,12 +52,17 @@ def start(self, process: callable):
message["message"] = utils.decode_message(message["message"])

results = process(message["message"])
self.queue_processor_logger.info(f"Sending results to queue: {results_queue_name}")
self.get_queue(results_queue_name).sendMessage().message(results).execute()
except NoMessageInQueue:
sleep(2)
except redis.exceptions.ConnectionError:
self.queue_processor_logger.error(f"Error connecting to Redis: {self.redis_host}:{self.redis_port}")
sleep(30)
except Exception as e:
self.queue_processor_logger.error(f"Error: {e}", exc_info=True)
sleep(60)
break
except NoMessageInQueue:
self.queue_processor_logger.info("No messages in queue")
sleep(2)
except redis.exceptions.ConnectionError:
self.exists_queues = False
self.queue_processor_logger.error(f"Error connecting to Redis: {self.redis_host}:{self.redis_port}")
sleep(30)
except Exception as e:
self.exists_queues = False
self.queue_processor_logger.error(f"Error: {e}", exc_info=True)
sleep(60)
17 changes: 12 additions & 5 deletions src/tests/test_queue_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,22 @@ class TestQueueProcessor(TestCase):

def test_two_queues(self):
queue_tasks_1 = RedisSMQ(host="localhost", port=6380, qname="test_queue_1_tasks")
queue_tasks_2 = RedisSMQ(host="localhost", port=6380, qname="test_queue_2_tasks")
queue_results_1 = RedisSMQ(host="localhost", port=6380, qname="test_queue_1_results")
queue_results_2 = RedisSMQ(host="localhost", port=6380, qname="test_queue_2_results")

queue_tasks_1.deleteQueue().exceptions(False).execute()
queue_tasks_2.deleteQueue().exceptions(False).execute()
queue_results_1.deleteQueue().exceptions(False).execute()
queue_results_2.deleteQueue().exceptions(False).execute()

sleep(3)

queue_tasks_1.sendMessage().message({"test": "test_1"}).execute()
queue_tasks_1.sendMessage().message({"test": "test_2"}).execute()

queue_tasks_2 = RedisSMQ(host="localhost", port=6380, qname="test_queue_2_tasks")
queue_tasks_2.sendMessage().message({"test": "test_3"}).execute()

sleep(2)
queue_results_1 = RedisSMQ(host="localhost", port=6380, qname="test_queue_1_results")
queue_results_2 = RedisSMQ(host="localhost", port=6380, qname="test_queue_2_results")
sleep(3)

result_message_1 = utils.decode_message(queue_results_1.receiveMessage().execute()["message"])
result_message_2 = utils.decode_message(queue_results_1.receiveMessage().execute()["message"])
Expand Down

0 comments on commit 69e3fa1

Please sign in to comment.