Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix linear scan in LoadNextMsg with partial per-subject info state #5532

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6378,17 +6378,29 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
var ss SimpleState
fs.numFilteredPending(filter, &ss)
// Nothing available.
if ss.Msgs == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
var fblk uint32
if subjectHasWildcard(filter) {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
if fblk == 0 || psi.fblk < fblk {
fblk = psi.fblk
}
})
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
fblk = psi.fblk
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i {
i = nbi - 1 // For the iterator condition i++
if int(fblk) > bi {
var ss SimpleState
fs.numFilteredPending(filter, &ss)
// Nothing available.
if ss.Msgs == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i {
i = nbi - 1 // For the iterator condition i++
}
}
}
// Check if we can expire.
Expand Down
55 changes: 55 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7254,3 +7254,58 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
require_NoError(b, err)
}
}

func Benchmark_FileStoreLoadNextMsgAvoidsLinearBIMScan(b *testing.B) {
fsc := FileStoreConfig{StoreDir: b.TempDir()}
sc := StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage}

fs, err := newFileStore(fsc, sc)
require_NoError(b, err)
defer fs.Stop()

skipBlocks := func(size uint32) {
origLmb := fs.lmb
origIndex := origLmb.index
fs.lmb.index += size
fs.newMsgBlockForWrite()
origLmb.index = origIndex
}

seq, _, err := fs.StoreMsg("foo.1.1", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 1)

skipBlocks(100_000_000)

seq, _, err = fs.StoreMsg("foo.1.2", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 2)

fs.newMsgBlockForWrite()
seq, _, err = fs.StoreMsg("foo.1.1", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 3)

skipBlocks(100_000_000)

seq, _, err = fs.StoreMsg("foo.1.2", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 4)

// Give the per-subject index amnesia about the flk. This is the
// critical condition for this benchmark to do the right thing.
psi, ok := fs.psim.Find([]byte("foo.1.2"))
require_True(b, ok)
psi.fblk = 0

b.ResetTimer()

// Now start trying to load a message that isn't in the stream yet.
// It needs to match the subject filter from the StreamConfig, but
// not existing will force the linear scan.
var smv StoreMsg
for i := 0; i < b.N; i++ {
_, _, err = fs.LoadNextMsg("foo.1.2", true, 3, &smv)
require_NoError(b, err)
}
}
12 changes: 6 additions & 6 deletions server/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ func RunRandClientPortServer(t *testing.T) *Server {
return RunServer(&opts)
}

func require_True(t *testing.T, b bool) {
// require_True aborts the calling test or benchmark through t.Fatalf when b
// is false. It marks itself as a helper so the failure is attributed to the
// caller's line.
func require_True(t testing.TB, b bool) {
	t.Helper()
	if b {
		return
	}
	t.Fatalf("require true, but got false")
}

func require_False(t *testing.T, b bool) {
func require_False(t testing.TB, b bool) {
t.Helper()
if b {
t.Fatalf("require false, but got true")
Expand Down Expand Up @@ -89,7 +89,7 @@ func require_Contains(t *testing.T, s string, subStrs ...string) {
}
}

func require_Error(t *testing.T, err error, expected ...error) {
func require_Error(t testing.TB, err error, expected ...error) {
t.Helper()
if err == nil {
t.Fatalf("require error, but got none")
Expand All @@ -112,21 +112,21 @@ func require_Error(t *testing.T, err error, expected ...error) {
t.Fatalf("Expected one of %v, got '%v'", expected, err)
}

func require_Equal[T comparable](t *testing.T, a, b T) {
// require_Equal aborts the calling test or benchmark through t.Fatalf unless
// a == b. The failure message reports the operand type and both values.
func require_Equal[T comparable](t testing.TB, a, b T) {
	t.Helper()
	if a == b {
		return
	}
	t.Fatalf("require %T equal, but got: %v != %v", a, a, b)
}

func require_NotEqual[T comparable](t *testing.T, a, b T) {
// require_NotEqual aborts the calling test or benchmark through t.Fatalf when
// a == b. The failure message reports the operand type and both values.
func require_NotEqual[T comparable](t testing.TB, a, b T) {
	t.Helper()
	if a != b {
		return
	}
	t.Fatalf("require %T not equal, but got: %v == %v", a, a, b)
}

func require_Len(t *testing.T, a, b int) {
func require_Len(t testing.TB, a, b int) {
t.Helper()
if a != b {
t.Fatalf("require len, but got: %v != %v", a, b)
Expand Down