diff --git a/server/filestore.go b/server/filestore.go index 90f39fc4b67..0e7e1351293 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -6378,17 +6378,25 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store // Similar to above if start <= first seq. // TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers. if i == bi { - var ss SimpleState - fs.numFilteredPending(filter, &ss) - // Nothing available. - if ss.Msgs == 0 { - return nil, fs.state.LastSeq, ErrStoreEOF - } - // See if we can jump ahead here. - // Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded. - // For v2 will track all blocks that have matches for psim. - if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i { - i = nbi - 1 // For the iterator condition i++ + var fblk uint32 + fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) { + if fblk == 0 || psi.fblk < fblk { + fblk = psi.fblk + } + }) + if int(fblk) > bi { + var ss SimpleState + fs.numFilteredPending(filter, &ss) + // Nothing available. + if ss.Msgs == 0 { + return nil, fs.state.LastSeq, ErrStoreEOF + } + // See if we can jump ahead here. + // Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded. + // For v2 will track all blocks that have matches for psim. + if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i { + i = nbi - 1 // For the iterator condition i++ + } } } // Check is we can expire. diff --git a/server/filestore_test.go b/server/filestore_test.go index d9ed222121b..07fb50ce23b 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -7254,3 +7254,58 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin require_NoError(b, err) } } + +func Benchmark_FileStoreLoadNextMsgAvoidsLinearBIMScan(b *testing.B) { + fsc := FileStoreConfig{StoreDir: b.TempDir()} + sc := StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage} + + fs, err := newFileStore(fsc, sc) + require_NoError(b, err) + defer fs.Stop() + + skipBlocks := func(size uint32) { + origLmb := fs.lmb + origIndex := origLmb.index + fs.lmb.index += size + fs.newMsgBlockForWrite() + origLmb.index = origIndex + } + + seq, _, err := fs.StoreMsg("foo.1.1", nil, []byte{1, 2, 3, 4, 5}) + require_NoError(b, err) + require_Equal(b, seq, 1) + + skipBlocks(100_000_000) + + seq, _, err = fs.StoreMsg("foo.1.2", nil, []byte{1, 2, 3, 4, 5}) + require_NoError(b, err) + require_Equal(b, seq, 2) + + fs.newMsgBlockForWrite() + seq, _, err = fs.StoreMsg("foo.1.1", nil, []byte{1, 2, 3, 4, 5}) + require_NoError(b, err) + require_Equal(b, seq, 3) + + skipBlocks(100_000_000) + + seq, _, err = fs.StoreMsg("foo.1.2", nil, []byte{1, 2, 3, 4, 5}) + require_NoError(b, err) + require_Equal(b, seq, 4) + + // Give the per-subject index amnesia about the flk. This is the + // critical condition for this benchmark to do the right thing. + psi, ok := fs.psim.Find([]byte("foo.1.2")) + require_True(b, ok) + psi.fblk = 0 + + b.ResetTimer() + + // Now start trying to load a message that isn't in the stream yet. + // It needs to match the subject filter from the StreamConfig, but + // not existing will force the linear scan. + var smv StoreMsg + for i := 0; i < b.N; i++ { + _, _, err = fs.LoadNextMsg("foo.1.2", true, 3, &smv) + require_NoError(b, err) + } +} diff --git a/server/test_test.go b/server/test_test.go index 58af5d76f43..cf7c5e9baae 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -52,14 +52,14 @@ func RunRandClientPortServer(t *testing.T) *Server { return RunServer(&opts) } -func require_True(t *testing.T, b bool) { +func require_True(t testing.TB, b bool) { t.Helper() if !b { t.Fatalf("require true, but got false") } } -func require_False(t *testing.T, b bool) { +func require_False(t testing.TB, b bool) { t.Helper() if b { t.Fatalf("require false, but got true") @@ -89,7 +89,7 @@ func require_Contains(t *testing.T, s string, subStrs ...string) { } } -func require_Error(t *testing.T, err error, expected ...error) { +func require_Error(t testing.TB, err error, expected ...error) { t.Helper() if err == nil { t.Fatalf("require error, but got none") @@ -112,21 +112,21 @@ func require_Error(t *testing.T, err error, expected ...error) { t.Fatalf("Expected one of %v, got '%v'", expected, err) } -func require_Equal[T comparable](t *testing.T, a, b T) { +func require_Equal[T comparable](t testing.TB, a, b T) { t.Helper() if a != b { t.Fatalf("require %T equal, but got: %v != %v", a, a, b) } } -func require_NotEqual[T comparable](t *testing.T, a, b T) { +func require_NotEqual[T comparable](t testing.TB, a, b T) { t.Helper() if a == b { t.Fatalf("require %T not equal, but got: %v == %v", a, a, b) } } -func require_Len(t *testing.T, a, b int) { +func require_Len(t testing.TB, a, b int) { t.Helper() if a != b { t.Fatalf("require len, but got: %v != %v", a, b)