Skip to content

Commit

Permalink
Fix linear scan in LoadNextMsg with partial per-subject info state
Browse files Browse the repository at this point in the history
If the per-subject info didn't have a correct first block (which is possible
as it is populated lazily), then `LoadNextMsg` could end up making a very
expensive call through to `numFilteredPending` which would do a linear scan
of potentially many blocks.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Jun 13, 2024
1 parent f263d75 commit 4396a5d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 16 deletions.
32 changes: 22 additions & 10 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6378,17 +6378,29 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
var ss SimpleState
fs.numFilteredPending(filter, &ss)
// Nothing available.
if ss.Msgs == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
var fblk uint32
if subjectHasWildcard(filter) {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
if fblk == 0 || psi.fblk < fblk {
fblk = psi.fblk
}
})
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
fblk = psi.fblk
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i {
i = nbi - 1 // For the iterator condition i++
if int(fblk) > bi {
var ss SimpleState
fs.numFilteredPending(filter, &ss)
// Nothing available.
if ss.Msgs == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i {
i = nbi - 1 // For the iterator condition i++
}
}
}
// Check is we can expire.
Expand Down
55 changes: 55 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7254,3 +7254,58 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
require_NoError(b, err)
}
}

func Benchmark_FileStoreLoadNextMsgAvoidsLinearBIMScan(b *testing.B) {
fsc := FileStoreConfig{StoreDir: b.TempDir()}
sc := StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage}

fs, err := newFileStore(fsc, sc)
require_NoError(b, err)
defer fs.Stop()

skipBlocks := func(size uint32) {
origLmb := fs.lmb
origIndex := origLmb.index
fs.lmb.index += size
fs.newMsgBlockForWrite()
origLmb.index = origIndex
}

seq, _, err := fs.StoreMsg("foo.1.1", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 1)

skipBlocks(100_000_000)

seq, _, err = fs.StoreMsg("foo.1.2", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 2)

fs.newMsgBlockForWrite()
seq, _, err = fs.StoreMsg("foo.1.1", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 3)

skipBlocks(100_000_000)

seq, _, err = fs.StoreMsg("foo.1.2", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 4)

// Give the per-subject index amnesia about the flk. This is the
// critical condition for this benchmark to do the right thing.
psi, ok := fs.psim.Find([]byte("foo.1.2"))
require_True(b, ok)
psi.fblk = 0

b.ResetTimer()

// Now start trying to load a message that isn't in the stream yet.
// It needs to match the subject filter from the StreamConfig, but
// not existing will force the linear scan.
var smv StoreMsg
for i := 0; i < b.N; i++ {
_, _, err = fs.LoadNextMsg("foo.1.2", true, 3, &smv)
require_NoError(b, err)
}
}
12 changes: 6 additions & 6 deletions server/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ func RunRandClientPortServer(t *testing.T) *Server {
return RunServer(&opts)
}

func require_True(t *testing.T, b bool) {
func require_True(t testing.TB, b bool) {
t.Helper()
if !b {
t.Fatalf("require true, but got false")
}
}

func require_False(t *testing.T, b bool) {
func require_False(t testing.TB, b bool) {
t.Helper()
if b {
t.Fatalf("require false, but got true")
Expand Down Expand Up @@ -89,7 +89,7 @@ func require_Contains(t *testing.T, s string, subStrs ...string) {
}
}

func require_Error(t *testing.T, err error, expected ...error) {
func require_Error(t testing.TB, err error, expected ...error) {
t.Helper()
if err == nil {
t.Fatalf("require error, but got none")
Expand All @@ -112,21 +112,21 @@ func require_Error(t *testing.T, err error, expected ...error) {
t.Fatalf("Expected one of %v, got '%v'", expected, err)
}

func require_Equal[T comparable](t *testing.T, a, b T) {
func require_Equal[T comparable](t testing.TB, a, b T) {
t.Helper()
if a != b {
t.Fatalf("require %T equal, but got: %v != %v", a, a, b)
}
}

func require_NotEqual[T comparable](t *testing.T, a, b T) {
func require_NotEqual[T comparable](t testing.TB, a, b T) {
t.Helper()
if a == b {
t.Fatalf("require %T not equal, but got: %v == %v", a, a, b)
}
}

func require_Len(t *testing.T, a, b int) {
func require_Len(t testing.TB, a, b int) {
t.Helper()
if a != b {
t.Fatalf("require len, but got: %v != %v", a, b)
Expand Down

0 comments on commit 4396a5d

Please sign in to comment.