diff --git a/server/client.go b/server/client.go index 5d4b57131d7..3dd0ce6dc66 100644 --- a/server/client.go +++ b/server/client.go @@ -5230,48 +5230,51 @@ func (c *client) closeConnection(reason ClosedState) { // Unregister srv.removeClient(c) - // Update remote subscriptions. - if acc != nil && (kind == CLIENT || kind == LEAF || kind == JETSTREAM) { - qsubs := map[string]*qsub{} - for _, sub := range subs { - // Call unsubscribe here to cleanup shadow subscriptions and such. - c.unsubscribe(acc, sub, true, false) - // Update route as normal for a normal subscriber. - if sub.queue == nil { - if !spoke { - srv.updateRouteSubscriptionMap(acc, sub, -1) - if srv.gateway.enabled { - srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + if acc != nil { + // Update remote subscriptions. + if kind == CLIENT || kind == LEAF || kind == JETSTREAM { + qsubs := map[string]*qsub{} + for _, sub := range subs { + // Call unsubscribe here to cleanup shadow subscriptions and such. + c.unsubscribe(acc, sub, true, false) + // Update route as normal for a normal subscriber. + if sub.queue == nil { + if !spoke { + srv.updateRouteSubscriptionMap(acc, sub, -1) + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + } } - } - acc.updateLeafNodes(sub, -1) - } else { - // We handle queue subscribers special in case we - // have a bunch we can just send one update to the - // connected routes. - num := int32(1) - if kind == LEAF { - num = sub.qw - } - // TODO(dlc) - Better to use string builder? - key := bytesToString(sub.subject) + " " + bytesToString(sub.queue) - if esub, ok := qsubs[key]; ok { - esub.n += num + acc.updateLeafNodes(sub, -1) } else { - qsubs[key] = &qsub{sub, num} + // We handle queue subscribers special in case we + // have a bunch we can just send one update to the + // connected routes. + num := int32(1) + if kind == LEAF { + num = sub.qw + } + key := keyFromSub(sub) + if esub, ok := qsubs[key]; ok { + esub.n += num + } else { + qsubs[key] = &qsub{sub, num} + } } } - } - // Process any qsubs here. - for _, esub := range qsubs { - if !spoke { - srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n)) - if srv.gateway.enabled { - srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n)) + // Process any qsubs here. + for _, esub := range qsubs { + if !spoke { + srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n)) + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n)) + } } + acc.updateLeafNodes(esub.sub, -(esub.n)) } - acc.updateLeafNodes(esub.sub, -(esub.n)) } + // Always remove from the account, otherwise we can leak clients. + // Note that SYSTEM and ACCOUNT types from above cleanup their own subs. if prev := acc.removeClient(c); prev == 1 { srv.decActiveAccounts() } diff --git a/server/filestore.go b/server/filestore.go index f65721cc061..df58c981e80 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -295,13 +295,13 @@ const ( // Maximum size of a write buffer we may consider for re-use. maxBufReuse = 2 * 1024 * 1024 // default cache buffer expiration - defaultCacheBufferExpiration = 2 * time.Second + defaultCacheBufferExpiration = 10 * time.Second // default sync interval defaultSyncInterval = 2 * time.Minute // default idle timeout to close FDs. closeFDsIdle = 30 * time.Second // default expiration time for mb.fss when idle. - defaultFssExpiration = 10 * time.Second + defaultFssExpiration = 2 * time.Minute // coalesceMinimum coalesceMinimum = 16 * 1024 // maxFlushWait is maximum we will wait to gather messages to flush. diff --git a/server/raft_test.go b/server/raft_test.go index 5e5ab4b630c..91226407583 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -518,3 +518,34 @@ func TestNRGElectionTimerAfterObserver(t *testing.T) { require_True(t, etlr.After(before)) } } + +func TestNRGSystemClientCleanupFromAccount(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + s := c.randomServer() + sacc := s.SystemAccount() + + numClients := func() int { + sacc.mu.RLock() + defer sacc.mu.RUnlock() + return len(sacc.clients) + } + + start := numClients() + + var all []smGroup + for i := 0; i < 5; i++ { + rgName := fmt.Sprintf("TEST-%d", i) + rg := c.createRaftGroup(rgName, 3, newStateAdder) + all = append(all, rg) + rg.waitOnLeader() + } + for _, rg := range all { + for _, sm := range rg { + sm.node().Stop() + } + } + finish := numClients() + require_Equal(t, start, finish) +}