From ddda72d7f18135bb61559c94a2138f57184c5628 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 31 May 2024 08:53:16 -0700 Subject: [PATCH 1/6] [CHANGED] MQTT s.clear() do not wait for JS responses when disconnecting the session --- server/mqtt.go | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index b69de16f80e..d490c3a37a6 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1000,7 +1000,7 @@ func (s *Server) mqttHandleClosedClient(c *client) { // This needs to be done outside of any lock. if doClean { - if err := sess.clear(); err != nil { + if err := sess.clear(true); err != nil { c.Errorf(err.Error()) } } @@ -1475,7 +1475,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc // Opportunistically delete the old (legacy) consumer, from v2.10.10 and // before. Ignore any errors that might arise. rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id - jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName) + jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true) // Create a new, uniquely names consumer for retained messages for this // server. The prior one will expire eventually. @@ -1622,6 +1622,10 @@ func (jsa *mqttJSA) newRequestExMulti(kind, subject, cidHash string, hdrs []int, r2i[reply] = i } + if timeout == 0 { + return nil, nil + } + // Wait for all responses to come back, or for the timeout to expire. We // don't want to use time.After() which causes memory growth because the // timer can't be stopped and will need to expire to then be garbage @@ -1701,9 +1705,17 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon return ccr, ccr.ToError() } -func (jsa *mqttJSA) deleteConsumer(streamName, consName string) (*JSApiConsumerDeleteResponse, error) { +// if noWait is specified, does not wait for the JS response, returns nil +func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) { subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName) - cdri, err := jsa.newRequest(mqttJSAConsumerDel, subj, 0, nil) + + timeout := mqttJSAPITimeout + if noWait { + timeout = 0 + } + + // timeout == 0 signals that we don't want to wait for the response. + cdri, err := jsa.newRequestEx(mqttJSAConsumerDel, subj, _EMPTY_, -1, nil, timeout) if err != nil { return nil, err } @@ -3178,7 +3190,7 @@ func (sess *mqttSession) save() error { // // Runs from the client's readLoop. // Lock not held on entry, but session is in the locked map. -func (sess *mqttSession) clear() error { +func (sess *mqttSession) clear(noWait bool) error { var durs []string var pubRelDur string @@ -3206,19 +3218,19 @@ func (sess *mqttSession) clear() error { sess.mu.Unlock() for _, dur := range durs { - if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur); isErrorOtherThan(err, JSConsumerNotFoundErr) { + if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur, noWait); isErrorOtherThan(err, JSConsumerNotFoundErr) { return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err) } } if pubRelDur != _EMPTY_ { - _, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur) + _, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur, noWait) if isErrorOtherThan(err, JSConsumerNotFoundErr) { return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err) } } if seq > 0 { - err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true) + err := sess.jsa.deleteMsg(mqttSessStreamName, seq, !noWait) // Ignore the various errors indicating that the message (or sequence) // is already deleted, can happen in a cluster. if isErrorOtherThan(err, JSSequenceNotFoundErrF) { @@ -3484,7 +3496,8 @@ func (sess *mqttSession) untrackPubRel(pi uint16) (jsAckSubject string) { func (sess *mqttSession) deleteConsumer(cc *ConsumerConfig) { sess.mu.Lock() sess.tmaxack -= cc.MaxAckPending - sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))}) + sess.jsa.deleteConsumer(mqttStreamName, cc.Durable, true) + // sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))}) sess.mu.Unlock() } @@ -3823,7 +3836,7 @@ CHECK: // This Session lasts as long as the Network Connection. State data // associated with this Session MUST NOT be reused in any subsequent // Session. - if err := es.clear(); err != nil { + if err := es.clear(false); err != nil { asm.removeSession(es, true) return err } From 00efd8467f979dbe2f168357ee1bda33d8bed979 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 20 Jun 2024 16:07:51 -0700 Subject: [PATCH 2/6] Removed commented out code --- server/mqtt.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/mqtt.go b/server/mqtt.go index d490c3a37a6..8e37d3435f2 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -3497,7 +3497,6 @@ func (sess *mqttSession) deleteConsumer(cc *ConsumerConfig) { sess.mu.Lock() sess.tmaxack -= cc.MaxAckPending sess.jsa.deleteConsumer(mqttStreamName, cc.Durable, true) - // sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))}) sess.mu.Unlock() } From 25490e1eb535a501ce69380cce79c7dee9fbbdc5 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 20 Jun 2024 16:38:43 -0700 Subject: [PATCH 3/6] fixes for moving nowait into NewRequest --- server/mqtt.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/mqtt.go b/server/mqtt.go index 8e37d3435f2..a045630b65d 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1563,6 +1563,10 @@ func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []b if err != nil { return nil, err } + if timeout == 0{ + return nil, err + } + if len(responses) != 1 { return nil, fmt.Errorf("unreachable: invalid number of responses (%d)", len(responses)) } @@ -1719,6 +1723,9 @@ func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*J if err != nil { return nil, err } + if noWait { + return nil, nil + } cdr := cdri.(*JSApiConsumerDeleteResponse) return cdr, cdr.ToError() } From 8c9faa7fb5013d5bbe91f758253dc4a6811ac9c8 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 21 Jun 2024 07:57:52 -0700 Subject: [PATCH 4/6] Reverted to the original approach, removed TestMQTTSessionNotDeletedOnDeleteConsumerError --- server/mqtt.go | 16 +++--------- server/mqtt_test.go | 59 --------------------------------------------- 2 files changed, 3 insertions(+), 72 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index a045630b65d..c53d64ed7c9 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1563,10 +1563,6 @@ func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []b if err != nil { return nil, err } - if timeout == 0{ - return nil, err - } - if len(responses) != 1 { return nil, fmt.Errorf("unreachable: invalid number of responses (%d)", len(responses)) } @@ -1626,10 +1622,6 @@ func (jsa *mqttJSA) newRequestExMulti(kind, subject, cidHash string, hdrs []int, r2i[reply] = i } - if timeout == 0 { - return nil, nil - } - // Wait for all responses to come back, or for the timeout to expire. We // don't want to use time.After() which causes memory growth because the // timer can't be stopped and will need to expire to then be garbage @@ -1712,14 +1704,12 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon // if noWait is specified, does not wait for the JS response, returns nil func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) { subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName) - - timeout := mqttJSAPITimeout if noWait { - timeout = 0 + jsa.sendMsg(subj, nil) + return nil, nil } - // timeout == 0 signals that we don't want to wait for the response. - cdri, err := jsa.newRequestEx(mqttJSAConsumerDel, subj, _EMPTY_, -1, nil, timeout) + cdri, err := jsa.newRequestEx(mqttJSAConsumerDel, subj, _EMPTY_, -1, nil, mqttJSAPITimeout) if err != nil { return nil, err } diff --git a/server/mqtt_test.go b/server/mqtt_test.go index a92607ccc02..7a72136570f 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -6789,65 +6789,6 @@ func TestMQTTConsumerMemStorageReload(t *testing.T) { } } -type unableToDeleteConsLogger struct { - DummyLogger - errCh chan string -} - -func (l *unableToDeleteConsLogger) Errorf(format string, args ...any) { - msg := fmt.Sprintf(format, args...) - if strings.Contains(msg, "unable to delete consumer") { - l.errCh <- msg - } -} - -func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) { - org := mqttJSAPITimeout - mqttJSAPITimeout = 1000 * time.Millisecond - defer func() { mqttJSAPITimeout = org }() - - cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2) - defer cl.shutdown() - - o := cl.opts[0] - s1 := cl.servers[0] - // Plug error logger to s1 - l := &unableToDeleteConsLogger{errCh: make(chan string, 10)} - s1.SetLogger(l, false, false) - - nc, js := jsClientConnect(t, s1) - defer nc.Close() - - mc, r := testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5) - defer mc.Close() - testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false) - - testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) - testMQTTFlush(t, mc, nil, r) - - // Now shutdown server 2, we should lose quorum - cl.servers[1].Shutdown() - - // Close the MQTT client: - testMQTTDisconnect(t, mc, nil) - - // We should have reported that there was an error deleting the consumer - select { - case <-l.errCh: - // OK - case <-time.After(time.Second): - t.Fatal("Server did not report any error") - } - - // Now restart the server 2 so that we can check that the session is still persisted. - cl.restartAllSamePorts() - cl.waitOnStreamLeader(globalAccountName, mqttSessStreamName) - - si, err := js.StreamInfo(mqttSessStreamName) - require_NoError(t, err) - require_True(t, si.State.Msgs == 1) -} - // Test for auto-cleanup of consumers. func TestMQTTConsumerInactiveThreshold(t *testing.T) { tdir := t.TempDir() From 539b79fec4a2d1e44009f4cc5e5372c1eeb0ba0c Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 21 Jun 2024 08:07:39 -0700 Subject: [PATCH 5/6] removed dead code --- server/mqtt.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index c53d64ed7c9..a4ec6dd7cef 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1713,9 +1713,6 @@ func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*J if err != nil { return nil, err } - if noWait { - return nil, nil - } cdr := cdri.(*JSApiConsumerDeleteResponse) return cdr, cdr.ToError() } From 59db0928762c21afb449d82bfc02d1630dab4d3b Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 21 Jun 2024 08:11:28 -0700 Subject: [PATCH 6/6] restored the original newRequest, not Ex --- server/mqtt.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/mqtt.go b/server/mqtt.go index a4ec6dd7cef..19350850066 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1709,7 +1709,7 @@ func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*J return nil, nil } - cdri, err := jsa.newRequestEx(mqttJSAConsumerDel, subj, _EMPTY_, -1, nil, mqttJSAPITimeout) + cdri, err := jsa.newRequest(mqttJSAConsumerDel, subj, 0, nil) if err != nil { return nil, err }