Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to segment.Bitmap interface #1557

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions index/scorch/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"os"
"sync"

"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"
segment "github.com/blevesearch/scorch_segment_api/v2"
bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -209,7 +208,7 @@ func (o *Builder) doMerge() error {

// do the merge
mergedSegPath := o.buildPath + string(os.PathSeparator) + zapFileName(o.segCount)
drops := make([]*roaring.Bitmap, mergeCount)
drops := make([]segment.Bitmap, mergeCount)
_, _, err := o.segPlugin.Merge(mergeSegs, drops, mergedSegPath, nil, nil)
if err != nil {
_ = closeOpenedSegs()
Expand Down
11 changes: 5 additions & 6 deletions index/scorch/introducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ import (
"fmt"
"sync/atomic"

"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"
segment "github.com/blevesearch/scorch_segment_api/v2"
)

type segmentIntroduction struct {
id uint64
data segment.Segment
obsoletes map[uint64]*roaring.Bitmap
obsoletes map[uint64]segment.Bitmap
ids []string
internal map[string][]byte

Expand Down Expand Up @@ -143,7 +142,7 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
if root.segment[i].deleted == nil {
newss.deleted = delta
} else {
newss.deleted = roaring.Or(root.segment[i].deleted, delta)
newss.deleted = root.segment[i].deleted.OrNew(delta)
}
if newss.deleted.IsEmpty() {
newss.deleted = nil
Expand Down Expand Up @@ -321,7 +320,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}

// iterate through current segments
newSegmentDeleted := roaring.NewBitmap()
newSegmentDeleted := s.segPlugin.NewBitmap()
var running, docsToPersistCount, memSegments, fileSegments uint64
for i := range root.segment {
segmentID := root.segment[i].id
Expand All @@ -332,7 +331,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
deletedSince := root.segment[i].deleted
// if we already knew about some of them, remove
if segSnapAtMerge.deleted != nil {
deletedSince = roaring.AndNot(root.segment[i].deleted, segSnapAtMerge.deleted)
deletedSince = root.segment[i].deleted.AndNotNew(segSnapAtMerge.deleted)
}
deletedSinceItr := deletedSince.Iterator()
for deletedSinceItr.HasNext() {
Expand Down Expand Up @@ -373,7 +372,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
// merged segment wrt the current root segments, hence
// applying the obsolete segment contents to newly merged segment
for segID, ss := range nextMerge.old {
obsoleted := ss.DocNumbersLive()
obsoleted := ss.DocNumbersLive(s.segPlugin.NewBitmap())
if obsoleted != nil {
obsoletedIter := obsoleted.Iterator()
for obsoletedIter.HasNext() {
Expand Down
5 changes: 2 additions & 3 deletions index/scorch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sync/atomic"
"time"

"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/v2/index/scorch/mergeplan"
segment "github.com/blevesearch/scorch_segment_api/v2"
)
Expand Down Expand Up @@ -285,7 +284,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
oldMap := make(map[uint64]*SegmentSnapshot)
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
segmentsToMerge := make([]segment.Segment, 0, len(task.Segments))
docsToDrop := make([]*roaring.Bitmap, 0, len(task.Segments))
docsToDrop := make([]segment.Bitmap, 0, len(task.Segments))

for _, planSegment := range task.Segments {
if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
Expand Down Expand Up @@ -422,7 +421,7 @@ type segmentMerge struct {
// persisted segment, and synchronously introduce that new segment
// into the root
func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot,
sbs []segment.Segment, sbsDrops []*roaring.Bitmap,
sbs []segment.Segment, sbsDrops []segment.Bitmap,
sbsIndexes []int) (*IndexSnapshot, uint64, error) {
atomic.AddUint64(&s.stats.TotMemMergeBeg, 1)

Expand Down
17 changes: 8 additions & 9 deletions index/scorch/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package scorch

import (
"fmt"
"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"
segment "github.com/blevesearch/scorch_segment_api/v2"
"sync/atomic"
Expand Down Expand Up @@ -89,7 +88,7 @@ func (o *OptimizeTFRConjunction) Finish() (index.Optimized, error) {
continue
}

bm := roaring.And(itr0.ActualBitmap(), itr1.ActualBitmap())
bm := itr0.ActualBitmap().AndNew(itr1.ActualBitmap())

for _, tfr := range o.tfrs[2:] {
itr, ok := tfr.iterators[i].(segment.OptimizablePostingsIterator)
Expand Down Expand Up @@ -164,7 +163,7 @@ func (o *OptimizeTFRConjunctionUnadorned) Finish() (rv index.Optimized, err erro
oTFR := o.snapshot.unadornedTermFieldReader(
OptimizeTFRConjunctionUnadornedTerm, OptimizeTFRConjunctionUnadornedField)

var actualBMs []*roaring.Bitmap // Collected from regular posting lists.
var actualBMs []segment.Bitmap // Collected from regular posting lists.

OUTER:
for i := range o.snapshot.segment {
Expand Down Expand Up @@ -248,7 +247,7 @@ OUTER:
}

// Else, AND together our collected bitmaps as our result.
bm := roaring.And(actualBMs[0], actualBMs[1])
bm := actualBMs[0].AndNew(actualBMs[1])

for _, actualBM := range actualBMs[2:] {
bm.And(actualBM)
Expand Down Expand Up @@ -331,7 +330,7 @@ func (o *OptimizeTFRDisjunctionUnadorned) Finish() (rv index.Optimized, err erro
OptimizeTFRDisjunctionUnadornedTerm, OptimizeTFRDisjunctionUnadornedField)

var docNums []uint32 // Collected docNum's from 1-hit posting lists.
var actualBMs []*roaring.Bitmap // Collected from regular posting lists.
var actualBMs []segment.Bitmap // Collected from regular posting lists.

for i := range o.snapshot.segment {
docNums = docNums[:0]
Expand All @@ -354,17 +353,17 @@ func (o *OptimizeTFRDisjunctionUnadorned) Finish() (rv index.Optimized, err erro
}
}

var bm *roaring.Bitmap
var bm segment.Bitmap
if len(actualBMs) > 2 {
bm = roaring.HeapOr(actualBMs...)
bm = actualBMs[0].HeapOrNew(actualBMs[1:]...)
} else if len(actualBMs) == 2 {
bm = roaring.Or(actualBMs[0], actualBMs[1])
bm = actualBMs[0].OrNew(actualBMs[1])
} else if len(actualBMs) == 1 {
bm = actualBMs[0].Clone()
}

if bm == nil {
bm = roaring.New()
bm = o.snapshot.parent.segPlugin.NewBitmap()
}

bm.AddMany(docNums)
Expand Down
13 changes: 6 additions & 7 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"sync/atomic"
"time"

"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"
segment "github.com/blevesearch/scorch_segment_api/v2"
bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -358,7 +357,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
bool, error) {
// collect the in-memory zap segments (SegmentBase instances)
var sbs []segment.Segment
var sbsDrops []*roaring.Bitmap
var sbsDrops []segment.Bitmap
var sbsIndexes []int

for i, segmentSnapshot := range snapshot.segment {
Expand Down Expand Up @@ -506,13 +505,13 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
return nil, nil, fmt.Errorf("unknown segment type: %T", seg)
}
// store current deleted bits
var roaringBuf bytes.Buffer
var bitmapBuf bytes.Buffer
if segmentSnapshot.deleted != nil {
_, err = segmentSnapshot.deleted.WriteTo(&roaringBuf)
_, err = segmentSnapshot.deleted.WriteTo(&bitmapBuf)
if err != nil {
return nil, nil, fmt.Errorf("error persisting roaring bytes: %v", err)
return nil, nil, fmt.Errorf("error persisting bitmap bytes: %v", err)
}
err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
err = snapshotSegmentBucket.Put(boltDeletedKey, bitmapBuf.Bytes())
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -774,7 +773,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
}
deletedBytes := segmentBucket.Get(boltDeletedKey)
if deletedBytes != nil {
deletedBitmap := roaring.NewBitmap()
deletedBitmap := s.segPlugin.NewBitmap()
r := bytes.NewReader(deletedBytes)
_, err := deletedBitmap.ReadFrom(r)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sync/atomic"
"time"

"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/v2/registry"
index "github.com/blevesearch/bleve_index_api"
segment "github.com/blevesearch/scorch_segment_api/v2"
Expand Down Expand Up @@ -413,7 +412,7 @@ func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
id: atomic.AddUint64(&s.nextSegmentID, 1),
data: newSegment,
ids: ids,
obsoletes: make(map[uint64]*roaring.Bitmap),
obsoletes: make(map[uint64]segment.Bitmap),
internal: internalOps,
applied: make(chan error),
persistedCallback: persistedCallback,
Expand Down
8 changes: 5 additions & 3 deletions index/scorch/segment_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ package scorch

import (
"fmt"
"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"

index "github.com/blevesearch/bleve_index_api"
segment "github.com/blevesearch/scorch_segment_api/v2"

zapv11 "github.com/blevesearch/zapx/v11"
Expand Down Expand Up @@ -61,9 +60,12 @@ type SegmentPlugin interface {
// document number in the newly merged segment.
// The number of bytes written to the new segment file.
// An error, if any occurred.
Merge(segments []segment.Segment, drops []*roaring.Bitmap, path string,
Merge(segments []segment.Segment, drops []segment.Bitmap, path string,
closeCh chan struct{}, s segment.StatsReporter) (
[][]uint64, uint64, error)

// NewBitmap returns a new empty Bitmap compatible with this implementation
NewBitmap() segment.Bitmap
}

var supportedSegmentPlugins map[string]map[uint32]SegmentPlugin
Expand Down
9 changes: 4 additions & 5 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sync"
"sync/atomic"

"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/v2/document"
index "github.com/blevesearch/bleve_index_api"
segment "github.com/blevesearch/scorch_segment_api/v2"
Expand All @@ -39,7 +38,7 @@ type asynchSegmentResult struct {
dictItr segment.DictionaryIterator

index int
docs *roaring.Bitmap
docs segment.Bitmap

postings segment.PostingsList

Expand Down Expand Up @@ -302,7 +301,7 @@ func (i *IndexSnapshot) DocIDReaderAll() (index.DocIDReader, error) {
go func(index int, segment *SegmentSnapshot) {
results <- &asynchSegmentResult{
index: index,
docs: segment.DocNumbersLive(),
docs: segment.DocNumbersLive(i.parent.segPlugin.NewBitmap()),
}
}(index, segment)
}
Expand Down Expand Up @@ -332,7 +331,7 @@ func (i *IndexSnapshot) DocIDReaderOnly(ids []string) (index.DocIDReader, error)
func (i *IndexSnapshot) newDocIDReader(results chan *asynchSegmentResult) (index.DocIDReader, error) {
rv := &IndexSnapshotDocIDReader{
snapshot: i,
iterators: make([]roaring.IntIterable, len(i.segment)),
iterators: make([]segment.IntIterable, len(i.segment)),
}
var err error
for count := 0; count < len(i.segment); count++ {
Expand Down Expand Up @@ -557,7 +556,7 @@ func (i *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReader
if !tfr.recycle {
// Do not recycle an optimized unadorned term field reader (used for
// ConjunctionUnadorned or DisjunctionUnadorned), during when a fresh
// roaring.Bitmap is built by AND-ing or OR-ing individual bitmaps,
// segment.Bitmap is built by AND-ing or OR-ing individual bitmaps,
// and we'll need to release them for GC. (See MB-40916)
return
}
Expand Down
4 changes: 2 additions & 2 deletions index/scorch/snapshot_index_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package scorch

import (
"bytes"
segment "github.com/blevesearch/scorch_segment_api/v2"
"reflect"

"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/v2/size"
index "github.com/blevesearch/bleve_index_api"
)
Expand All @@ -32,7 +32,7 @@ func init() {

type IndexSnapshotDocIDReader struct {
snapshot *IndexSnapshot
iterators []roaring.IntIterable
iterators []segment.IntIterable
segmentOffset int
}

Expand Down
10 changes: 4 additions & 6 deletions index/scorch/snapshot_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"sync"
"sync/atomic"

"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/v2/size"
index "github.com/blevesearch/bleve_index_api"
segment "github.com/blevesearch/scorch_segment_api/v2"
Expand All @@ -32,7 +31,7 @@ var TermSeparatorSplitSlice = []byte{TermSeparator}
type SegmentSnapshot struct {
id uint64
segment segment.Segment
deleted *roaring.Bitmap
deleted segment.Bitmap
creator string

cachedDocs *cachedDocs
Expand All @@ -42,7 +41,7 @@ func (s *SegmentSnapshot) Segment() segment.Segment {
return s.segment
}

func (s *SegmentSnapshot) Deleted() *roaring.Bitmap {
func (s *SegmentSnapshot) Deleted() segment.Bitmap {
return s.deleted
}

Expand Down Expand Up @@ -78,7 +77,7 @@ func (s *SegmentSnapshot) Count() uint64 {
return rv
}

func (s *SegmentSnapshot) DocNumbers(docIDs []string) (*roaring.Bitmap, error) {
func (s *SegmentSnapshot) DocNumbers(docIDs []string) (segment.Bitmap, error) {
rv, err := s.segment.DocNumbers(docIDs)
if err != nil {
return nil, err
Expand All @@ -90,8 +89,7 @@ func (s *SegmentSnapshot) DocNumbers(docIDs []string) (*roaring.Bitmap, error) {
}

// DocNumbersLive returns a bitmap containing doc numbers for all live docs
func (s *SegmentSnapshot) DocNumbersLive() *roaring.Bitmap {
rv := roaring.NewBitmap()
func (s *SegmentSnapshot) DocNumbersLive(rv segment.Bitmap) segment.Bitmap {
rv.AddRange(0, s.segment.Count())
if s.deleted != nil {
rv.AndNot(s.deleted)
Expand Down
Loading