From e97d876eab10edfd5a81844f06506869fd19650f Mon Sep 17 00:00:00 2001 From: Jens Troeger Date: Tue, 24 Dec 2024 07:36:22 +1000 Subject: [PATCH] feat: add ensure_consumer kwarg to before/after_declare_queue to control whether a consumer should be created on a RabbitMQ, then disable consumer thread creation upon message delivery --- dramatiq/brokers/rabbitmq.py | 10 +++++----- dramatiq/middleware/middleware.py | 6 +++--- dramatiq/worker.py | 16 +++++++++------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/dramatiq/brokers/rabbitmq.py b/dramatiq/brokers/rabbitmq.py index ef1728b3..6bf22fd8 100644 --- a/dramatiq/brokers/rabbitmq.py +++ b/dramatiq/brokers/rabbitmq.py @@ -219,7 +219,7 @@ def consume(self, queue_name, prefetch=1, timeout=5000): self.declare_queue(queue_name, ensure=True) return self.consumer_class(self.parameters, queue_name, prefetch, timeout) - def declare_queue(self, queue_name, *, ensure=False): + def declare_queue(self, queue_name, *, ensure=False, ensure_consumer=True): """Declare a queue. Has no effect if a queue with the given name already exists. @@ -233,14 +233,14 @@ def declare_queue(self, queue_name, *, ensure=False): or connection fails. """ if q_name(queue_name) not in self.queues: - self.emit_before("declare_queue", queue_name) + self.emit_before("declare_queue", queue_name, ensure_consumer=ensure_consumer) self.queues.add(queue_name) self.queues_pending.add(queue_name) - self.emit_after("declare_queue", queue_name) + self.emit_after("declare_queue", queue_name, ensure_consumer=ensure_consumer) delayed_name = dq_name(queue_name) self.delay_queues.add(delayed_name) - self.emit_after("declare_delay_queue", delayed_name) + self.emit_after("declare_delay_queue", delayed_name, ensure_consumer=ensure_consumer) if ensure: self._ensure_queue(queue_name) @@ -324,7 +324,7 @@ def enqueue(self, message, *, delay=None): attempts = 0 while True: try: - self.declare_queue(canonical_queue_name, ensure=True) + self.declare_queue(canonical_queue_name, ensure=True, ensure_consumer=False) self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name) self.emit_before("enqueue", message, delay) self.channel.basic_publish( diff --git a/dramatiq/middleware/middleware.py b/dramatiq/middleware/middleware.py index 5ace255c..e6578ef3 100644 --- a/dramatiq/middleware/middleware.py +++ b/dramatiq/middleware/middleware.py @@ -70,11 +70,11 @@ def after_declare_actor(self, broker, actor): """Called after an actor has been declared. """ - def before_declare_queue(self, broker, queue_name): + def before_declare_queue(self, broker, queue_name, ensure_consumer=True): """Called before a queue is declared. """ - def after_declare_queue(self, broker, queue_name): + def after_declare_queue(self, broker, queue_name, ensure_consumer=True): """Called after a queue has been declared. This signals that the queue has been registered with the @@ -84,7 +84,7 @@ def after_declare_queue(self, broker, queue_name): them until messages are enqueued or consumed. """ - def after_declare_delay_queue(self, broker, queue_name): + def after_declare_delay_queue(self, broker, queue_name, ensure_consumer=True): """Called after a delay queue has been declared. """ diff --git a/dramatiq/worker.py b/dramatiq/worker.py index a3058420..4c1b3224 100644 --- a/dramatiq/worker.py +++ b/dramatiq/worker.py @@ -219,13 +219,15 @@ def __init__(self, worker): self.logger = get_logger(__name__, type(self)) self.worker = worker - def after_declare_queue(self, broker, queue_name): - self.logger.debug("Adding consumer for queue %r.", queue_name) - self.worker._add_consumer(queue_name) - - def after_declare_delay_queue(self, broker, queue_name): - self.logger.debug("Adding consumer for delay queue %r.", queue_name) - self.worker._add_consumer(queue_name, delay=True) + def after_declare_queue(self, broker, queue_name, ensure_consumer=True): + if ensure_consumer: + self.logger.debug("Adding consumer for queue %r.", queue_name) + self.worker._add_consumer(queue_name) + + def after_declare_delay_queue(self, broker, queue_name, ensure_consumer=True): + if ensure_consumer: + self.logger.debug("Adding consumer for delay queue %r.", queue_name) + self.worker._add_consumer(queue_name, delay=True) class _ConsumerThread(Thread):