Skip to content

Commit

Permalink
feat: add ensure_consumer kwarg to before/after_declare_queue to cont…
Browse files Browse the repository at this point in the history
…rol whether a consumer should be created on a RabbitMQ, then disable consumer thread creation upon message delivery
  • Loading branch information
jenstroeger committed Dec 23, 2024
1 parent b372f43 commit e97d876
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
10 changes: 5 additions & 5 deletions dramatiq/brokers/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def consume(self, queue_name, prefetch=1, timeout=5000):
self.declare_queue(queue_name, ensure=True)
return self.consumer_class(self.parameters, queue_name, prefetch, timeout)

def declare_queue(self, queue_name, *, ensure=False):
def declare_queue(self, queue_name, *, ensure=False, ensure_consumer=True):
"""Declare a queue. Has no effect if a queue with the given
name already exists.
Expand All @@ -233,14 +233,14 @@ def declare_queue(self, queue_name, *, ensure=False):
or connection fails.
"""
if q_name(queue_name) not in self.queues:
self.emit_before("declare_queue", queue_name)
self.emit_before("declare_queue", queue_name, ensure_consumer=ensure_consumer)
self.queues.add(queue_name)
self.queues_pending.add(queue_name)
self.emit_after("declare_queue", queue_name)
self.emit_after("declare_queue", queue_name, ensure_consumer=ensure_consumer)

delayed_name = dq_name(queue_name)
self.delay_queues.add(delayed_name)
self.emit_after("declare_delay_queue", delayed_name)
self.emit_after("declare_delay_queue", delayed_name, ensure_consumer=ensure_consumer)

if ensure:
self._ensure_queue(queue_name)
Expand Down Expand Up @@ -324,7 +324,7 @@ def enqueue(self, message, *, delay=None):
attempts = 0
while True:
try:
self.declare_queue(canonical_queue_name, ensure=True)
self.declare_queue(canonical_queue_name, ensure=True, ensure_consumer=False)
self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name)
self.emit_before("enqueue", message, delay)
self.channel.basic_publish(
Expand Down
6 changes: 3 additions & 3 deletions dramatiq/middleware/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ def after_declare_actor(self, broker, actor):
"""Called after an actor has been declared.
"""

def before_declare_queue(self, broker, queue_name):
def before_declare_queue(self, broker, queue_name, ensure_consumer=True):
"""Called before a queue is declared.
"""

def after_declare_queue(self, broker, queue_name):
def after_declare_queue(self, broker, queue_name, ensure_consumer=True):
"""Called after a queue has been declared.
This signals that the queue has been registered with the
Expand All @@ -84,7 +84,7 @@ def after_declare_queue(self, broker, queue_name):
them until messages are enqueued or consumed.
"""

def after_declare_delay_queue(self, broker, queue_name):
def after_declare_delay_queue(self, broker, queue_name, ensure_consumer=True):
"""Called after a delay queue has been declared.
"""

Expand Down
16 changes: 9 additions & 7 deletions dramatiq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,15 @@ def __init__(self, worker):
self.logger = get_logger(__name__, type(self))
self.worker = worker

def after_declare_queue(self, broker, queue_name):
self.logger.debug("Adding consumer for queue %r.", queue_name)
self.worker._add_consumer(queue_name)

def after_declare_delay_queue(self, broker, queue_name):
self.logger.debug("Adding consumer for delay queue %r.", queue_name)
self.worker._add_consumer(queue_name, delay=True)
def after_declare_queue(self, broker, queue_name, ensure_consumer=True):
if ensure_consumer:
self.logger.debug("Adding consumer for queue %r.", queue_name)
self.worker._add_consumer(queue_name)

def after_declare_delay_queue(self, broker, queue_name, ensure_consumer=True):
if ensure_consumer:
self.logger.debug("Adding consumer for delay queue %r.", queue_name)
self.worker._add_consumer(queue_name, delay=True)


class _ConsumerThread(Thread):
Expand Down

0 comments on commit e97d876

Please sign in to comment.