Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infinite wait possible in Channel.queue_declare #293

Open
rconradharris opened this issue Nov 8, 2019 · 1 comment
Open

Infinite wait possible in Channel.queue_declare #293

rconradharris opened this issue Nov 8, 2019 · 1 comment

Comments

@rconradharris
Copy link

rconradharris commented Nov 8, 2019

I'm using Celery for messaging and recently noticed that my task.delay requests were blocking forever. I traced out the code from celery, into kombu, and eventually in py-amqp.

I eventually landed at the code that was blocking and it turned out to be Connection.drain_events() , which is called from AbstractChannel.wait(). It turns out that while wait() accepts a timeout parameter, it isn't set by its caller Channel.declare_queue(). Since it's not set, it defaults to None, meaning that if data isn't received on the socket, it could potentially block forever.

Even if it did, that wouldn't be enough, however, since timeout would need to be passed down into wait() which would require adding the timeout parameter to all of its ancestor calls in the stack.

At the end of the day, the root case of this whole issue was my RabbitMQ server getting into a wonky-state. Restarting it fixed the core issue. However, the wonky-state did highlight the possibility of the an infinite wait, so figured I'd report the issue. Unfortunately, I don't know enough about this code to offer any suggestions on how to fix this, or whether it should be fixed.

Here are some of my notes on the issue:

This commit adds the timeout parameter to AbtractChannel.wait(): ef6f614

My the annotated call stack for this issue was
`

celery.app.task.Task.delay()
    celery.app.task.Task.apply_async()
        celery.app.base.Celery.send_task
            celery.app.amqp.AMQP.send_task_message()

                < ... kombu-land ...>

                kombu.messaging.Producer.publish()
                    kombu.messaging.Producer._publish()
                        kombu.common.maybe_declare()
                            kombu.common._maybe_declare()
                                kombu.entity.Queue.declare()
                                    kombu.entity.Queue._create_queue()
                                        kombu.entity.Queue.queue_declare()

                                < ... amqp-land ...>

                                            amqp.channel.Channel.queue_declare()
                                                amqp.abstract_channel.AbstractChannel.wait() <== timeout is not passed in, so defaults to None
                                                    amqp.connection.Connection.drain_events(timeout=None)
                                                        amqp.connection.Connection.blocking_read(timeout=None)
                                                            amqp.transport._AbstractTransport.having_timeout(timeout=None) <= no timeout
                                                                amqp.transport._AbstractTransport.read_frame()
                                                                    socket.recv() <== BLOCKS FOREVER

`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants
@rconradharris and others