Skip to content

Commit

Permalink
[IMPROVED] Filtered consumers retrieval speed. (#5089)
Browse files Browse the repository at this point in the history
When doing firstMatching we were picking wrong on linearScans. We also
did not optimize the subjectIsSubsetMatch functionality.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored Feb 15, 2024
2 parents 1b542ab + 71dede1 commit 7e30546
Showing 1 changed file with 24 additions and 6 deletions.
30 changes: 24 additions & 6 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2246,22 +2246,33 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
if !isAll && len(mb.fss) == 1 {
_, isAll = mb.fss[filter]
}
// Skip scan of mb.fss if number of messages in the block are less than
// 1/2 the number of subjects in mb.fss. Or we have a wc and lots of fss entries.
const linearScanMaxFSS = 32
// Make sure to start at mb.first.seq if fseq < mb.first.seq
if seq := atomic.LoadUint64(&mb.first.seq); seq > fseq {
fseq = seq
}
lseq := atomic.LoadUint64(&mb.last.seq)
doLinearScan := isAll || 2*int(lseq-fseq) < len(mb.fss) || (wc && len(mb.fss) > linearScanMaxFSS)

// Optionally build the isMatch for wildcard filters.
tsa := [32]string{}
fsa := [32]string{}
var fts []string
var isMatch func(subj string) bool
// Decide to build.
if wc {
fts = tokenizeSubjectIntoSlice(fsa[:0], filter)
isMatch = func(subj string) bool {
tts := tokenizeSubjectIntoSlice(tsa[:0], subj)
return isSubsetMatchTokenized(tts, fts)
}
}
// Only do linear scan if isAll or we are wildcarded and have to traverse more fss than actual messages.
doLinearScan := isAll || (wc && len(mb.fss) > int(lseq-fseq))
if !doLinearScan {
// If we have a wildcard match against all tracked subjects we know about.
if wc {
subs = subs[:0]
for subj := range mb.fss {
if subjectIsSubsetMatch(subj, filter) {
if isMatch(subj) {
subs = append(subs, subj)
}
}
Expand All @@ -2283,6 +2294,13 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
}
}

// If we guess to not do a linear scan, but the above resulted in alot of subs that will
// need to be checked for every scanned message, revert.
// TODO(dlc) - we could memoize the subs across calls.
if len(subs) > int(lseq-fseq) {
doLinearScan = true
}

if fseq > lseq {
return nil, didLoad, ErrStoreMsgNotFound
}
Expand Down Expand Up @@ -2310,7 +2328,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
return fsm, expireOk, nil
}
if doLinearScan {
if wc && subjectIsSubsetMatch(fsm.subj, filter) {
if wc && isMatch(sm.subj) {
return fsm, expireOk, nil
} else if !wc && fsm.subj == filter {
return fsm, expireOk, nil
Expand Down

0 comments on commit 7e30546

Please sign in to comment.