When "NATSSlowConsumerException" happens the Connection keeps throwing this exception and never restores its state #814

Open
davesmits opened this issue Sep 5, 2023 · 9 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@davesmits

davesmits commented Sep 5, 2023

What version were you using?

Server: 2.9.1
Client:

What environment was the server running in?

Kubernetes 1.27.3 Linux AKS

Is this defect reproducible?

Run the reproduction below; notice that once the NATSSlowConsumerException has happened, it keeps happening.

We debugged the library itself and noticed that processSlowConsumer in Connection.cs sets lastEx. But lastEx is only cleared after a reconnect, which in our opinion shouldn't be required in this scenario, and which the library does not attempt either.

We tried adding a method to reset lastEx, but we didn't find a good place to call it: in our scenario there is only one producer, and it is the one that ends up in this state. No new messages get sent, so nothing would ever trigger a reset.

internal void processSlowConsumer(Subscription s)
{
    lastEx = new NATSSlowConsumerException();
    if (!s.sc)
    {
        callbackScheduler.Add(
            () => { opts.AsyncErrorEventHandlerOrDefault(this, new ErrEventArgs(this, s, "Slow Consumer")); }
        );
    }
    s.sc = true;
}
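
To make the mechanism described above concrete, here is a minimal toy model of a "sticky" last-error field. It is not the NATS.Client implementation: ToyConnection, its members, and the use of InvalidOperationException are all hypothetical stand-ins, and the only behavior taken from the report is that the slow-consumer path sets lastEx, Publish throws while it is set, and only a reconnect clears it.

```csharp
using System;

// Toy stand-in for the client connection. Hypothetical; NOT the library's code.
sealed class ToyConnection
{
    // Sticky error slot, playing the role of lastEx in Connection.cs.
    private Exception? lastEx;

    public int Published { get; private set; }

    // Plays the role of processSlowConsumer: records the error and nothing else.
    public void ProcessSlowConsumer() =>
        lastEx = new InvalidOperationException("Slow Consumer");

    // Plays the role of the reconnect path, the only place the report says lastEx is cleared.
    public void Reconnect() => lastEx = null;

    // Throws for as long as lastEx is set, no matter how much time has passed
    // or how many pending messages the subscriber has drained in the meantime.
    public void Publish(byte[] data)
    {
        if (lastEx != null) throw lastEx;
        Published++;
    }
}
```

In this model, calling ProcessSlowConsumer once makes every later Publish throw until Reconnect is called, which matches the behavior observed in the reproduction.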

Reproduction:

using NATS.Client;

var hashset = new HashSet<Guid>(100_000 * 200);
long messageReceived = 0;

Console.WriteLine("Hello, World!");

var connection = new ConnectionFactory().CreateConnection(CreateConnectionOptions());
var subscription = connection.SubscribeAsync("davespam", OnMessageRecieved);

Console.WriteLine("Subscribed");

void OnMessageRecieved(object? sender, MsgHandlerEventArgs e)
{
    // Count each delivery and log the running total.
    Interlocked.Increment(ref messageReceived);
    Console.WriteLine("OnMessageRecieved: " + Interlocked.Read(ref messageReceived));

    // Remove the received id; the lock guards the set shared with the publish loop.
    var g = new Guid(e.Message.Data);
    lock (hashset)
        hashset.Remove(g);
}

for (int i = 0; i < 200; i++)
{
    for (var x = 0; x < 100_000; x++)
    {
        var g = Guid.NewGuid();

        lock (hashset)
            hashset.Add(g);

        var f = g.ToByteArray();

        try
        {
            connection.Publish("davespam", f);
        }
        catch (NATSSlowConsumerException)
        {
            Console.WriteLine("subscription: " + subscription.PendingMessages);
            Console.WriteLine("hashset: " + hashset.Count);
            Console.WriteLine("message received: " + messageReceived);

            await Task.Delay(1000);
            Console.WriteLine("continue");
        }
        catch (Exception ex)
        {
            Console.WriteLine("Failed: " + ex.ToString());
        }
    }
    await Task.Delay(1000);
}

Console.WriteLine("Ready");
Console.ReadLine();

NATS.Client.Options CreateConnectionOptions()
{
    var _options = ConnectionFactory.GetDefaultOptions();
    _options.AllowReconnect = true;
    _options.MaxReconnect = NATS.Client.Options.ReconnectForever;
    _options.ReconnectBufferSize = NATS.Client.Options.ReconnectBufferDisabled;
    _options.Url = ""; // todo enter a connection
    _options.Verbose = true;
    return _options;
}

Given the capability you are leveraging, describe your expectation?

That the exception is resolved once there are no pending messages anymore.

Given the expectation, what is the defect you are observing?

The exception happens and there is no way to resume normal operations except closing the connection and recreating the connection and subscription.
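
The workaround described above (close the connection, then recreate connection and subscription) could be sketched roughly as follows. This is an illustration under assumptions, not a recommended fix: RecreatingPublisher is a hypothetical wrapper, it assumes a single subject with a single async subscription as in the reproduction, and closing the connection discards any messages still pending on the old subscription.

```csharp
using System;
using NATS.Client;

// Hypothetical wrapper: owns one connection plus one subscription and rebuilds
// both when Publish reports the slow-consumer error.
sealed class RecreatingPublisher : IDisposable
{
    private readonly Options _options;
    private readonly string _subject;
    private readonly EventHandler<MsgHandlerEventArgs> _handler;
    private IConnection _connection;

    public RecreatingPublisher(Options options, string subject,
                               EventHandler<MsgHandlerEventArgs> handler)
    {
        _options = options;
        _subject = subject;
        _handler = handler;
        _connection = Connect();
    }

    // Opens a fresh connection and re-creates the subscription on it.
    private IConnection Connect()
    {
        var connection = new ConnectionFactory().CreateConnection(_options);
        connection.SubscribeAsync(_subject, _handler);
        return connection;
    }

    public void Publish(byte[] data)
    {
        try
        {
            _connection.Publish(_subject, data);
        }
        catch (NATSSlowConsumerException)
        {
            // Assumption from the report: only a new connection gets rid of the error.
            _connection.Close();
            _connection.Dispose();
            _connection = Connect();
            _connection.Publish(_subject, data); // single retry on the new connection
        }
    }

    public void Dispose() => _connection.Dispose();
}
```

With the reproduction's setup this would be constructed as new RecreatingPublisher(CreateConnectionOptions(), "davespam", OnMessageRecieved). Whether losing the pending messages is acceptable depends on the application.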

@davesmits davesmits added the defect Suspected defect such as a bug or regression label Sep 5, 2023
@scottf
Collaborator

scottf commented Sep 6, 2023

Slow consumer is a legitimate problem that cannot really be addressed without code changes.

I would suggest switching to the simplification API which uses pull under the covers. Pull better manages the flow of messages.

But, if you want to stick with a push based consumer, one thing you can try is flow control.

@davesmits
Author

davesmits commented Sep 6, 2023

I understand that it is a legitimate error, and I am fine with the exception being thrown. My problem is that the connection never gets back into a state where it resumes accepting new messages, not even when there are no pending messages anymore (see this log line in the reproduction: Console.WriteLine("subscription: " + subscription.PendingMessages);).

Basically, you can wait for hours without doing anything, and when you then publish again the connection still throws that exception, because nothing clears the exception out of the lastEx variable.

We will look into the simplification API to see if that addresses our actual problem for now.

@davesmits
Author

I hate to push for this, but any thoughts?

We are trying the new API as you suggested. But for the legacy flow, when we debugged the library, this really looks like a problem in the library: it never hits the code that should recover from this error.

@scottf
Collaborator

scottf commented Sep 26, 2023

Do you have some reproducible sample for me to work with?

@davesmits
Author

davesmits commented Sep 28, 2023

I can share the solution if that helps; it contains the console app shown above.

With the help of some breakpoints you will find that the method processSlowConsumer sets lastEx = new NATSSlowConsumerException(); and that no flow clears lastEx back to null in this specific scenario. In most other fault scenarios a reconnection is triggered, and that clears lastEx.

As long as lastEx points to an exception, the Publish method keeps throwing and never recovers.

We did investigate adding a flow where this could be fixed so we could offer you a PR, but to be honest we lacked the knowledge of the inner workings to come to a nice solution.

@scottf
Collaborator

scottf commented Sep 28, 2023

@davesmits Maybe different last exceptions for outgoing versus incoming messages on a shared connection?

@heikkilamarko

We've encountered the same issue in our system, and our investigation led us to the same conclusion: when the slow consumer exception is thrown, it never recovers. To address this, we've implemented a solution where we exit our app with exit code 1, allowing the orchestrator to initiate a new instance.

@davesmits
Author

@scottf and if we store it in a different variable, what would be the right place to clear it? Throw it once in Publish and resume after it has been thrown one time?

@scottf
Collaborator

scottf commented Oct 3, 2023

> @scottf and if we store it in a different variable, what would be the right place to clear it? Throw it once in Publish and resume after it has been thrown one time?

You would have to track whether it's an incoming message or an outgoing message, since the connection can have both.
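
One possible reading of this suggestion, sketched as a toy model rather than as a patch to the library: keep separate error slots for the incoming and the outgoing side, so that a slow subscriber no longer blocks Publish on a shared connection. Every name here (SplitErrorConnection, lastIncomingEx, lastOutgoingEx, ProcessPublishFailure) is hypothetical, and the thread leaves open where each slot should be cleared.

```csharp
using System;

// Toy model only; NOT the NATS.Client implementation.
sealed class SplitErrorConnection
{
    private Exception? lastIncomingEx; // set by subscriber-side problems
    private Exception? lastOutgoingEx; // set by publisher-side problems

    public int Published { get; private set; }

    // Exposed so callers can still observe the subscriber-side error.
    public Exception? LastIncomingError => lastIncomingEx;

    // A slow consumer is an incoming-side condition, so it only touches that slot.
    public void ProcessSlowConsumer() =>
        lastIncomingEx = new InvalidOperationException("Slow Consumer");

    // Hypothetical hook for failures that really concern outgoing messages.
    public void ProcessPublishFailure(Exception ex) => lastOutgoingEx = ex;

    // As in the original design, a reconnect clears the recorded errors.
    public void Reconnect()
    {
        lastIncomingEx = null;
        lastOutgoingEx = null;
    }

    // Publish consults only the outgoing slot.
    public void Publish(byte[] data)
    {
        if (lastOutgoingEx != null) throw lastOutgoingEx;
        Published++;
    }
}
```

In this sketch a slow consumer is still reported through LastIncomingError (and, in the real client, presumably through the async error handler), while publishing continues.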
