Skip to content

Commit

Permalink
[CHANGED] MQTT s.clear() do not wait for JS responses when disconnect…
Browse files Browse the repository at this point in the history
…ing the session
  • Loading branch information
levb committed Jun 20, 2024
1 parent 5749c41 commit ddda72d
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ func (s *Server) mqttHandleClosedClient(c *client) {

// This needs to be done outside of any lock.
if doClean {
if err := sess.clear(); err != nil {
if err := sess.clear(true); err != nil {
c.Errorf(err.Error())
}
}
Expand Down Expand Up @@ -1475,7 +1475,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
// Opportunistically delete the old (legacy) consumer, from v2.10.10 and
// before. Ignore any errors that might arise.
rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName)
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true)

// Create a new, uniquely names consumer for retained messages for this
// server. The prior one will expire eventually.
Expand Down Expand Up @@ -1622,6 +1622,10 @@ func (jsa *mqttJSA) newRequestExMulti(kind, subject, cidHash string, hdrs []int,
r2i[reply] = i
}

if timeout == 0 {
return nil, nil
}

// Wait for all responses to come back, or for the timeout to expire. We
// don't want to use time.After() which causes memory growth because the
// timer can't be stopped and will need to expire to then be garbage
Expand Down Expand Up @@ -1701,9 +1705,17 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon
return ccr, ccr.ToError()
}

func (jsa *mqttJSA) deleteConsumer(streamName, consName string) (*JSApiConsumerDeleteResponse, error) {
// if noWait is specified, does not wait for the JS response, returns nil
func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) {
subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName)
cdri, err := jsa.newRequest(mqttJSAConsumerDel, subj, 0, nil)

timeout := mqttJSAPITimeout
if noWait {
timeout = 0
}

// timeout == 0 signals that we don't want to wait for the response.
cdri, err := jsa.newRequestEx(mqttJSAConsumerDel, subj, _EMPTY_, -1, nil, timeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3178,7 +3190,7 @@ func (sess *mqttSession) save() error {
//
// Runs from the client's readLoop.
// Lock not held on entry, but session is in the locked map.
func (sess *mqttSession) clear() error {
func (sess *mqttSession) clear(noWait bool) error {
var durs []string
var pubRelDur string

Expand Down Expand Up @@ -3206,19 +3218,19 @@ func (sess *mqttSession) clear() error {
sess.mu.Unlock()

for _, dur := range durs {
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur); isErrorOtherThan(err, JSConsumerNotFoundErr) {
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur, noWait); isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err)
}
}
if pubRelDur != _EMPTY_ {
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur)
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur, noWait)
if isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err)
}
}

if seq > 0 {
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true)
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, !noWait)
// Ignore the various errors indicating that the message (or sequence)
// is already deleted, can happen in a cluster.
if isErrorOtherThan(err, JSSequenceNotFoundErrF) {
Expand Down Expand Up @@ -3484,7 +3496,8 @@ func (sess *mqttSession) untrackPubRel(pi uint16) (jsAckSubject string) {
func (sess *mqttSession) deleteConsumer(cc *ConsumerConfig) {
sess.mu.Lock()
sess.tmaxack -= cc.MaxAckPending
sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))})
sess.jsa.deleteConsumer(mqttStreamName, cc.Durable, true)
// sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))})
sess.mu.Unlock()
}

Expand Down Expand Up @@ -3823,7 +3836,7 @@ CHECK:
// This Session lasts as long as the Network Connection. State data
// associated with this Session MUST NOT be reused in any subsequent
// Session.
if err := es.clear(); err != nil {
if err := es.clear(false); err != nil {
asm.removeSession(es, true)
return err
}
Expand Down

0 comments on commit ddda72d

Please sign in to comment.