diff --git a/server/mqtt.go b/server/mqtt.go index 76b949b561a..d6af8c4eccf 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1000,8 +1000,7 @@ func (s *Server) mqttHandleClosedClient(c *client) { // This needs to be done outside of any lock. if doClean { - if err := sess.clear(); err != nil { - // if err := sess.clear(true); err != nil { + if err := sess.clear(true); err != nil { c.Errorf(err.Error()) } } @@ -1476,8 +1475,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc // Opportunistically delete the old (legacy) consumer, from v2.10.10 and // before. Ignore any errors that might arise. rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id - // jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true) - jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName) + jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true) // Create a new, uniquely names consumer for retained messages for this // server. The prior one will expire eventually. @@ -1704,13 +1702,12 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon } // if noWait is specified, does not wait for the JS response, returns nil -// func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) { -func (jsa *mqttJSA) deleteConsumer(streamName, consName string) (*JSApiConsumerDeleteResponse, error) { +func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) { subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName) - // if noWait { - // jsa.sendMsg(subj, nil) - // return nil, nil - // } + if noWait { + jsa.sendMsg(subj, nil) + return nil, nil + } cdri, err := jsa.newRequest(mqttJSAConsumerDel, subj, 0, nil) if err != nil { @@ -3187,8 +3184,7 @@ func (sess *mqttSession) save() error { // // Runs from the client's readLoop. // Lock not held on entry, but session is in the locked map. -func (sess *mqttSession) clear() error { - // func (sess *mqttSession) clear(noWait bool) error { +func (sess *mqttSession) clear(noWait bool) error { var durs []string var pubRelDur string @@ -5193,20 +5189,17 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error { Config: ConsumerConfig{ DeliverSubject: pubRelDeliverySubject, Durable: mqttPubRelConsumerDurablePrefix + idHash, - // Name: mqttPubRelConsumerDurablePrefix + idHash, - AckPolicy: AckExplicit, - DeliverPolicy: DeliverNew, - FilterSubject: pubRelSubject, - AckWait: ackWait, - MaxAckPending: maxAckPending, - MemoryStorage: opts.MQTT.ConsumerMemoryStorage, - // InactiveThreshold: 1 * time.Hour, + AckPolicy: AckExplicit, + DeliverPolicy: DeliverNew, + FilterSubject: pubRelSubject, + AckWait: ackWait, + MaxAckPending: maxAckPending, + MemoryStorage: opts.MQTT.ConsumerMemoryStorage, }, } if opts.MQTT.ConsumerInactiveThreshold > 0 { ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold } - // if _, err := sess.jsa.createEphemeralConsumer(ccr); err != nil { if _, err := sess.jsa.createDurableConsumer(ccr); err != nil { c.Errorf("Unable to add JetStream consumer for PUBREL for client %q: err=%v", id, err) return err @@ -5312,12 +5305,9 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string, MemoryStorage: opts.MQTT.ConsumerMemoryStorage, }, } - // Name: durName, - // InactiveThreshold: 1 * time.Hour, if opts.MQTT.ConsumerInactiveThreshold > 0 { ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold } - // if _, err := sess.jsa.createEphemeralConsumer(ccr); err != nil { if _, err := sess.jsa.createDurableConsumer(ccr); err != nil { c.Errorf("Unable to add JetStream consumer for subscription on %q: err=%v", subject, err) return nil, nil, err diff --git a/server/mqtt_ex_test.go b/server/mqtt_ex_test.go index 21fd65bac73..7ddcf0aa0a4 100644 --- a/server/mqtt_ex_test.go +++ b/server/mqtt_ex_test.go @@ -343,7 +343,7 @@ func mqttBenchWrapForMatrixField( func (m mqttBenchMatrix) runMatrix(b *testing.B, bc mqttBenchContext, f func(*testing.B, *mqttBenchContext)) { b.Helper() f = mqttBenchWrapForMatrixField(&bc.MessageSize, m.MessageSize, f, func(size int) string { - return sizeKB(uint64(size)) + return sizeKB(size) }) f = mqttBenchWrapForMatrixField(&bc.Topics, m.Topics, f, func(n int) string { return fmt.Sprintf("%dtopics", n) @@ -387,7 +387,7 @@ func (m mqttBenchMatrix) QOS1Only() mqttBenchMatrix { return m } -func sizeKB(size uint64) string { +func sizeKB(size int) string { unit := "B" N := size if size >= KB { diff --git a/server/mqtt_test.go b/server/mqtt_test.go index b54f31b0812..a92607ccc02 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -28,8 +28,6 @@ import ( "net" "os" "reflect" - "runtime" - "runtime/pprof" "strings" "sync" "testing" @@ -110,7 +108,7 @@ func testMQTTReadPacket(t testing.TB, r *mqttReader) (byte, int) { return copyBytes(buf[:n]) } - rd.SetReadDeadline(time.Now().Add(testMQTTTimeout * 10)) + rd.SetReadDeadline(time.Now().Add(testMQTTTimeout)) for { r.pstart = r.pos if !r.hasMore() { @@ -331,9 +329,8 @@ func testMQTTRunServer(t testing.TB, o *Options) *Server { if err != nil { t.Fatalf("Error creating server: %v", err) } - // l := &DummyLogger{} - // s.SetLogger(l, true, true) - s.ConfigureLogger() + l := &DummyLogger{} + s.SetLogger(l, true, true) s.Start() if err := s.readyForConnections(3 * time.Second); err != nil { testMQTTShutdownServer(s) @@ -3048,30 +3045,14 @@ func TestMQTTCluster(t *testing.T) { } } -func testMQTTConnectSubDisconnect(t *testing.T, o *Options, clientID string, clean bool, found bool, sub bool, qos byte) { +func testMQTTConnectDisconnect(t *testing.T, o *Options, clientID string, clean bool, found bool) { t.Helper() mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: clientID, cleanSess: clean}, o.MQTT.Host, o.MQTT.Port) testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, found) - if sub { - testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: qos}}, []byte{qos}) - } testMQTTDisconnectEx(t, mc, nil, false) mc.Close() } -func captureHeapProfile(filename string) { - f, _ := os.Create(filename) - defer f.Close() - runtime.GC() // Force garbage collection to get a clear picture - pprof.WriteHeapProfile(f) -} - -func printHeapUsage(label string) runtime.MemStats { - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - fmt.Printf("%s Heap: inuse=%v, objects=%v\n", label, memStats.HeapInuse, memStats.HeapObjects) - return memStats -} func TestMQTTClusterConnectDisconnectClean(t *testing.T) { nServers := 3 cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", nServers) @@ -3083,39 +3064,7 @@ func TestMQTTClusterConnectDisconnectClean(t *testing.T) { // specified. N := 100 for n := 0; n < N; n++ { - testMQTTConnectSubDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false, false, 0) - } -} - -func TestMQTTClusterConnectSubDisconnectClean(t *testing.T) { - nServers := 3 - cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", nServers) - defer cl.shutdown() - - time.Sleep(1 * time.Second) - - // initialize MQTT assets in the cluster - testMQTTConnectSubDisconnect(t, cl.opts[0], "init", true, false, false, 0) - runtime.GC() // Force garbage collection to get a clear picture - - memStats := printHeapUsage("BEFORE") - baseHeapInuse := memStats.HeapInuse - baseHeapObjects := memStats.HeapObjects - - N := 1000000 - for i := 0; i < N; i++ { - clientID := nuid.Next() - testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, true, false, true, 2) - runtime.GC() // Force garbage collection to get a clear picture - - memStats = printHeapUsage(fmt.Sprintf("AFTER %d", i)) - if memStats.HeapInuse > 100*baseHeapInuse || memStats.HeapObjects > 100*baseHeapObjects { - captureHeapProfile("AFTERLEAK.pprof") - t.Fatalf("after %d iterations heap alloc has grown from %v to %v (%v%%), objects from %v to %v (%v%%)", - i, - baseHeapInuse, memStats.HeapInuse, memStats.HeapInuse*100/baseHeapInuse, - baseHeapObjects, memStats.HeapObjects, memStats.HeapObjects*100/baseHeapObjects) - } + testMQTTConnectDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false) } } @@ -3132,13 +3081,13 @@ func TestMQTTClusterConnectDisconnectPersist(t *testing.T) { for n := 0; n < N; n++ { // First clean sessions on all servers for i := 0; i < nServers; i++ { - testMQTTConnectSubDisconnect(t, cl.opts[i], clientID, true, false, false, 0) + testMQTTConnectDisconnect(t, cl.opts[i], clientID, true, false) } - testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, false, false, false, 0) - testMQTTConnectSubDisconnect(t, cl.opts[1], clientID, false, true, false, 0) - testMQTTConnectSubDisconnect(t, cl.opts[2], clientID, false, true, false, 0) - testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, false, true, false, 0) + testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, false) + testMQTTConnectDisconnect(t, cl.opts[1], clientID, false, true) + testMQTTConnectDisconnect(t, cl.opts[2], clientID, false, true) + testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, true) } } @@ -7766,192 +7715,6 @@ func TestMQTTSparkbBirthHandling(t *testing.T) { }) } -func BenchmarkXXXX(b *testing.B) { - if b.N != 1 { - return - } - - useCluster := false - NSubscribers := 2 - NMessages := 3 - QOS := byte(2) - - b.ReportAllocs() - b.StopTimer() - - var cl *cluster - var s *Server - if useCluster { - // Create the cluster - start := time.Now() - cl = createJetStreamClusterWithTemplate(b, ` - listen: 127.0.0.1:-1 - server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} - - cluster { - name: %s - listen: 127.0.0.1:%d - routes = [%s] - } - - mqtt { - listen: 127.0.0.1:-1 - stream_replicas: 3 - } - - # For access to system account. - accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } - `, "MQTT", 3) - b.Log("Started cluster in", time.Since(start), "will run", NSubscribers, "connections") - defer func() { - b.StopTimer() // to avoid measuring cl.shutdown(). - start := time.Now() - b.Log("Shutting down cluster") - cl.shutdown() - b.Log("Cluster is down in", time.Since(start)) - }() - - s = cl.randomNonLeader() - } else { - // Create the server - o := testMQTTDefaultOptions() - s = testMQTTRunServer(b, o) - b.Log("Started server, will run", NSubscribers, "connections") - defer func() { - b.StopTimer() // to avoid measuring s.shutdown(). - start := time.Now() - b.Log("Shutting down server") - testMQTTShutdownServer(s) - b.Log("Server is down in", time.Since(start)) - }() - } - - // Initialize the MQTT support (create JS assets, etc). - o := s.getOpts() - c, r := testMQTTConnect(b, &mqttConnInfo{clientID: "init", cleanSess: true}, o.MQTT.Host, o.MQTT.Port) - testMQTTCheckConnAck(b, r, mqttConnAckRCConnectionAccepted, false) - testMQTTSub(b, 1, c, r, []*mqttFilter{{filter: "irrelevant", qos: QOS}}, []byte{QOS}) - testMQTTFlush(b, c, nil, r) - testMQTTDisconnect(b, c, nil) - c.Close() - - b.Log("==== FINISHED INIT") - - var startCh []chan struct{} - receivedCh := make(chan int) - allSubscribed := sync.WaitGroup{} - allSubscribed.Add(NSubscribers) - for i := 0; i < NSubscribers; i++ { - startCh = append(startCh, make(chan struct{})) - } - - // Launch all subscribers before measuring starts. - for i := 0; i < NSubscribers; i++ { - go func(i int, startCh chan struct{}) { - // Wait for the start signal. - <-startCh - - expected := 0 - - // connect with a unique clientID to a random host in the cluster. - if useCluster { - s = cl.randomNonLeader() - } - o := s.getOpts() - c, r := testMQTTConnect(b, &mqttConnInfo{clientID: fmt.Sprintf("sub-%d", i), cleanSess: true}, o.MQTT.Host, o.MQTT.Port) - defer c.Close() - testMQTTCheckConnAck(b, r, mqttConnAckRCConnectionAccepted, false) - - testMQTTSub(b, 1, c, r, []*mqttFilter{{filter: "test", qos: QOS}}, []byte{QOS}) - testMQTTFlush(b, c, nil, r) - allSubscribed.Done() - - for { - pubHeader, pi, _, msg := testMQTTGetPubMsgEx(b, c, r, "test", nil) - b.Logf("Received '%s' message", string(msg)) - if mqttGetQoS(pubHeader) != QOS { - b.Fatalf("Expected QoS2, got %v", mqttGetQoS(pubHeader)) - } - if pubHeader&mqttPubFlagDup != 0 { - b.Fatalf("Expected DUP flag to be clear") - } - switch QOS { - case 1: - testMQTTSendPIPacket(mqttPacketPubAck, b, c, pi) - case 2: - testMQTTSendPIPacket(mqttPacketPubRec, b, c, pi) - testMQTTReadPIPacket(mqttPacketPubRel, b, r, pi) - testMQTTSendPIPacket(mqttPacketPubComp, b, c, pi) - } - - if string(msg) == "quit" { - testMQTTDisconnect(b, c, nil) - c.Close() - receivedCh <- i - return - } - - n, err := strconv.Atoi(string(msg)) - if err != nil { - b.Fatalf("Expected a number, got %q", msg) - } - if n != expected { - b.Fatalf("Expected %v, got %v", expected, n) - } - expected++ - - receivedCh <- i - } - }(i, startCh[i]) - } - b.Log("Launched all", NSubscribers, "subscribers") - - // Measure. - b.ResetTimer() - b.StartTimer() - - // Start the subscribers. - for i := 0; i < NSubscribers; i++ { - close(startCh[i]) - time.Sleep(10 * time.Millisecond) - if (i+1)%100 == 0 { - b.Log("Started", i+1, "subscribers") - } - } - allSubscribed.Wait() - - // Publish a message. - c, r = testMQTTConnect(b, &mqttConnInfo{clientID: "init", cleanSess: true}, o.MQTT.Host, o.MQTT.Port) - testMQTTCheckConnAck(b, r, mqttConnAckRCConnectionAccepted, false) - - sendAndReceiveAll := func(pi uint16, data []byte) { - testMQTTPublish(b, c, r, QOS, false, false, "test", pi, data) - b.Logf("Published '%s' message", string(data)) - - var total, expectedTotal int64 - for i := 0; i < NSubscribers; i++ { - done := <-receivedCh - expectedTotal += int64(i) - total += int64(done) - } - if total != expectedTotal { - b.Fatalf("Expected total to be %v, got %v", expectedTotal, total) - } - b.Log("Received by all", NSubscribers, "subscribers, sum:", total) - } - - var pi uint16 = 1 - for i := 0; i < NMessages-1; i++ { - sendAndReceiveAll(pi, []byte(strconv.Itoa(i))) - pi++ - } - sendAndReceiveAll(pi, []byte("quit")) - - testMQTTDisconnect(b, c, nil) - c.Close() -} - ////////////////////////////////////////////////////////////////////////// // // Benchmarks