diff --git a/go.mod b/go.mod index 1ee0d5d86f..6ec4723455 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,15 @@ module github.com/nats-io/nats-server/v2 -go 1.21.0 +go 1.22 + +toolchain go1.22.8 require ( github.com/klauspost/compress v1.17.11 github.com/minio/highwayhash v1.0.3 - github.com/nats-io/jwt/v2 v2.5.8 + github.com/nats-io/jwt/v2 v2.7.3 github.com/nats-io/nats.go v1.36.0 - github.com/nats-io/nkeys v0.4.8 + github.com/nats-io/nkeys v0.4.9 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 golang.org/x/crypto v0.31.0 diff --git a/go.sum b/go.sum index 51b9fae132..2406dda71a 100644 --- a/go.sum +++ b/go.sum @@ -4,12 +4,12 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= -github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= -github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= +github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= -github.com/nats-io/nkeys v0.4.8 h1:+wee30071y3vCZAYRsnrmIPaOe47A/SkK/UBDPdIV70= -github.com/nats-io/nkeys v0.4.8/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= +github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/server/filestore.go b/server/filestore.go index de0dbda7e9..c5920587da 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2315,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor fseq = lseq + 1 for _, subj := range subs { ss, _ := mb.fss.Find(stringToBytes(subj)) - if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { - mb.recalculateForSubj(subj, ss) + if ss != nil && ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } if ss == nil || start > ss.Last || ss.First >= fseq { continue @@ -2445,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) ( // If we already found a partial then don't do anything else. return } - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(bytesToString(bsubj), ss) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss) } if sseq <= ss.First { update(ss) @@ -2616,6 +2616,10 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si // Always reset. ss.First, ss.Last, ss.Msgs = 0, 0, 0 + if filter == _EMPTY_ { + filter = fwcs + } + // We do need to figure out the first and last sequences. wc := subjectHasWildcard(filter) start, stop := uint32(math.MaxUint32), uint32(0) @@ -2745,8 +2749,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState { mb.lsts = time.Now().UnixNano() mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) { subj := string(bsubj) - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } oss := fss[subj] if oss.First == 0 { // New @@ -2936,8 +2940,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return } subj := bytesToString(bsubj) - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3224,8 +3228,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo // If we already found a partial then don't do anything else. return } - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } if sseq <= ss.First { t += ss.Msgs @@ -3898,8 +3902,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { info.fblk = i } } - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - mb.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } mb.mu.Unlock() // Re-acquire fs lock @@ -4030,8 +4034,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { mb.mu.Lock() mb.ensurePerSubjectInfoLoaded() ss, ok := mb.fss.Find(stringToBytes(subj)) - if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) { - mb.recalculateForSubj(subj, ss) + if ok && ss != nil && ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) } mb.mu.Unlock() if ss == nil { @@ -7832,14 +7836,24 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { ss.Msgs-- - // We can lazily calculate the first/last sequence when needed. + // Only one left. + if ss.Msgs == 1 { + if seq == ss.Last { + ss.Last = ss.First + } else { + ss.First = ss.Last + } + ss.firstNeedsUpdate = false + return + } + + // We can lazily calculate the first sequence when needed. ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate - ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate } -// Will recalculate the first and/or last sequence for this subject in this block. +// Will recalulate the first sequence for this subject in this block. // Will avoid slower path message lookups and scan the cache directly instead. -func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { +func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { // Need to make sure messages are loaded. if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { @@ -7847,100 +7861,42 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { } } - startSlot := int(ss.First - mb.cache.fseq) - if startSlot < 0 { - startSlot = 0 - } + // Mark first as updated. + ss.firstNeedsUpdate = false + + startSlot := int(startSeq - mb.cache.fseq) if startSlot >= len(mb.cache.idx) { ss.First = ss.Last return - } - endSlot := int(ss.Last - mb.cache.fseq) - if endSlot < 0 { - endSlot = 0 - } - if endSlot >= len(mb.cache.idx) || startSlot > endSlot { - return + } else if startSlot < 0 { + startSlot = 0 } var le = binary.LittleEndian - if ss.firstNeedsUpdate { - // Mark first as updated. - ss.firstNeedsUpdate = false - - fseq := ss.First + 1 - if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq { - fseq = mbFseq - } - for slot := startSlot; slot < len(mb.cache.idx); slot++ { - bi := mb.cache.idx[slot] &^ hbit - if bi == dbit { - // delete marker so skip. - continue - } - li := int(bi) - mb.cache.off - if li >= len(mb.cache.buf) { - ss.First = ss.Last - return - } - buf := mb.cache.buf[li:] - hdr := buf[:msgHdrSize] - slen := int(le.Uint16(hdr[20:])) - if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { - seq := le.Uint64(hdr[4:]) - if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { - continue - } - ss.First = seq - if ss.Msgs == 1 { - ss.Last = seq - ss.lastNeedsUpdate = false - return - } - // Skip the start slot ahead, if we need to recalculate last we can stop early. - startSlot = slot - break - } + for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ { + bi := mb.cache.idx[slot] &^ hbit + if bi == dbit { + // delete marker so skip. + continue } - } - if ss.lastNeedsUpdate { - // Mark last as updated. - ss.lastNeedsUpdate = false - - lseq := ss.Last - 1 - if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq { - lseq = mbLseq + li := int(bi) - mb.cache.off + if li >= len(mb.cache.buf) { + ss.First = ss.Last + return } - for slot := endSlot; slot >= startSlot; slot-- { - bi := mb.cache.idx[slot] &^ hbit - if bi == dbit { - // delete marker so skip. + buf := mb.cache.buf[li:] + hdr := buf[:msgHdrSize] + slen := int(le.Uint16(hdr[20:])) + if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { + seq := le.Uint64(hdr[4:]) + if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) { continue } - li := int(bi) - mb.cache.off - if li >= len(mb.cache.buf) { - // Can't overwrite ss.Last, just skip. - return - } - buf := mb.cache.buf[li:] - hdr := buf[:msgHdrSize] - slen := int(le.Uint16(hdr[20:])) - if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) { - seq := le.Uint64(hdr[4:]) - if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) { - continue - } - // Sequence should never be lower, but guard against it nonetheless. - if seq < ss.First { - seq = ss.First - } + ss.First = seq + if ss.Msgs == 1 { ss.Last = seq - if ss.Msgs == 1 { - ss.First = seq - ss.firstNeedsUpdate = false - } - return } + return } } } diff --git a/server/filestore_test.go b/server/filestore_test.go index 458cef7a74..1be968f8b1 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5030,7 +5030,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) { mb.clearCacheAndOffset() // Now call with start sequence of 1, the old one // This will panic without the fix. - mb.recalculateForSubj("foo", ss) + mb.recalculateFirstForSubj("foo", 1, ss) // Make sure it was update properly. require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false}) } diff --git a/server/memstore.go b/server/memstore.go index 350cfa388e..e2ca1cae29 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -143,8 +143,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int return ErrMaxBytes } // If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room. - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - ms.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subj, ss.First, ss) } sm, ok := ms.msgs[ss.First] if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) { @@ -430,8 +430,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje var totalSkipped uint64 // We will track start and end sequences as we go. ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate || fss.lastNeedsUpdate { - ms.recalculateForSubj(bytesToString(subj), fss) + if fss.firstNeedsUpdate { + ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) } if sseq <= fss.First { update(fss) @@ -585,8 +585,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState { fss := make(map[string]SimpleState) ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) { subjs := string(subj) - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - ms.recalculateForSubj(subjs, ss) + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subjs, ss.First, ss) } oss := fss[subjs] if oss.First == 0 { // New @@ -675,8 +675,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo var totalSkipped uint64 // We will track start and end sequences as we go. IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) { - if fss.firstNeedsUpdate || fss.lastNeedsUpdate { - ms.recalculateForSubj(bytesToString(subj), fss) + if fss.firstNeedsUpdate { + ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) } if sseq <= fss.First { update(fss) @@ -793,8 +793,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) { return } for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs { - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - ms.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subj, ss.First, ss) } if !ms.removeMsg(ss.First, false) { break @@ -1009,9 +1009,8 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { if sm := ms.msgs[seq]; sm != nil { bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) purged++ - ms.removeSeqPerSubject(sm.subj, seq) - // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, seq) + ms.removeSeqPerSubject(sm.subj, seq) } } if purged > ms.state.Msgs { @@ -1099,9 +1098,8 @@ func (ms *memStore) Truncate(seq uint64) error { if sm := ms.msgs[i]; sm != nil { purged++ bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) - ms.removeSeqPerSubject(sm.subj, i) - // Must delete message after updating per-subject info, to be consistent with file store. delete(ms.msgs, i) + ms.removeSeqPerSubject(sm.subj, i) } } // Reset last. @@ -1267,8 +1265,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store if !ok { continue } - if ss.firstNeedsUpdate || ss.lastNeedsUpdate { - ms.recalculateForSubj(subj, ss) + if ss.firstNeedsUpdate { + ms.recalculateFirstForSubj(subj, ss.First, ss) } if ss.First < fseq { fseq = ss.First @@ -1362,47 +1360,34 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { } ss.Msgs-- - // We can lazily calculate the first/last sequence when needed. - ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate - ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate + // If we know we only have 1 msg left don't need to search for next first. + if ss.Msgs == 1 { + if seq == ss.Last { + ss.Last = ss.First + } else { + ss.First = ss.Last + } + ss.firstNeedsUpdate = false + } else { + ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate + } } -// Will recalculate the first and/or last sequence for this subject. +// Will recalculate the first sequence for this subject in this block. // Lock should be held. -func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) { - if ss.firstNeedsUpdate { - tseq := ss.First + 1 - if tseq < ms.state.FirstSeq { - tseq = ms.state.FirstSeq - } - for ; tseq <= ss.Last; tseq++ { - if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { - ss.First = tseq - ss.firstNeedsUpdate = false - if ss.Msgs == 1 { - ss.Last = tseq - ss.lastNeedsUpdate = false - return - } - break - } - } - } - if ss.lastNeedsUpdate { - tseq := ss.Last - 1 - if tseq > ms.state.LastSeq { - tseq = ms.state.LastSeq - } - for ; tseq >= ss.First; tseq-- { - if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { +func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) { + tseq := startSeq + 1 + if tseq < ms.state.FirstSeq { + tseq = ms.state.FirstSeq + } + for ; tseq <= ss.Last; tseq++ { + if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { + ss.First = tseq + if ss.Msgs == 1 { ss.Last = tseq - ss.lastNeedsUpdate = false - if ss.Msgs == 1 { - ss.First = tseq - ss.firstNeedsUpdate = false - } - return } + ss.firstNeedsUpdate = false + return } } } @@ -1418,6 +1403,7 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg) + delete(ms.msgs, seq) if ms.state.Msgs > 0 { ms.state.Msgs-- if ss > ms.state.Bytes { @@ -1442,8 +1428,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool { // Remove any per subject tracking. ms.removeSeqPerSubject(sm.subj, seq) - // Must delete message after updating per-subject info, to be consistent with file store. - delete(ms.msgs, seq) if ms.scb != nil { // We do not want to hold any locks here. diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index 218258a88d..662a6e9460 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -246,6 +246,7 @@ func (a *stateAdder) stop() { a.Lock() defer a.Unlock() a.n.Stop() + a.n.WaitForStop() } // Restart the group @@ -273,6 +274,11 @@ func (a *stateAdder) restart() { panic(err) } + // Must reset in-memory state. + // A real restart would not preserve it, but more importantly we have no way to detect if we + // already applied an entry. So, the sum must only be updated based on append entries or snapshots. + a.sum = 0 + a.n, err = a.s.startRaftNode(globalAccountName, a.cfg, pprofLabels{}) if err != nil { panic(err) diff --git a/server/raft_test.go b/server/raft_test.go index 0f30dec288..df028fc548 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -605,8 +605,9 @@ func TestNRGHeartbeatOnLeaderChange(t *testing.T) { leader := rg.leader().(*stateAdder) leader.proposeDelta(22) leader.proposeDelta(-11) - leader.proposeDelta(-11) - rg.waitOnTotal(t, 0) + leader.proposeDelta(-10) + // Must observe forward progress, so each iteration will check +1 total. + rg.waitOnTotal(t, int64(i+1)) leader.stop() leader.restart() rg.waitOnLeader() diff --git a/server/store.go b/server/store.go index 1c8f7f7ec1..72e039816e 100644 --- a/server/store.go +++ b/server/store.go @@ -166,8 +166,6 @@ type SimpleState struct { // Internal usage for when the first needs to be updated before use. firstNeedsUpdate bool - // Internal usage for when the last needs to be updated before use. - lastNeedsUpdate bool } // LostStreamData indicates msgs that have been lost. diff --git a/server/store_test.go b/server/store_test.go index a916ceedb8..e447017829 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -142,123 +142,6 @@ func TestStoreDeleteRange(t *testing.T) { require_Equal(t, num, 1) } -func TestStoreSubjectStateConsistency(t *testing.T) { - testAllStoreAllPermutations( - t, false, - StreamConfig{Name: "TEST", Subjects: []string{"foo"}}, - func(t *testing.T, fs StreamStore) { - getSubjectState := func() SimpleState { - t.Helper() - ss := fs.SubjectsState("foo") - return ss["foo"] - } - var smp StoreMsg - expectFirstSeq := func(eseq uint64) { - t.Helper() - sm, _, err := fs.LoadNextMsg("foo", false, 0, &smp) - require_NoError(t, err) - require_Equal(t, sm.seq, eseq) - } - expectLastSeq := func(eseq uint64) { - t.Helper() - sm, err := fs.LoadLastMsg("foo", &smp) - require_NoError(t, err) - require_Equal(t, sm.seq, eseq) - } - - // Publish an initial batch of messages. - for i := 0; i < 4; i++ { - _, _, err := fs.StoreMsg("foo", nil, nil) - require_NoError(t, err) - } - - // Expect 4 msgs, with first=1, last=4. - ss := getSubjectState() - require_Equal(t, ss.Msgs, 4) - require_Equal(t, ss.First, 1) - expectFirstSeq(1) - require_Equal(t, ss.Last, 4) - expectLastSeq(4) - - // Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate. - removed, err := fs.RemoveMsg(1) - require_NoError(t, err) - require_True(t, removed) - - // Will update first, so corrects to seq 2. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 3) - require_Equal(t, ss.First, 2) - expectFirstSeq(2) - require_Equal(t, ss.Last, 4) - expectLastSeq(4) - - // Remove last message, ss.Last is lazy so will only mark ss.lastNeedsUpdate. - removed, err = fs.RemoveMsg(4) - require_NoError(t, err) - require_True(t, removed) - - // Will update last, so corrects to 3. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 2) - require_Equal(t, ss.First, 2) - expectFirstSeq(2) - require_Equal(t, ss.Last, 3) - expectLastSeq(3) - - // Remove first message again. - removed, err = fs.RemoveMsg(2) - require_NoError(t, err) - require_True(t, removed) - - // Since we only have one message left, must update ss.First and ensure ss.Last equals. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 1) - require_Equal(t, ss.First, 3) - expectFirstSeq(3) - require_Equal(t, ss.Last, 3) - expectLastSeq(3) - - // Publish some more messages so we can test another scenario. - for i := 0; i < 3; i++ { - _, _, err := fs.StoreMsg("foo", nil, nil) - require_NoError(t, err) - } - - // Just check the state is complete again. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 4) - require_Equal(t, ss.First, 3) - expectFirstSeq(3) - require_Equal(t, ss.Last, 7) - expectLastSeq(7) - - // Remove last sequence, ss.Last is lazy so doesn't get updated. - removed, err = fs.RemoveMsg(7) - require_NoError(t, err) - require_True(t, removed) - - // Remove first sequence, ss.First is lazy so doesn't get updated. - removed, err = fs.RemoveMsg(3) - require_NoError(t, err) - require_True(t, removed) - - // Remove (now) first sequence. Both ss.First and ss.Last are lazy and both need to be recalculated later. - removed, err = fs.RemoveMsg(5) - require_NoError(t, err) - require_True(t, removed) - - // ss.First and ss.Last should both be recalculated and equal each other. - ss = getSubjectState() - require_Equal(t, ss.Msgs, 1) - require_Equal(t, ss.First, 6) - expectFirstSeq(6) - require_Equal(t, ss.Last, 6) - expectLastSeq(6) - }, - ) -} - func TestStoreMaxMsgsPerUpdateBug(t *testing.T) { config := func() StreamConfig { return StreamConfig{Name: "TEST", Subjects: []string{"foo"}, MaxMsgsPer: 0}