Slow consumer error when fetching from work-queue stream #788
-
Moved to discussion
-
I'll need a lot more context. On the client side, this error means that a message has come in that crosses the connection pending limits of max messages or max bytes, meaning there are unprocessed messages sitting in the queue. What are your connection options? How often are you doing the fetch?
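To make the pending-limit condition concrete, here is a toy model in C#. It is only a sketch of the idea described above, not the NATS.Client implementation: the class `PendingQueue` and its members are hypothetical names invented for illustration, and the real client's bookkeeping may differ in detail.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a client-side pending queue. NOT the NATS.Client code;
// PendingQueue and all of its members are hypothetical names.
public sealed class PendingQueue
{
    private readonly Queue<byte[]> _messages = new Queue<byte[]>();
    private readonly long _maxMessages;
    private readonly long _maxBytes;
    private long _pendingBytes;

    public PendingQueue(long maxMessages, long maxBytes)
    {
        _maxMessages = maxMessages;
        _maxBytes = maxBytes;
    }

    public int PendingMessages => _messages.Count;
    public long PendingBytes => _pendingBytes;
    public long Dropped { get; private set; }

    // Called when a message arrives from the server. Returns false (the
    // "slow consumer" condition) if accepting it would cross either limit.
    public bool TryEnqueue(byte[] payload)
    {
        bool overMessages = _messages.Count + 1 > _maxMessages;
        bool overBytes = _pendingBytes + payload.Length > _maxBytes;
        if (overMessages || overBytes)
        {
            Dropped++;
            return false;
        }
        _messages.Enqueue(payload);
        _pendingBytes += payload.Length;
        return true;
    }

    // Called when the application takes a message; this frees room in the queue.
    public bool TryDequeue(out byte[] payload)
    {
        if (_messages.Count == 0)
        {
            payload = null;
            return false;
        }
        payload = _messages.Dequeue();
        _pendingBytes -= payload.Length;
        return true;
    }
}
```

In this model, a queue created with `new PendingQueue(2, 1024)` accepts two 10-byte messages and rejects a third until one of them has been dequeued. The point is that the condition depends on how many unprocessed messages are waiting on the client, not on the overall message rate.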
-
So first, I'm not seeing anything obvious in the code. Is this happening repeatedly? Let's keep an eye on this and see if we can figure out how to reproduce it. Second, I would offer a note.
try
{
    sub.PullExpiresIn(1, 5000); // the server gives 5 seconds to allow for a message to show up
    // Add a little extra to the timeout to allow for traffic back and forth to the server, to make sure
    // we don't time out while the message is coming over the wire.
    Msg msg = sub.NextMessage(5050);
    _logger.LogInformation("message received ({subject})", msg.Subject);
    // Handle the message...
    _logger.LogInformation("message handled ({subject})", msg.Subject);
    msg.Ack();
}
catch (NATSTimeoutException)
{
    // Normal behavior when NextMessage times out, probably don't even need to log.
    // msg is declared inside the try block, so it is not in scope here.
    _logger.LogInformation("No messages available");
}
catch (Exception err)
{
    _logger.LogError(err, "message handling failed");
}
-
We are now using the new simplified API. We catch the NATSSlowConsumerException and rethrow it:
public async Task ListenAsync(CancellationToken stoppingToken)
{
    try
    {
        _logger.LogInformation("starting nats subscription");
        var consumer = SubConnection.CreateConsumerContext(_options.SubStream, _options.SubConsumer);
        while (true)
        {
            if (stoppingToken.IsCancellationRequested)
            {
                _logger.LogInformation("cancellation requested for nats subscription");
                break;
            }
            try
            {
                var msg = consumer.Next(2000);
                if (msg is null) continue;
                _logger.LogInformation("message received ({subject})", msg.Subject);
                // Handle the message...
                _logger.LogInformation("message handled ({subject})", msg.Subject);
                msg.Ack();
            }
            catch (NATSSlowConsumerException)
            {
                throw;
            }
            catch (Exception err)
            {
                _logger.LogError(err, "message handling failed");
            }
        }
        _logger.LogInformation("stopping nats subscription");
    }
    catch (Exception err)
    {
        _logger.LogError(err, "nats subscription failed");
        Environment.ExitCode = 1;
        throw;
    }
}
-
Our environment:
We have configured a JetStream pull subscriber for a work-queue stream as follows:
We have set the AsyncErrorEventHandler as follows:
The setup ran without problems for almost two weeks and then suddenly stopped working. In our logs we see the slow consumer error logged by our AsyncErrorEventHandler:
The connection is used just for fetching messages from the work-queue one by one:
The message rate is about 10 messages per minute or less, so we shouldn't have the slow consumer problem. There are no slow consumers detected by the NATS Server, only by the client SDK.
Any idea what might cause the slow consumer error?