Skip to content

Commit

Permalink
Make sure to always remove internal clients from the account regardle…
Browse files Browse the repository at this point in the history
…ss of kind.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored and neilalexander committed Jun 19, 2024
1 parent d3dae63 commit fc677b6
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 35 deletions.
73 changes: 38 additions & 35 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5230,48 +5230,51 @@ func (c *client) closeConnection(reason ClosedState) {
// Unregister
srv.removeClient(c)

// Update remote subscriptions.
if acc != nil && (kind == CLIENT || kind == LEAF || kind == JETSTREAM) {
qsubs := map[string]*qsub{}
for _, sub := range subs {
// Call unsubscribe here to cleanup shadow subscriptions and such.
c.unsubscribe(acc, sub, true, false)
// Update route as normal for a normal subscriber.
if sub.queue == nil {
if !spoke {
srv.updateRouteSubscriptionMap(acc, sub, -1)
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
if acc != nil {
// Update remote subscriptions.
if kind == CLIENT || kind == LEAF || kind == JETSTREAM {
qsubs := map[string]*qsub{}
for _, sub := range subs {
// Call unsubscribe here to cleanup shadow subscriptions and such.
c.unsubscribe(acc, sub, true, false)
// Update route as normal for a normal subscriber.
if sub.queue == nil {
if !spoke {
srv.updateRouteSubscriptionMap(acc, sub, -1)
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
}
}
}
acc.updateLeafNodes(sub, -1)
} else {
// We handle queue subscribers special in case we
// have a bunch we can just send one update to the
// connected routes.
num := int32(1)
if kind == LEAF {
num = sub.qw
}
// TODO(dlc) - Better to use string builder?
key := bytesToString(sub.subject) + " " + bytesToString(sub.queue)
if esub, ok := qsubs[key]; ok {
esub.n += num
acc.updateLeafNodes(sub, -1)
} else {
qsubs[key] = &qsub{sub, num}
// We handle queue subscribers special in case we
// have a bunch we can just send one update to the
// connected routes.
num := int32(1)
if kind == LEAF {
num = sub.qw
}
key := keyFromSub(sub)
if esub, ok := qsubs[key]; ok {
esub.n += num
} else {
qsubs[key] = &qsub{sub, num}
}
}
}
}
// Process any qsubs here.
for _, esub := range qsubs {
if !spoke {
srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n))
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n))
// Process any qsubs here.
for _, esub := range qsubs {
if !spoke {
srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n))
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n))
}
}
acc.updateLeafNodes(esub.sub, -(esub.n))
}
acc.updateLeafNodes(esub.sub, -(esub.n))
}
// Always remove from the account, otherwise we can leak clients.
// Note that SYSTEM and ACCOUNT types from above cleanup their own subs.
if prev := acc.removeClient(c); prev == 1 {
srv.decActiveAccounts()
}
Expand Down
31 changes: 31 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,34 @@ func TestNRGElectionTimerAfterObserver(t *testing.T) {
require_True(t, etlr.After(before))
}
}

func TestNRGSystemClientCleanupFromAccount(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

s := c.randomServer()
sacc := s.SystemAccount()

numClients := func() int {
sacc.mu.RLock()
defer sacc.mu.RUnlock()
return len(sacc.clients)
}

start := numClients()

var all []smGroup
for i := 0; i < 5; i++ {
rgName := fmt.Sprintf("TEST-%d", i)
rg := c.createRaftGroup(rgName, 3, newStateAdder)
all = append(all, rg)
rg.waitOnLeader()
}
for _, rg := range all {
for _, sm := range rg {
sm.node().Stop()
}
}
finish := numClients()
require_Equal(t, start, finish)
}

0 comments on commit fc677b6

Please sign in to comment.