From fc677b674fd99558d52bb8d434869302e2a5472e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 20 Jun 2024 05:28:27 +0800 Subject: [PATCH 1/2] Make sure to always remove internal clients from the account regardless of kind. Signed-off-by: Derek Collison --- server/client.go | 73 +++++++++++++++++++++++---------------------- server/raft_test.go | 31 +++++++++++++++++++ 2 files changed, 69 insertions(+), 35 deletions(-) diff --git a/server/client.go b/server/client.go index 5d4b57131d7..3dd0ce6dc66 100644 --- a/server/client.go +++ b/server/client.go @@ -5230,48 +5230,51 @@ func (c *client) closeConnection(reason ClosedState) { // Unregister srv.removeClient(c) - // Update remote subscriptions. - if acc != nil && (kind == CLIENT || kind == LEAF || kind == JETSTREAM) { - qsubs := map[string]*qsub{} - for _, sub := range subs { - // Call unsubscribe here to cleanup shadow subscriptions and such. - c.unsubscribe(acc, sub, true, false) - // Update route as normal for a normal subscriber. - if sub.queue == nil { - if !spoke { - srv.updateRouteSubscriptionMap(acc, sub, -1) - if srv.gateway.enabled { - srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + if acc != nil { + // Update remote subscriptions. + if kind == CLIENT || kind == LEAF || kind == JETSTREAM { + qsubs := map[string]*qsub{} + for _, sub := range subs { + // Call unsubscribe here to cleanup shadow subscriptions and such. + c.unsubscribe(acc, sub, true, false) + // Update route as normal for a normal subscriber. + if sub.queue == nil { + if !spoke { + srv.updateRouteSubscriptionMap(acc, sub, -1) + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + } } - } - acc.updateLeafNodes(sub, -1) - } else { - // We handle queue subscribers special in case we - // have a bunch we can just send one update to the - // connected routes. - num := int32(1) - if kind == LEAF { - num = sub.qw - } - // TODO(dlc) - Better to use string builder? 
- key := bytesToString(sub.subject) + " " + bytesToString(sub.queue) - if esub, ok := qsubs[key]; ok { - esub.n += num + acc.updateLeafNodes(sub, -1) } else { - qsubs[key] = &qsub{sub, num} + // We handle queue subscribers special in case we + // have a bunch we can just send one update to the + // connected routes. + num := int32(1) + if kind == LEAF { + num = sub.qw + } + key := keyFromSub(sub) + if esub, ok := qsubs[key]; ok { + esub.n += num + } else { + qsubs[key] = &qsub{sub, num} + } } } - } - // Process any qsubs here. - for _, esub := range qsubs { - if !spoke { - srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n)) - if srv.gateway.enabled { - srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n)) + // Process any qsubs here. + for _, esub := range qsubs { + if !spoke { + srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n)) + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n)) + } } + acc.updateLeafNodes(esub.sub, -(esub.n)) } - acc.updateLeafNodes(esub.sub, -(esub.n)) } + // Always remove from the account, otherwise we can leak clients. + // Note that SYSTEM and ACCOUNT types from above cleanup their own subs. 
if prev := acc.removeClient(c); prev == 1 { srv.decActiveAccounts() } diff --git a/server/raft_test.go b/server/raft_test.go index 5e5ab4b630c..91226407583 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -518,3 +518,34 @@ func TestNRGElectionTimerAfterObserver(t *testing.T) { require_True(t, etlr.After(before)) } } + +func TestNRGSystemClientCleanupFromAccount(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + s := c.randomServer() + sacc := s.SystemAccount() + + numClients := func() int { + sacc.mu.RLock() + defer sacc.mu.RUnlock() + return len(sacc.clients) + } + + start := numClients() + + var all []smGroup + for i := 0; i < 5; i++ { + rgName := fmt.Sprintf("TEST-%d", i) + rg := c.createRaftGroup(rgName, 3, newStateAdder) + all = append(all, rg) + rg.waitOnLeader() + } + for _, rg := range all { + for _, sm := range rg { + sm.node().Stop() + } + } + finish := numClients() + require_Equal(t, start, finish) +} From 2a8667d27eba8055a976004c5dab4dc19233bd20 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 20 Jun 2024 06:42:08 +0800 Subject: [PATCH 2/2] Increase cache invalidation times for mb.cache.buf and mb.fss. This is to avoid thrashing on sparse streams and streams that hit max msgs limits where we have to remove the first that may be in a totally different block. Signed-off-by: Derek Collison --- server/filestore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index f65721cc061..df58c981e80 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -295,13 +295,13 @@ const ( // Maximum size of a write buffer we may consider for re-use. maxBufReuse = 2 * 1024 * 1024 // default cache buffer expiration - defaultCacheBufferExpiration = 2 * time.Second + defaultCacheBufferExpiration = 10 * time.Second // default sync interval defaultSyncInterval = 2 * time.Minute // default idle timeout to close FDs. 
closeFDsIdle = 30 * time.Second // default expiration time for mb.fss when idle. - defaultFssExpiration = 10 * time.Second + defaultFssExpiration = 2 * time.Minute // coalesceMinimum coalesceMinimum = 16 * 1024 // maxFlushWait is maximum we will wait to gather messages to flush.