Skip to content

Commit

Permalink
Handle no results message
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-piles committed Sep 17, 2024
1 parent 4471e58 commit bab1f44
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
name=PROJECT_NAME,
packages=["queue_processor"],
package_dir={"": "src"},
version="0.2",
version="0.3",
url="https://github.com/huridocs/queue-processor",
author="HURIDOCS",
description="Manage queues on Uwazi services",
Expand Down
12 changes: 4 additions & 8 deletions src/queue_processor/QueueProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,16 @@ def start(self, process: callable):
for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names):
try:
self.create_queues()
self.queue_processor_logger.info(f"Processing queue: {task_queue_name}")
task_queue = self.get_queue(task_queue_name)
message = task_queue.receiveMessage().execute()
task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute()
results = process(utils.decode_message(message["message"]))

if "message" in message:
message["message"] = utils.decode_message(message["message"])
if results:
self.get_queue(results_queue_name).sendMessage().message(results).execute()
break

results = process(message["message"])
self.queue_processor_logger.info(f"Sending results to queue: {results_queue_name}")
self.get_queue(results_queue_name).sendMessage().message(results).execute()
break
except NoMessageInQueue:
self.queue_processor_logger.info("No messages in queue")
sleep(2)
except redis.exceptions.ConnectionError:
self.exists_queues = False
Expand Down
5 changes: 4 additions & 1 deletion src/service_mock.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from src.queue_processor.QueueProcessor import QueueProcessor


def process(message: dict[str, any]) -> dict[str, any]:
def process(message: dict[str, any]) -> dict[str, any] | None:
if "required_field" not in message:
return None

message["processed"] = True
return message

Expand Down
10 changes: 6 additions & 4 deletions src/tests/test_queue_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ def test_two_queues(self):

sleep(3)

queue_tasks_1.sendMessage().message({"test": "test_1"}).execute()
queue_tasks_1.sendMessage().message({"test": "test_2"}).execute()
queue_tasks_2.sendMessage().message({"test": "test_3"}).execute()
queue_tasks_1.sendMessage().message({"test": "test_0"}).execute()
queue_tasks_1.sendMessage().message({"required_field": True, "test": "test_1"}).execute()
queue_tasks_1.sendMessage().message({"required_field": True, "test": "test_2"}).execute()
queue_tasks_2.sendMessage().message({"test": "test_0"}).execute()
queue_tasks_2.sendMessage().message({"required_field": True, "test": "test_3"}).execute()

sleep(3)
sleep(5)

result_message_1 = utils.decode_message(queue_results_1.receiveMessage().execute()["message"])
result_message_2 = utils.decode_message(queue_results_1.receiveMessage().execute()["message"])
Expand Down

0 comments on commit bab1f44

Please sign in to comment.