Skip to content

Commit

Permalink
Revert "Fix linear scan in LoadNextMsg with partial per-subject inf…
Browse files Browse the repository at this point in the history
…o state"

This reverts commit 4396a5d.
  • Loading branch information
neilalexander committed Jun 14, 2024
1 parent c5927f8 commit bf3cbcc
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 83 deletions.
32 changes: 10 additions & 22 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6378,29 +6378,17 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
var fblk uint32
if subjectHasWildcard(filter) {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
if fblk == 0 || psi.fblk < fblk {
fblk = psi.fblk
}
})
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
fblk = psi.fblk
var ss SimpleState
fs.numFilteredPending(filter, &ss)
// Nothing available.
if ss.Msgs == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
if int(fblk) > bi {
var ss SimpleState
fs.numFilteredPending(filter, &ss)
// Nothing available.
if ss.Msgs == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i {
i = nbi - 1 // For the iterator condition i++
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i {
i = nbi - 1 // For the iterator condition i++
}
}
// Check is we can expire.
Expand Down
55 changes: 0 additions & 55 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7254,58 +7254,3 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin
require_NoError(b, err)
}
}

func Benchmark_FileStoreLoadNextMsgAvoidsLinearBIMScan(b *testing.B) {
fsc := FileStoreConfig{StoreDir: b.TempDir()}
sc := StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage}

fs, err := newFileStore(fsc, sc)
require_NoError(b, err)
defer fs.Stop()

skipBlocks := func(size uint32) {
origLmb := fs.lmb
origIndex := origLmb.index
fs.lmb.index += size
fs.newMsgBlockForWrite()
origLmb.index = origIndex
}

seq, _, err := fs.StoreMsg("foo.1.1", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 1)

skipBlocks(100_000_000)

seq, _, err = fs.StoreMsg("foo.1.2", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 2)

fs.newMsgBlockForWrite()
seq, _, err = fs.StoreMsg("foo.1.1", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 3)

skipBlocks(100_000_000)

seq, _, err = fs.StoreMsg("foo.1.2", nil, []byte{1, 2, 3, 4, 5})
require_NoError(b, err)
require_Equal(b, seq, 4)

// Give the per-subject index amnesia about the flk. This is the
// critical condition for this benchmark to do the right thing.
psi, ok := fs.psim.Find([]byte("foo.1.2"))
require_True(b, ok)
psi.fblk = 0

b.ResetTimer()

// Now start trying to load a message that isn't in the stream yet.
// It needs to match the subject filter from the StreamConfig, but
// not existing will force the linear scan.
var smv StoreMsg
for i := 0; i < b.N; i++ {
_, _, err = fs.LoadNextMsg("foo.1.2", true, 3, &smv)
require_NoError(b, err)
}
}
12 changes: 6 additions & 6 deletions server/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ func RunRandClientPortServer(t *testing.T) *Server {
return RunServer(&opts)
}

func require_True(t testing.TB, b bool) {
func require_True(t *testing.T, b bool) {
t.Helper()
if !b {
t.Fatalf("require true, but got false")
}
}

func require_False(t testing.TB, b bool) {
func require_False(t *testing.T, b bool) {
t.Helper()
if b {
t.Fatalf("require false, but got true")
Expand Down Expand Up @@ -89,7 +89,7 @@ func require_Contains(t *testing.T, s string, subStrs ...string) {
}
}

func require_Error(t testing.TB, err error, expected ...error) {
func require_Error(t *testing.T, err error, expected ...error) {
t.Helper()
if err == nil {
t.Fatalf("require error, but got none")
Expand All @@ -112,21 +112,21 @@ func require_Error(t testing.TB, err error, expected ...error) {
t.Fatalf("Expected one of %v, got '%v'", expected, err)
}

func require_Equal[T comparable](t testing.TB, a, b T) {
func require_Equal[T comparable](t *testing.T, a, b T) {
t.Helper()
if a != b {
t.Fatalf("require %T equal, but got: %v != %v", a, a, b)
}
}

func require_NotEqual[T comparable](t testing.TB, a, b T) {
func require_NotEqual[T comparable](t *testing.T, a, b T) {
t.Helper()
if a == b {
t.Fatalf("require %T not equal, but got: %v == %v", a, a, b)
}
}

func require_Len(t testing.TB, a, b int) {
func require_Len(t *testing.T, a, b int) {
t.Helper()
if a != b {
t.Fatalf("require len, but got: %v != %v", a, b)
Expand Down

0 comments on commit bf3cbcc

Please sign in to comment.