Skip to content

Commit

Permalink
MB-60844: MapReduce for PreSearch (#1999)
Browse files Browse the repository at this point in the history
- Instead of Merging all the PreSearch Results in one shot at the main
coordinator node of an alias tree, merge them incrementally at each
level of the tree instead, which would balance the reduction process
across all the indexes in a distributed Bleve index, leading to a more
even memory distribution.

---------

Co-authored-by: Abhinav Dangeti <[email protected]>
  • Loading branch information
CascadingRadium and abhinavdangeti authored Mar 20, 2024
1 parent a1e4a0e commit e76f594
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 80 deletions.
92 changes: 41 additions & 51 deletions index_alias_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,16 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
return nil, ErrorAliasEmpty
}
if _, ok := ctx.Value(search.PreSearchKey).(bool); ok {
// since presearchKey is set, it means that the request
// is being executed as part of a presearch, which
// since preSearchKey is set, it means that the request
// is being executed as part of a preSearch, which
// indicates that this index alias is set as an Index
// in another alias, so we need to do a presearch search
// in another alias, so we need to do a preSearch search
// and NOT a real search
return preSearchDataSearch(ctx, req, i.indexes...)
}

// at this point we know we are doing a real search
// either after a presearch is done, or directly
// either after a preSearch is done, or directly
// on the alias

// check if request has preSearchData which would indicate that the
Expand Down Expand Up @@ -203,9 +203,9 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
// check if preSearchData needs to be gathered from all indexes
// before executing the query
var err error
// only perform presearch if
// only perform preSearch if
// - the request does not already have preSearchData
// - the request requires presearch
// - the request requires preSearch
var preSearchDuration time.Duration
var sr *SearchResult
if req.PreSearchData == nil && preSearchRequired(req) {
Expand All @@ -214,15 +214,19 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
if err != nil {
return nil, err
}
// check if the presearch result has any errors and if so
// check if the preSearch result has any errors and if so
// return the search result as is without executing the query
// so that the errors are not lost
if preSearchResult.Status.Failed > 0 {
if preSearchResult.Status.Failed > 0 || len(preSearchResult.Status.Errors) > 0 {
return preSearchResult, nil
}
// finalize the preSearch result now
finalizePreSearchResult(req, preSearchResult)

// if there are no errors, then merge the data in the presearch result
preSearchResult = mergePreSearchResult(req, preSearchResult, i.indexes)
// if there are no errors, then merge the data in the preSearch result
// and construct the preSearchData to be used in the actual search
// if the request is satisfied by the preSearch result, then we can
// directly return the preSearch result as the final result
if requestSatisfiedByPreSearch(req) {
sr = finalizeSearchResult(req, preSearchResult)
// no need to run the 2nd phase MultiSearch(..)
Expand All @@ -235,7 +239,7 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
preSearchDuration = time.Since(searchStart)
}

// check if search result was generated as part of presearch itself
// check if search result was generated as part of preSearch itself
if sr == nil {
sr, err = MultiSearch(ctx, req, preSearchData, i.indexes...)
if err != nil {
Expand Down Expand Up @@ -533,13 +537,7 @@ func preSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Sear
return preSearchDataSearch(newCtx, dummyRequest, indexes...)
}

func tagHitsWithIndexName(sr *SearchResult, indexName string) {
for _, hit := range sr.Hits {
hit.IndexNames = append(hit.IndexNames, indexName)
}
}

// if the request is satisfied by just the presearch result,
// if the request is satisfied by just the preSearch result,
// finalize the result and return it directly without
// performing multi search
func finalizeSearchResult(req *SearchRequest, preSearchResult *SearchResult) *SearchResult {
Expand All @@ -551,7 +549,7 @@ func finalizeSearchResult(req *SearchRequest, preSearchResult *SearchResult) *Se
preSearchResult.Total = uint64(preSearchResult.Hits.Len())
maxScore := float64(0)
for i, hit := range preSearchResult.Hits {
// since we are now using the presearch result as the final result
// since we are now using the preSearch result as the final result
// we can discard the indexNames from the hits as they are no longer
// relevant.
hit.IndexNames = nil
Expand Down Expand Up @@ -587,14 +585,6 @@ func finalizeSearchResult(req *SearchRequest, preSearchResult *SearchResult) *Se
return preSearchResult
}

func mergePreSearchResult(req *SearchRequest, res *SearchResult,
indexes []Index) *SearchResult {
if requestHasKNN(req) {
res.Hits = mergeKNNDocumentMatches(req, res.Hits)
}
return res
}

func requestSatisfiedByPreSearch(req *SearchRequest) bool {
if requestHasKNN(req) && isKNNrequestSatisfiedByPreSearch(req) {
return true
Expand All @@ -609,7 +599,7 @@ func constructPreSearchData(req *SearchRequest, preSearchResult *SearchResult, i
}
var err error
if requestHasKNN(req) {
mergedOut, err = constructKnnPresearchData(mergedOut, preSearchResult, indexes)
mergedOut, err = constructKnnPreSearchData(mergedOut, preSearchResult, indexes)
if err != nil {
return nil, err
}
Expand All @@ -619,50 +609,53 @@ func constructPreSearchData(req *SearchRequest, preSearchResult *SearchResult, i

func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
asyncResults := make(chan *asyncSearchResult, len(indexes))

// run search on each index in separate go routine
var waitGroup sync.WaitGroup

var searchChildIndex = func(in Index, childReq *SearchRequest) {
rv := asyncSearchResult{Name: in.Name()}
rv.Result, rv.Err = in.SearchInContext(ctx, childReq)
asyncResults <- &rv
waitGroup.Done()
}

waitGroup.Add(len(indexes))
for _, in := range indexes {
go searchChildIndex(in, createChildSearchRequest(req, nil))
}

// on another go routine, close after finished
go func() {
waitGroup.Wait()
close(asyncResults)
}()

// the final search result to be returned after combining the preSearch results
var sr *SearchResult
// the preSearch result processor
var prp preSearchResultProcessor
// error map
indexErrors := make(map[string]error)

for asr := range asyncResults {
if asr.Err == nil {
// a valid preSearch result
if prp == nil {
// first valid preSearch result
// create a new preSearch result processor
prp = createPreSearchResultProcessor(req)
}
prp.add(asr.Result, asr.Name)
if sr == nil {
// first result
sr = asr.Result
tagHitsWithIndexName(sr, asr.Name)
sr = &SearchResult{
Status: asr.Result.Status,
Cost: asr.Result.Cost,
}
} else {
// merge with previous
tagHitsWithIndexName(asr.Result, asr.Name)
sr.Merge(asr.Result)
sr.Status.Merge(asr.Result.Status)
sr.Cost += asr.Result.Cost
}
} else {
indexErrors[asr.Name] = asr.Err
}
}

// merge just concatenated all the hits
// now lets clean it up

// handle case where no results were successful
if sr == nil {
sr = &SearchResult{
Expand All @@ -671,16 +664,12 @@ func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Ind
},
}
}

// in presearch partial results are not allowed as it can lead to
// in preSearch, partial results are not allowed as it can lead to
// the real search giving incorrect results, and hence the search
// result is reset.
// discard partial hits if some child index has failed or
// if some child alias has returned partial results.
// result is not populated with any of the processed data from
// the preSearch result processor if there are any errors
// or the preSearch result status has any failures
if len(indexErrors) > 0 || sr.Status.Failed > 0 {
sr = &SearchResult{
Status: sr.Status,
}
if sr.Status.Errors == nil {
sr.Status.Errors = make(map[string]error)
}
Expand All @@ -689,8 +678,9 @@ func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Ind
sr.Status.Total++
sr.Status.Failed++
}
} else {
prp.finalize(sr)
}

return sr, nil
}

Expand Down
1 change: 0 additions & 1 deletion index_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,6 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
ctx = context.WithValue(ctx, search.GeoBufferPoolCallbackKey,
search.GeoBufferPoolCallbackFunc(getBufferPool))


searcher, err := req.Query.Searcher(ctx, indexReader, i.m, search.SearcherOptions{
Explain: req.Explain,
IncludeTermVectors: req.IncludeLocations || req.Highlight != nil,
Expand Down
59 changes: 59 additions & 0 deletions pre_search.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2024 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bleve

// A preSearchResultProcessor processes the data in
// the preSearch result from multiple
// indexes in an alias and merges them together to
// create the final preSearch result
type preSearchResultProcessor interface {
// adds the preSearch result to the processor
add(*SearchResult, string)
// updates the final search result with the finalized
// data from the processor
finalize(*SearchResult)
}

type knnPreSearchResultProcessor struct {
addFn func(sr *SearchResult, indexName string)
finalizeFn func(sr *SearchResult)
}

func (k *knnPreSearchResultProcessor) add(sr *SearchResult, indexName string) {
if k.addFn != nil {
k.addFn(sr, indexName)
}
}

func (k *knnPreSearchResultProcessor) finalize(sr *SearchResult) {
if k.finalizeFn != nil {
k.finalizeFn(sr)
}
}

// -----------------------------------------------------------------------------

func finalizePreSearchResult(req *SearchRequest, preSearchResult *SearchResult) {
if requestHasKNN(req) {
preSearchResult.Hits = finalizeKNNResults(req, preSearchResult.Hits)
}
}

func createPreSearchResultProcessor(req *SearchRequest) preSearchResultProcessor {
if requestHasKNN(req) {
return newKnnPreSearchResultProcessor(req)
}
return &knnPreSearchResultProcessor{} // equivalent to nil
}
2 changes: 1 addition & 1 deletion search/scorer/scorer_knn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewKNNQueryScorer(queryVector []float32, queryField string, queryBoost floa

// Score used when the knnMatch.Score = 0 ->
// the query and indexed vector are exactly the same.
const maxKNNScore = math.MaxFloat64
const maxKNNScore = math.MaxFloat32

func (sqs *KNNQueryScorer) Score(ctx *search.SearchContext,
knnMatch *index.VectorDoc) *search.DocumentMatch {
Expand Down
55 changes: 33 additions & 22 deletions search_knn.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,9 @@ func (i *indexImpl) runKnnCollector(ctx context.Context, req *SearchRequest, rea
}
knnHits := knnCollector.Results()
if !preSearch {
knnHits = finalizeKNNResults(req, knnHits, len(req.KNN))
knnHits = finalizeKNNResults(req, knnHits)
}
// at this point, irrespective of whether it is a presearch or not,
// at this point, irrespective of whether it is a preSearch or not,
// the knn hits are populated with Sort and Fields.
// it must be ensured downstream that the Sort and Fields are not
// re-evaluated, for these hits.
Expand Down Expand Up @@ -324,13 +324,13 @@ func setKnnHitsInCollector(knnHits []*search.DocumentMatch, req *SearchRequest,
}
}

func finalizeKNNResults(req *SearchRequest, knnHits []*search.DocumentMatch, numKNNQueries int) []*search.DocumentMatch {
func finalizeKNNResults(req *SearchRequest, knnHits []*search.DocumentMatch) []*search.DocumentMatch {
// if the KNN operator is AND, then we need to filter out the hits that
// do not have match the KNN queries.
if req.KNNOperator == knnOperatorAnd {
idx := 0
for _, hit := range knnHits {
if len(hit.ScoreBreakdown) == numKNNQueries {
if len(hit.ScoreBreakdown) == len(req.KNN) {
knnHits[idx] = hit
idx++
}
Expand Down Expand Up @@ -362,22 +362,6 @@ func finalizeKNNResults(req *SearchRequest, knnHits []*search.DocumentMatch, num
return knnHits
}

func mergeKNNDocumentMatches(req *SearchRequest, knnHits []*search.DocumentMatch) []*search.DocumentMatch {
kArray := make([]int64, len(req.KNN))
for i, knnReq := range req.KNN {
kArray[i] = knnReq.K
}
knnStore := collector.GetNewKNNCollectorStore(kArray)
for _, hit := range knnHits {
knnStore.AddDocument(hit)
}
// passing nil as the document fixup function, because we don't need to
// fixup the document, since this was already done in the first phase.
// hence error is always nil.
mergedKNNhits, _ := knnStore.Final(nil)
return finalizeKNNResults(req, mergedKNNhits, len(req.KNN))
}

// when we are setting KNN hits in the preSearchData, we need to make sure that
// the KNN hit goes to the right index. This is because the KNN hits are
// collected from all the indexes in the alias, but the preSearchData is
Expand Down Expand Up @@ -433,7 +417,7 @@ func requestHasKNN(req *SearchRequest) bool {
}

// returns true if the search request contains a KNN request that can be
// satisfied by just performing a presearch, completely bypassing the
// satisfied by just performing a preSearch, completely bypassing the
// actual search.
func isKNNrequestSatisfiedByPreSearch(req *SearchRequest) bool {
// if req.Query is not match_none => then we need to go to phase 2
Expand All @@ -456,7 +440,7 @@ func isKNNrequestSatisfiedByPreSearch(req *SearchRequest) bool {
return true
}

func constructKnnPresearchData(mergedOut map[string]map[string]interface{}, preSearchResult *SearchResult,
func constructKnnPreSearchData(mergedOut map[string]map[string]interface{}, preSearchResult *SearchResult,
indexes []Index) (map[string]map[string]interface{}, error) {

distributedHits, err := validateAndDistributeKNNHits([]*search.DocumentMatch(preSearchResult.Hits), indexes)
Expand Down Expand Up @@ -511,3 +495,30 @@ func redistributeKNNPreSearchData(req *SearchRequest, indexes []Index) (map[stri
}
return rv, nil
}

func newKnnPreSearchResultProcessor(req *SearchRequest) *knnPreSearchResultProcessor {
kArray := make([]int64, len(req.KNN))
for i, knnReq := range req.KNN {
kArray[i] = knnReq.K
}
knnStore := collector.GetNewKNNCollectorStore(kArray)
return &knnPreSearchResultProcessor{
addFn: func(sr *SearchResult, indexName string) {
for _, hit := range sr.Hits {
// tag the hit with the index name, so that when the
// final search result is constructed, the hit will have
// a valid path to follow along the alias tree to reach
// the index.
hit.IndexNames = append(hit.IndexNames, indexName)
knnStore.AddDocument(hit)
}
},
finalizeFn: func(sr *SearchResult) {
// passing nil as the document fixup function, because we don't need to
// fixup the document, since this was already done in the first phase,
// hence error is always nil.
// the merged knn hits are finalized and set in the search result.
sr.Hits, _ = knnStore.Final(nil)
},
}
}
Loading

0 comments on commit e76f594

Please sign in to comment.