-
time.sleep blocks asyncio's single-threaded event loop, so as a workaround you need to run the blocking task in another thread, for example with a ThreadPoolExecutor: https://pymotw.com/3/asyncio/executors.html After 6 minutes, the NATS server will disconnect the client because of missing ping/pong replies, and the client will then try to reconnect.
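A minimal sketch of the executor approach, using only the standard library. `blocking_task` and `heartbeat` are hypothetical stand-ins (for the long synchronous work and for the client's background ping/pong handling); they are not part of the NATS client.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_task(seconds: float) -> str:
    # Hypothetical stand-in for the long synchronous work.
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list, interval: float) -> None:
    # Hypothetical stand-in for background ping/pong handling on the loop.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def main() -> tuple:
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks, 0.05))
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The blocking call runs in a worker thread, so the event loop
        # stays free to run the heartbeat task in the meantime.
        result = await loop.run_in_executor(pool, blocking_task, 0.3)
    hb.cancel()
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The heartbeat keeps ticking while the worker thread sleeps, which is the property the connection needs in order to keep answering pings.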
-
Thanks for your reply @wallyqs! I'm curious about this:
Why does the NATS server disconnect after 6 minutes? Is there any way to increase/change this value?
-
That is the ping_interval setting, which is 2 minutes by default, with max pings outstanding set to 3, so the third missed pong disconnects the client.
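The 6-minute figure follows directly from those two values. As a quick check (the function and parameter names here are hypothetical, chosen only for illustration):

```python
from datetime import timedelta


def time_until_disconnect(ping_interval: timedelta, max_pings_out: int) -> timedelta:
    # One ping is sent per interval; the connection is dropped once
    # max_pings_out pings have gone unanswered.
    return ping_interval * max_pings_out


print(time_until_disconnect(timedelta(minutes=2), 3))  # 0:06:00
```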
-
Thanks again @wallyqs! We are going to try to implement the threads approach instead of increasing the
-
Hello! To give some more context, here is what we ended up doing to solve this particular problem:

What?

When a message is processed in a subscription callback function and the processing takes a long time, trying to post a response fails: the publish call raises an error.

Why?

Subscription callback functions run within an asyncio loop. If a callback synchronously blocks the thread for too long, nothing else running on that thread can make progress. In this case, the NATS subscription is blocked for a long time and the connection is lost. Users must be aware of this situation and act accordingly. They have two options:

In this particular use case, the callback function that was blocking the thread had this form:

    async def message_cb(msg):
        for i in range(x):  # x was a big number
            ### some code
            ### some code that blocks for 3~ min
            ### some more code
        await nc.publish("reply", b"Reply")
        await nc.flush(timeout=5)

With this scheme, the thread was completely blocked from the moment the callback started until the loop finished. The solution that unblocks the thread without interrupting the processing was the following:

    async def message_cb(msg):
        for i in range(x):  # x was a big number
            ### some code
            ### some code that blocks for 3~ min
            await asyncio.sleep(0)  # this instruction frees the thread for the rest of the processes
            ### some more code
        await nc.publish("reply", b"Reply")
        await nc.flush(timeout=5)
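To illustrate what `await asyncio.sleep(0)` changes, here is a small standard-library sketch. `worker` plays the role of the callback and `pinger` is a hypothetical stand-in for the other work living on the loop; the short `time.sleep` calls stand in for the minutes-long blocking chunks.

```python
import asyncio
import time


async def worker(log: list, chunks: int, yield_between: bool) -> None:
    for i in range(chunks):
        time.sleep(0.05)  # blocking chunk: nothing else can run meanwhile
        log.append(f"chunk {i}")
        if yield_between:
            # Hand control back to the event loop so other tasks get a turn.
            await asyncio.sleep(0)


async def pinger(log: list) -> None:
    # Hypothetical stand-in for background work such as answering pings.
    while True:
        log.append("pong")
        await asyncio.sleep(0.01)


async def run(yield_between: bool) -> list:
    log: list = []
    task = asyncio.create_task(pinger(log))
    await worker(log, 3, yield_between)
    task.cancel()
    return log


if __name__ == "__main__":
    print(asyncio.run(run(yield_between=False)))
    print(asyncio.run(run(yield_between=True)))
```

Without the yield, the pinger never gets to run before the callback returns. With it, the pinger runs between chunks. Note that each individual chunk still blocks the loop for its full duration, so this only helps when every chunk is shorter than the disconnect window.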
-
@wallyqs @alejandrogr Is it possible to run the NATS connection logic in a separate, parallel thread? That way my main code would not need to be heavily modified.
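The thread does not answer this, but one generic asyncio pattern is to run an event loop in a background thread and hand coroutines to it with `asyncio.run_coroutine_threadsafe`. The sketch below uses only the standard library; `fake_publish` is a hypothetical stand-in for connection I/O, and whether the NATS client behaves well under this arrangement is an assumption that would need testing.

```python
import asyncio
import threading
import time


def start_background_loop() -> asyncio.AbstractEventLoop:
    # Create a dedicated event loop and run it forever in a daemon thread.
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def fake_publish(subject: str, payload: bytes) -> str:
    # Hypothetical stand-in for network I/O done on the background loop.
    await asyncio.sleep(0.01)
    return f"{subject}:{payload.decode()}"


def main() -> str:
    loop = start_background_loop()
    time.sleep(0.1)  # blocking work in the main thread; the loop is unaffected
    # Submit a coroutine to the background loop from the main thread.
    future = asyncio.run_coroutine_threadsafe(
        fake_publish("reply", b"Reply"), loop
    )
    result = future.result(timeout=5)
    loop.call_soon_threadsafe(loop.stop)
    return result


if __name__ == "__main__":
    print(main())
```

Because asyncio objects are not thread-safe, anything bound to the background loop should be created on that loop and only touched through `run_coroutine_threadsafe` or `call_soon_threadsafe`.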
-
Hi guys!
I have been working with this library recently and I found an error when a task takes too much time to process.
Here is the example code that I have been using for my tests:
As you can see, there is a time.sleep that blocks the code for 40 minutes in order to test what happens if the process waits that long.
After running the code I get the following error:
Is there any way to avoid getting this Broken pipe error even if the process takes 5 minutes or 3 hours?
Thanks in advance!