Skip to content

Commit

Permalink
[CHANGED] MQTT s.clear() do not wait for JS responses when disconnect…
Browse files Browse the repository at this point in the history
…ing the session
  • Loading branch information
levb committed Jun 20, 2024
1 parent d4205d4 commit 1884b9a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 273 deletions.
38 changes: 14 additions & 24 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,8 +1000,7 @@ func (s *Server) mqttHandleClosedClient(c *client) {

// This needs to be done outside of any lock.
if doClean {
if err := sess.clear(); err != nil {
// if err := sess.clear(true); err != nil {
if err := sess.clear(true); err != nil {
c.Errorf(err.Error())
}
}
Expand Down Expand Up @@ -1476,8 +1475,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
// Opportunistically delete the old (legacy) consumer, from v2.10.10 and
// before. Ignore any errors that might arise.
rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
// jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true)
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName)
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true)

// Create a new, uniquely names consumer for retained messages for this
// server. The prior one will expire eventually.
Expand Down Expand Up @@ -1704,13 +1702,12 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon
}

// if noWait is specified, does not wait for the JS response, returns nil
// func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) {
func (jsa *mqttJSA) deleteConsumer(streamName, consName string) (*JSApiConsumerDeleteResponse, error) {
func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) {
subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName)
// if noWait {
// jsa.sendMsg(subj, nil)
// return nil, nil
// }
if noWait {
jsa.sendMsg(subj, nil)
return nil, nil
}

cdri, err := jsa.newRequest(mqttJSAConsumerDel, subj, 0, nil)
if err != nil {
Expand Down Expand Up @@ -3187,8 +3184,7 @@ func (sess *mqttSession) save() error {
//
// Runs from the client's readLoop.
// Lock not held on entry, but session is in the locked map.
func (sess *mqttSession) clear() error {
// func (sess *mqttSession) clear(noWait bool) error {
func (sess *mqttSession) clear(noWait bool) error {
var durs []string
var pubRelDur string

Expand Down Expand Up @@ -5193,20 +5189,17 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error {
Config: ConsumerConfig{
DeliverSubject: pubRelDeliverySubject,
Durable: mqttPubRelConsumerDurablePrefix + idHash,
// Name: mqttPubRelConsumerDurablePrefix + idHash,
AckPolicy: AckExplicit,
DeliverPolicy: DeliverNew,
FilterSubject: pubRelSubject,
AckWait: ackWait,
MaxAckPending: maxAckPending,
MemoryStorage: opts.MQTT.ConsumerMemoryStorage,
// InactiveThreshold: 1 * time.Hour,
AckPolicy: AckExplicit,
DeliverPolicy: DeliverNew,
FilterSubject: pubRelSubject,
AckWait: ackWait,
MaxAckPending: maxAckPending,
MemoryStorage: opts.MQTT.ConsumerMemoryStorage,
},
}
if opts.MQTT.ConsumerInactiveThreshold > 0 {
ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold
}
// if _, err := sess.jsa.createEphemeralConsumer(ccr); err != nil {
if _, err := sess.jsa.createDurableConsumer(ccr); err != nil {
c.Errorf("Unable to add JetStream consumer for PUBREL for client %q: err=%v", id, err)
return err
Expand Down Expand Up @@ -5312,12 +5305,9 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string,
MemoryStorage: opts.MQTT.ConsumerMemoryStorage,
},
}
// Name: durName,
// InactiveThreshold: 1 * time.Hour,
if opts.MQTT.ConsumerInactiveThreshold > 0 {
ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold
}
// if _, err := sess.jsa.createEphemeralConsumer(ccr); err != nil {
if _, err := sess.jsa.createDurableConsumer(ccr); err != nil {
c.Errorf("Unable to add JetStream consumer for subscription on %q: err=%v", subject, err)
return nil, nil, err
Expand Down
4 changes: 2 additions & 2 deletions server/mqtt_ex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func mqttBenchWrapForMatrixField(
func (m mqttBenchMatrix) runMatrix(b *testing.B, bc mqttBenchContext, f func(*testing.B, *mqttBenchContext)) {
b.Helper()
f = mqttBenchWrapForMatrixField(&bc.MessageSize, m.MessageSize, f, func(size int) string {
return sizeKB(uint64(size))
return sizeKB(size)
})
f = mqttBenchWrapForMatrixField(&bc.Topics, m.Topics, f, func(n int) string {
return fmt.Sprintf("%dtopics", n)
Expand Down Expand Up @@ -387,7 +387,7 @@ func (m mqttBenchMatrix) QOS1Only() mqttBenchMatrix {
return m
}

func sizeKB(size uint64) string {
func sizeKB(size int) string {
unit := "B"
N := size
if size >= KB {
Expand Down
Loading

0 comments on commit 1884b9a

Please sign in to comment.