Skip to content

Commit

Permalink
Accept condition to restart server
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-piles committed Dec 12, 2024
1 parent cc30c4b commit 1f0294f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "queue-processor"
version = "2024.12.10.1"
version = "2024.12.12.2"
description = "Manage queues on Uwazi services"
license = { file = "LICENSE" }
authors = [{ name = "HURIDOCS" }]
Expand Down
32 changes: 32 additions & 0 deletions src/delete_queues.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from redis import exceptions
from rsmq import RedisSMQ


REDIS_HOST = "127.0.0.1"
REDIS_PORT = "6379"

QUEUES_NAMES = "test_queue_1 test_queue_2"


def delete_queues():
try:
for queue_name in QUEUES_NAMES.split():
for suffix in ["_tasks", "_results"]:
queue = RedisSMQ(
host=REDIS_HOST,
port=REDIS_PORT,
qname=queue_name + suffix,
quiet=False,
)

queue.deleteQueue().exceptions(False).execute()
queue.createQueue(maxsize=-1).vt(120).exceptions(False).execute()

print("Queues properly deleted")

except exceptions.ConnectionError:
print("No redis connection")


if __name__ == "__main__":
delete_queues()
34 changes: 21 additions & 13 deletions src/queue_processor/QueueProcessor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from collections.abc import Callable
from time import sleep
from typing import Callable

import redis
from rsmq.cmd import NoMessageInQueue, utils
Expand Down Expand Up @@ -44,30 +44,34 @@ def create_queues(self):
self.get_queue(queue_name).getQueueAttributes().exec_command()
except cmd.exceptions.QueueDoesNotExist:
self.queue_processor_logger.info(f"Creating queue {queue_name}")
self.get_queue(queue_name).createQueue().vt(120).exceptions(False).execute()
self.get_queue(queue_name).createQueue(maxsize=-1).vt(120).exceptions(False).execute()

def start(self, process: callable, hide_message_seconds: int = 0):
def start(self, process: callable, restart_condition: Callable = None):
self.queue_processor_logger.info("QueueProcessor running")
while True:
restart = False
for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names):
try:
self.create_queues()
task_queue = self.get_queue(task_queue_name)
message = task_queue.receiveMessage(vt=hide_message_seconds if hide_message_seconds else None).execute()

if hide_message_seconds == 0:
task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute()

results = process(utils.decode_message(message["message"]))
message = task_queue.receiveMessage().execute()
task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute()
message = utils.decode_message(message["message"])
results = process(message)

if not results:
continue

if hide_message_seconds:
task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute()
self.get_queue(results_queue_name).sendMessage(delay=self.delay_time_for_results).message(
results
).execute()

results_queue = self.get_queue(results_queue_name)
results_queue.sendMessage(delay=self.delay_time_for_results).message(results).execute()
try:
restart = restart_condition(message)
except:
pass

break

except NoMessageInQueue:
sleep(2)
Expand All @@ -79,3 +83,7 @@ def start(self, process: callable, hide_message_seconds: int = 0):
self.exists_queues = False
self.queue_processor_logger.error(f"Error: {e}", exc_info=True)
sleep(60)

if restart:
sleep(self.delay_time_for_results + 5)
break

0 comments on commit 1f0294f

Please sign in to comment.