Cherry-picks for 2.10.17-RC.3 (#5520)
Includes the following:

* #5515
* #5516 
* #5517
* #5519
* #5514 
* #5521 
* #5522 

Signed-off-by: Neil Twigg <[email protected]>
wallyqs authored Jun 11, 2024
2 parents 6c0d3aa + 407aa16 commit 675b530
Showing 17 changed files with 454 additions and 78 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -55,7 +55,7 @@ script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
deploy:
provider: script
cleanup: true
script: curl -sL http://git.io/goreleaser | bash
script: curl -sfL https://goreleaser.com/static/run | VERSION=v1.26.2 bash
on:
tags: true
condition: ($TRAVIS_GO_VERSION =~ 1.22) && ($TEST_SUITE = "compile")
6 changes: 4 additions & 2 deletions server/client.go
@@ -2950,8 +2950,10 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri
return nil, fmt.Errorf(errs)
}

// Update our route map here.
c.srv.updateRemoteSubscription(im.acc, &nsub, 1)
// Update our route map here, but only if we are not a leaf node, or if we are a hub leafnode.
if c.kind != LEAF || c.isHubLeafNode() {
c.srv.updateRemoteSubscription(im.acc, &nsub, 1)
}

return &nsub, nil
}
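
A minimal, self-contained sketch of the guard added to addShadowSub above; the names below (clientKind, kindLeaf, shouldUpdateRouteMap) are illustrative stand-ins rather than the server's actual identifiers. The idea is that remote subscription interest is still recorded for everything except spoke-side leafnode connections.

package main

import "fmt"

type clientKind int

const (
	kindClient clientKind = iota
	kindRouter
	kindLeaf
)

// shouldUpdateRouteMap mirrors the new condition: propagate interest unless
// the connection is a leafnode, with hub-side leafnodes still propagating.
func shouldUpdateRouteMap(kind clientKind, isHubLeaf bool) bool {
	return kind != kindLeaf || isHubLeaf
}

func main() {
	fmt.Println(shouldUpdateRouteMap(kindLeaf, false))   // false: spoke leaf, skip the route map update
	fmt.Println(shouldUpdateRouteMap(kindLeaf, true))    // true: hub leafnode, still update
	fmt.Println(shouldUpdateRouteMap(kindRouter, false)) // true: non-leaf connections always update
}
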
12 changes: 5 additions & 7 deletions server/consumer.go
@@ -711,7 +711,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}

mset.mu.RLock()
s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc
s, jsa, cfg, acc := mset.srv, mset.jsa, mset.cfg, mset.acc
retention := cfg.Retention
mset.mu.RUnlock()

@@ -726,21 +726,19 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
return nil, NewJSConsumerConfigRequiredError()
}

jsa.usageMu.RLock()
selectedLimits, limitsFound := jsa.limits[tierName]
jsa.usageMu.RUnlock()
if !limitsFound {
selectedLimits, _, _, _ := acc.selectLimits(config.replicas(&cfg))
if selectedLimits == nil {
return nil, NewJSNoLimitsError()
}

srvLim := &s.getOpts().JetStreamLimits
// Make sure we have sane defaults. Do so with the JS lock, otherwise a
// badly timed meta snapshot can result in a race condition.
mset.js.mu.Lock()
setConsumerConfigDefaults(config, &mset.cfg, srvLim, &selectedLimits)
setConsumerConfigDefaults(config, &mset.cfg, srvLim, selectedLimits)
mset.js.mu.Unlock()

if err := checkConsumerCfg(config, srvLim, &cfg, acc, &selectedLimits, isRecovering); err != nil {
if err := checkConsumerCfg(config, srvLim, &cfg, acc, selectedLimits, isRecovering); err != nil {
return nil, err
}
sampleFreq := 0
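
The reworked lookup selects account limits from the consumer's effective replica count rather than from the stream's tier. A rough sketch of that idea, assuming tiers are keyed "R1"/"R3" with an empty key meaning an untiered account; jetStreamLimits, effectiveReplicas, and selectLimits below are simplified stand-ins, not the server's real types.

package main

import "fmt"

// jetStreamLimits is a stand-in for an account's per-tier limits entry.
type jetStreamLimits struct {
	MaxConsumers int
}

// effectiveReplicas mirrors the fallback behind ConsumerConfig.replicas: a
// consumer that does not set its own replica count inherits the stream's.
func effectiveReplicas(consumerReplicas, streamReplicas int) int {
	if consumerReplicas == 0 {
		return streamReplicas
	}
	return consumerReplicas
}

// selectLimits returns the untiered entry if present, otherwise the entry
// for the replica-derived tier ("R1", "R3", ...), or nil if none applies.
func selectLimits(limits map[string]jetStreamLimits, replicas int) *jetStreamLimits {
	if l, ok := limits[""]; ok {
		return &l
	}
	if replicas == 0 {
		replicas = 1
	}
	if l, ok := limits[fmt.Sprintf("R%d", replicas)]; ok {
		return &l
	}
	return nil
}

func main() {
	limits := map[string]jetStreamLimits{"R1": {MaxConsumers: 10}, "R3": {MaxConsumers: 1}}
	// An R1 consumer on an R3 stream is checked against the R1 tier.
	l := selectLimits(limits, effectiveReplicas(1, 3))
	fmt.Println(l.MaxConsumers) // 10
}
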
47 changes: 31 additions & 16 deletions server/jetstream.go
@@ -1440,7 +1440,11 @@ func (a *Account) maxBytesLimits(cfg *StreamConfig) (bool, int64) {
return false, 0
}
jsa.usageMu.RLock()
selectedLimits, _, ok := jsa.selectLimits(cfg)
var replicas int
if cfg != nil {
replicas = cfg.Replicas
}
selectedLimits, _, ok := jsa.selectLimits(replicas)
jsa.usageMu.RUnlock()
if !ok {
return false, 0
@@ -1590,7 +1594,7 @@ func diffCheckedLimits(a, b map[string]JetStreamAccountLimits) map[string]JetStr
func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
for _, mset := range jsa.streams {
cfg := &mset.cfg
if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 {
if tier == _EMPTY_ || tier == tierName(cfg.Replicas) && cfg.MaxBytes > 0 {
switch cfg.Storage {
case FileStorage:
store += uint64(cfg.MaxBytes)
@@ -1607,7 +1611,7 @@ func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
func reservedStorage(sas map[string]*streamAssignment, tier string) (mem, store uint64) {
for _, sa := range sas {
cfg := sa.Config
if tier == _EMPTY_ || tier == tierName(cfg) && cfg.MaxBytes > 0 {
if tier == _EMPTY_ || tier == tierName(cfg.Replicas) && cfg.MaxBytes > 0 {
switch cfg.Storage {
case FileStorage:
store += uint64(cfg.MaxBytes)
@@ -1695,17 +1699,29 @@ func (a *Account) JetStreamUsage() JetStreamAccountStats {
stats.ReservedMemory, stats.ReservedStore = reservedStorage(sas, _EMPTY_)
}
for _, sa := range sas {
stats.Consumers += len(sa.consumers)
if !defaultTier {
tier := tierName(sa.Config)
u, ok := stats.Tiers[tier]
if defaultTier {
stats.Consumers += len(sa.consumers)
} else {
stats.Streams++
streamTier := tierName(sa.Config.Replicas)
su, ok := stats.Tiers[streamTier]
if !ok {
u = JetStreamTier{}
su = JetStreamTier{}
}
su.Streams++
stats.Tiers[streamTier] = su

// Now consumers, check each since could be different tiers.
for _, ca := range sa.consumers {
stats.Consumers++
consumerTier := tierName(ca.Config.replicas(sa.Config))
cu, ok := stats.Tiers[consumerTier]
if !ok {
cu = JetStreamTier{}
}
cu.Consumers++
stats.Tiers[consumerTier] = cu
}
u.Streams++
stats.Streams++
u.Consumers += len(sa.consumers)
stats.Tiers[tier] = u
}
}
} else {
@@ -2089,9 +2105,8 @@ func (js *jetStream) limitsExceeded(storeType StorageType) bool {
return js.wouldExceedLimits(storeType, 0)
}

func tierName(cfg *StreamConfig) string {
func tierName(replicas int) string {
// TODO (mh) this is where we could select based off a placement tag as well "qos:tier"
replicas := cfg.Replicas
if replicas == 0 {
replicas = 1
}
@@ -2111,11 +2126,11 @@ func (jsa *jsAccount) jetStreamAndClustered() (*jetStream, bool) {
}

// jsa.usageMu read lock should be held.
func (jsa *jsAccount) selectLimits(cfg *StreamConfig) (JetStreamAccountLimits, string, bool) {
func (jsa *jsAccount) selectLimits(replicas int) (JetStreamAccountLimits, string, bool) {
if selectedLimits, ok := jsa.limits[_EMPTY_]; ok {
return selectedLimits, _EMPTY_, true
}
tier := tierName(cfg)
tier := tierName(replicas)
if selectedLimits, ok := jsa.limits[tier]; ok {
return selectedLimits, tier, true
}
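
With tierName now taking a plain replica count, a stream and each of its consumers can be accounted to different tiers in JetStreamUsage. A compact sketch of that accounting pass under the same "R1"/"R3" tier-key assumption; tierUsage and accountUsage are illustrative stand-ins, not the server's structures.

package main

import "fmt"

type tierUsage struct {
	Streams, Consumers int
}

// tierName mirrors the reworked helper: the tier is derived purely from a
// replica count, with 0 treated as 1.
func tierName(replicas int) string {
	if replicas == 0 {
		replicas = 1
	}
	return fmt.Sprintf("R%d", replicas)
}

// accountUsage tallies one stream and its consumers into per-tier buckets,
// deriving each consumer's tier from its own replica count.
func accountUsage(tiers map[string]tierUsage, streamReplicas int, consumerReplicas []int) {
	st := tiers[tierName(streamReplicas)]
	st.Streams++
	tiers[tierName(streamReplicas)] = st

	for _, cr := range consumerReplicas {
		if cr == 0 {
			cr = streamReplicas // consumers inherit the stream's replicas by default
		}
		cu := tiers[tierName(cr)]
		cu.Consumers++
		tiers[tierName(cr)] = cu
	}
}

func main() {
	tiers := map[string]tierUsage{}
	// An R3 stream with ten R1 consumers, as in the new JWT test below.
	accountUsage(tiers, 3, []int{1, 1, 1, 1, 1, 1, 1, 1, 1, 1})
	fmt.Printf("%+v\n", tiers) // map[R1:{Streams:0 Consumers:10} R3:{Streams:1 Consumers:0}]
}
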
6 changes: 5 additions & 1 deletion server/jetstream_api.go
@@ -3267,7 +3267,11 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account,
}

func (acc *Account) jsNonClusteredStreamLimitsCheck(cfg *StreamConfig) *ApiError {
selectedLimits, tier, jsa, apiErr := acc.selectLimits(cfg)
var replicas int
if cfg != nil {
replicas = cfg.Replicas
}
selectedLimits, tier, jsa, apiErr := acc.selectLimits(replicas)
if apiErr != nil {
return apiErr
}
14 changes: 9 additions & 5 deletions server/jetstream_cluster.go
@@ -5986,7 +5986,7 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*r
return nil, errs
}

func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) {
func (acc *Account) selectLimits(replicas int) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) {
// Grab our jetstream account info.
acc.mu.RLock()
jsa := acc.js
@@ -5997,7 +5997,7 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st
}

jsa.usageMu.RLock()
selectedLimits, tierName, ok := jsa.selectLimits(cfg)
selectedLimits, tierName, ok := jsa.selectLimits(replicas)
jsa.usageMu.RUnlock()

if !ok {
@@ -6008,7 +6008,11 @@ func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, st

// Read lock needs to be held
func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfig) *ApiError {
selectedLimits, tier, _, apiErr := acc.selectLimits(cfg)
var replicas int
if cfg != nil {
replicas = cfg.Replicas
}
selectedLimits, tier, _, apiErr := acc.selectLimits(replicas)
if apiErr != nil {
return apiErr
}
@@ -7145,7 +7149,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
selectedLimits, _, _, apiErr := acc.selectLimits(&streamCfg)
selectedLimits, _, _, apiErr := acc.selectLimits(cfg.replicas(&streamCfg))
if apiErr != nil {
resp.Error = apiErr
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
@@ -7202,7 +7206,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// If the consumer name is specified and we think it already exists, then
// we're likely updating an existing consumer, so don't count it. Otherwise
// we will incorrectly return NewJSMaximumConsumersLimitError for an update.
if oname != "" && cn == oname && sa.consumers[oname] != nil {
if oname != _EMPTY_ && cn == oname && sa.consumers[oname] != nil {
continue
}
}
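
The oname comparison above stops an update of an existing consumer from being counted against the maximum-consumers limit. A small sketch of that counting rule with simplified inputs; countTowardsLimit is an illustrative name, not a server function.

package main

import "fmt"

// countTowardsLimit reports whether an existing consumer should count
// against the maximum-consumers limit when handling a create/update
// request: the consumer being updated (same name, already present) is
// excluded, so updates are not rejected with a maximum-consumers error.
func countTowardsLimit(existing map[string]bool, requested, candidate string) bool {
	if requested != "" && requested == candidate && existing[candidate] {
		return false
	}
	return true
}

func main() {
	existing := map[string]bool{"C-1": true, "C-2": true}
	count := 0
	for name := range existing {
		if countTowardsLimit(existing, "C-1", name) {
			count++
		}
	}
	fmt.Println(count) // 1: updating C-1 does not count it against the limit
}
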
72 changes: 72 additions & 0 deletions server/jetstream_jwt_test.go
@@ -1460,3 +1460,75 @@ func TestJetStreamJWTHAStorageLimitsOnScaleAndUpdate(t *testing.T) {
require_Equal(t, r3.ReservedMemory, 22*1024) // TEST9
require_Equal(t, r3.ReservedStore, 5*1024*1024) // TEST1-TEST6
}

func TestJetStreamJWTClusteredTiersR3StreamWithR1ConsumersAndAccounting(t *testing.T) {
sysKp, syspub := createKey(t)
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
newUser(t, sysKp)

accKp, aExpPub := createKey(t)
accClaim := jwt.NewAccountClaims(aExpPub)
accClaim.Name = "acc"
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
DiskStorage: 1100, Consumer: 10, Streams: 1}
accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
DiskStorage: 1100, Consumer: 1, Streams: 1}
accJwt := encodeClaim(t, accClaim, aExpPub)
accCreds := newUser(t, accKp)
tmlp := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
leaf {
listen: 127.0.0.1:-1
}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
` + fmt.Sprintf(`
operator: %s
system_account: %s
resolver = MEMORY
resolver_preload = {
%s : %s
%s : %s
}
`, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)

c := createJetStreamClusterWithTemplate(t, tmlp, "cluster", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds))
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Replicas: 3,
})
require_NoError(t, err)

// Now make sure we can add in 10 R1 consumers.
for i := 1; i <= 10; i++ {
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: fmt.Sprintf("C-%d", i),
AckPolicy: nats.AckExplicitPolicy,
Replicas: 1,
})
require_NoError(t, err)
}

info, err := js.AccountInfo()
require_NoError(t, err)

// Make sure we account for these properly.
r1 := info.Tiers["R1"]
r3 := info.Tiers["R3"]

require_Equal(t, r1.Streams, 0)
require_Equal(t, r1.Consumers, 10)
require_Equal(t, r3.Streams, 1)
require_Equal(t, r3.Consumers, 0)
}
82 changes: 82 additions & 0 deletions server/jetstream_leafnode_test.go
@@ -1242,3 +1242,85 @@ func TestJetStreamLeafNodeSvcImportExportCycle(t *testing.T) {
_, err = js.Publish("foo", []byte("msg"))
require_NoError(t, err)
}

func TestJetStreamLeafNodeJSClusterMigrateRecovery(t *testing.T) {
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: hub, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "hub", _EMPTY_, 3, 12232, true)
defer c.shutdown()

tmpl = strings.Replace(jsClusterTemplWithLeafNode, "store_dir:", "domain: leaf, store_dir:", 1)
lnc := c.createLeafNodesWithTemplateAndStartPort(tmpl, "leaf", 3, 23913)
defer lnc.shutdown()

lnc.waitOnClusterReady()
for _, s := range lnc.servers {
s.setJetStreamMigrateOnRemoteLeaf()
}

nc, _ := jsClientConnect(t, lnc.randomServer())
defer nc.Close()

ljs, err := nc.JetStream(nats.Domain("leaf"))
require_NoError(t, err)

// Create an asset in the leafnode cluster.
si, err := ljs.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
require_Equal(t, si.Cluster.Name, "leaf")
require_NotEqual(t, si.Cluster.Leader, noLeader)
require_Equal(t, len(si.Cluster.Replicas), 2)

// Count how many remotes each server in the leafnode cluster is
// supposed to have and then take them down.
remotes := map[*Server]int{}
for _, s := range lnc.servers {
remotes[s] += len(s.leafRemoteCfgs)
s.closeAndDisableLeafnodes()
checkLeafNodeConnectedCount(t, s, 0)
}

// The Raft nodes in the leafnode cluster now need some time to
// notice that they're no longer receiving AEs from a leader, as
// they should have been forced into observer mode. Check that
// this is the case.
time.Sleep(maxElectionTimeout)
for _, s := range lnc.servers {
s.rnMu.RLock()
for name, n := range s.raftNodes {
// We don't expect the metagroup to have turned into an
// observer but all other assets should have done.
if name == defaultMetaGroupName {
require_False(t, n.IsObserver())
} else {
require_True(t, n.IsObserver())
}
}
s.rnMu.RUnlock()
}

// Bring the leafnode connections back up.
for _, s := range lnc.servers {
s.reEnableLeafnodes()
checkLeafNodeConnectedCount(t, s, remotes[s])
}

// Wait for nodes to notice they are no longer in observer mode
// and to leave observer mode.
time.Sleep(maxElectionTimeout)
for _, s := range lnc.servers {
s.rnMu.RLock()
for _, n := range s.raftNodes {
require_False(t, n.IsObserver())
}
s.rnMu.RUnlock()
}

// Previously nodes would have left observer mode but then would
// have failed to elect a stream leader as they were stuck on a
// long election timer. Now this should work reliably.
lnc.waitOnStreamLeader(globalAccountName, "TEST")
}