diff --git a/server/mqtt.go b/server/mqtt.go index a045630b65d..c53d64ed7c9 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1563,10 +1563,6 @@ func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []b if err != nil { return nil, err } - if timeout == 0{ - return nil, err - } - if len(responses) != 1 { return nil, fmt.Errorf("unreachable: invalid number of responses (%d)", len(responses)) } @@ -1626,10 +1622,6 @@ func (jsa *mqttJSA) newRequestExMulti(kind, subject, cidHash string, hdrs []int, r2i[reply] = i } - if timeout == 0 { - return nil, nil - } - // Wait for all responses to come back, or for the timeout to expire. We // don't want to use time.After() which causes memory growth because the // timer can't be stopped and will need to expire to then be garbage @@ -1712,14 +1704,12 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon // if noWait is specified, does not wait for the JS response, returns nil func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) { subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName) - - timeout := mqttJSAPITimeout if noWait { - timeout = 0 + jsa.sendMsg(subj, nil) + return nil, nil } - // timeout == 0 signals that we don't want to wait for the response. - cdri, err := jsa.newRequestEx(mqttJSAConsumerDel, subj, _EMPTY_, -1, nil, timeout) + cdri, err := jsa.newRequestEx(mqttJSAConsumerDel, subj, _EMPTY_, -1, nil, mqttJSAPITimeout) if err != nil { return nil, err } diff --git a/server/mqtt_test.go b/server/mqtt_test.go index a92607ccc02..7a72136570f 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -6789,65 +6789,6 @@ func TestMQTTConsumerMemStorageReload(t *testing.T) { } } -type unableToDeleteConsLogger struct { - DummyLogger - errCh chan string -} - -func (l *unableToDeleteConsLogger) Errorf(format string, args ...any) { - msg := fmt.Sprintf(format, args...) - if strings.Contains(msg, "unable to delete consumer") { - l.errCh <- msg - } -} - -func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) { - org := mqttJSAPITimeout - mqttJSAPITimeout = 1000 * time.Millisecond - defer func() { mqttJSAPITimeout = org }() - - cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2) - defer cl.shutdown() - - o := cl.opts[0] - s1 := cl.servers[0] - // Plug error logger to s1 - l := &unableToDeleteConsLogger{errCh: make(chan string, 10)} - s1.SetLogger(l, false, false) - - nc, js := jsClientConnect(t, s1) - defer nc.Close() - - mc, r := testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5) - defer mc.Close() - testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false) - - testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) - testMQTTFlush(t, mc, nil, r) - - // Now shutdown server 2, we should lose quorum - cl.servers[1].Shutdown() - - // Close the MQTT client: - testMQTTDisconnect(t, mc, nil) - - // We should have reported that there was an error deleting the consumer - select { - case <-l.errCh: - // OK - case <-time.After(time.Second): - t.Fatal("Server did not report any error") - } - - // Now restart the server 2 so that we can check that the session is still persisted. - cl.restartAllSamePorts() - cl.waitOnStreamLeader(globalAccountName, mqttSessStreamName) - - si, err := js.StreamInfo(mqttSessStreamName) - require_NoError(t, err) - require_True(t, si.State.Msgs == 1) -} - // Test for auto-cleanup of consumers. func TestMQTTConsumerInactiveThreshold(t *testing.T) { tdir := t.TempDir()