-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CHANGED] MQTT s.clear() do not wait for JS responses when disconnecting the session #5575
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test TestMQTTSessionNotDeletedOnDeleteConsumerError
fails because it was expecting an error that is no longer reported due to this change.
@@ -1000,7 +1000,7 @@ func (s *Server) mqttHandleClosedClient(c *client) { | |||
|
|||
// This needs to be done outside of any lock. | |||
if doClean { | |||
if err := sess.clear(); err != nil { | |||
if err := sess.clear(true); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be invoked for "clean" sessions, which means sessions which state should be discarded and not reused in any subsequent session. Could there be a situation where a create/close/create causes an issue since some assets may not have been completely removed from the JS server?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't do anything with the error other than logging it anyway, so the logic doesn't change. Also, if the session is re-created as clean, we'd clean out its state on CONNECT. If it is re-created as persistent... 0/5 we could wipe out and re-create the consumers if there was a session message; but if we failed to delete the session message on the "clean" DISCONNECT... well, it still would not change the existing logic, would it?
@@ -3823,7 +3829,7 @@ CHECK: | |||
// This Session lasts as long as the Network Connection. State data | |||
// associated with this Session MUST NOT be reused in any subsequent | |||
// Session. | |||
if err := es.clear(); err != nil { | |||
if err := es.clear(false); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least here we make sure that the delete is complete, which I think is the right thing to do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. I understand why checking the success is important here, thus preserved as before.
173649e
to
ddda72d
Compare
@kozlovic Sorry, I misunderstood the failing test. I thought the error in the log was supposed to come from JetStream itself, where it is actually the very MQTT error I skipped that the test was expecting. I need a little time to re-think how to do this, there can still be errors sending the request to delete the consumer, so the test is still valid. |
@kozlovic here's my thinking. 0/5 for this PR we can keep the disconnect cleanup as opportunistic, and remove the failing Longer term:
What do you think? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
But is not an ephemeral consumer supposed to be told that the "client" side is still active through pings? Meaning that would that mean that the MQTT code would then need to be more involved in maintaining this consumer? And if so, it means that if there was a disconnect between where the MQTT sub lives and the JS consumer leader, we could end-up with the consumer being deleted while the MQTT code does not know that this is the case and end-up with a stop in message delivery? |
@derekcollison This can be merged now. |
…ing the session (#5575) MQTT s.clear(): do not wait for JS responses when disconnecting the session Related to #5471 Previously we were making `jsa.NewRequest` as it is needed when connecting a clean session. On disconnect, there is no reason to wait for the response (and tie up the MQTT read loop of the client). This should specifically help situations when a client app with many MQTT connections and QOS subscriptions disconnects suddenly, causing a flood of JSAPI deleteConsumer requests. Test: n/a, not sure how to instrument for it.
Built on top of #5575. Tries to prevent the stream `msgs` queue from becoming overwhelmed by core NATS publishes. In this case, where a reply subject is known, the sender will receive a 429 "Too Many Requests". Otherwise it's rate-logged. Two new configuration options are added to the JetStream block: `max_buffered_size` and `max_buffered_msgs`. If not configured, defaults are used. Signed-off-by: Neil Twigg <[email protected]> Signed-off-by: Neil Twigg <[email protected]>
MQTT s.clear(): do not wait for JS responses when disconnecting the session
Related to #5471
Previously we were making
jsa.NewRequest
as it is needed when connecting a clean session. On disconnect, there is no reason to wait for the response (and tie up the MQTT read loop of the client).This should specifically help situations when a client app with many MQTT connections and QOS subscriptions disconnects suddenly, causing a flood of JSAPI deleteConsumer requests.
Test: n/a, not sure how to instrument for it.