Skip to content

Commit

Permalink
enhance: Reduce memory usage of BF in DataNode and QueryNode
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper committed Dec 2, 2024
1 parent cfe5613 commit ef5e8e9
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 15 deletions.
20 changes: 10 additions & 10 deletions internal/flushcommon/writebuffer/l0_write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,17 @@ func (wb *l0WriteBuffer) BufferData(insertData []*InsertData, deleteMsgs []*msgs
// distribute delete msg
// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
wb.dispatchDeleteMsgs(insertData, deleteMsgs, startPos, endPos)
}

// update pk oracle
for _, inData := range insertData {
// segment shall always exists after buffer insert
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
for _, segment := range segments {
for _, fieldData := range inData.pkField {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
if err != nil {
return err
// update pk oracle
for _, inData := range insertData {
// segment shall always exists after buffer insert
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
for _, segment := range segments {
for _, fieldData := range inData.pkField {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
if err != nil {
return err
}
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions internal/flushcommon/writebuffer/l0_write_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/metrics"
Expand Down Expand Up @@ -185,8 +183,6 @@ func (s *L0WriteBufferSuite) TestBufferData() {
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))

seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, pkoracle.NewBloomFilterSet(), nil)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
Expand Down
15 changes: 15 additions & 0 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type baseSegment struct {
bloomFilterSet *pkoracle.BloomFilterSet
loadInfo *atomic.Pointer[querypb.SegmentLoadInfo]
isLazyLoad bool
skipGrowingBF bool
channel metautil.Channel

bm25Stats map[int64]*storage.BM25Stats
Expand All @@ -113,6 +114,7 @@ func newBaseSegment(collection *Collection, segmentType SegmentType, version int
bm25Stats: make(map[int64]*storage.BM25Stats),
channel: channel,
isLazyLoad: isLazyLoad(collection, segmentType),
skipGrowingBF: segmentType == SegmentTypeGrowing && paramtable.Get().QueryNodeCfg.SkipGrowingSegmentBF.GetAsBool(),

resourceUsageCache: atomic.NewPointer[ResourceUsage](nil),
needUpdatedVersion: atomic.NewInt64(0),
Expand Down Expand Up @@ -186,6 +188,9 @@ func (s *baseSegment) LoadInfo() *querypb.SegmentLoadInfo {
}

func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) {
if s.skipGrowingBF {
return
}
s.bloomFilterSet.UpdateBloomFilter(pks)
}

Expand All @@ -207,10 +212,20 @@ func (s *baseSegment) GetBM25Stats() map[int64]*storage.BM25Stats {
// false otherwise,
// may returns true even the PK doesn't exist actually
func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool {
if s.skipGrowingBF {
return true
}
return s.bloomFilterSet.MayPkExist(pk)
}

func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
if s.skipGrowingBF {
allPositive := make([]bool, lc.Size())
for i := 0; i < lc.Size(); i++ {
allPositive[i] = true
}
return allPositive
}
return s.bloomFilterSet.BatchPkExist(lc)
}

Expand Down
14 changes: 13 additions & 1 deletion pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2491,7 +2491,10 @@ type queryNodeConfig struct {
UseStreamComputing ParamItem `refreshable:"false"`
QueryStreamBatchSize ParamItem `refreshable:"false"`
QueryStreamMaxBatchSize ParamItem `refreshable:"false"`
BloomFilterApplyParallelFactor ParamItem `refreshable:"true"`

// BF
SkipGrowingSegmentBF ParamItem `refreshable:"true"`
BloomFilterApplyParallelFactor ParamItem `refreshable:"true"`

// worker
WorkerPoolingSize ParamItem `refreshable:"false"`
Expand Down Expand Up @@ -3214,6 +3217,15 @@ user-task-polling:
}
p.BloomFilterApplyParallelFactor.Init(base.mgr)

p.SkipGrowingSegmentBF = ParamItem{
Key: "queryNode.skipGrowingSegmentBF",
Version: "2.5",
DefaultValue: "true",
Doc: "indicates whether skipping the creation, maintenance, or checking of Bloom Filters for growing segments",
Export: true,
}
p.SkipGrowingSegmentBF.Init(base.mgr)

p.WorkerPoolingSize = ParamItem{
Key: "queryNode.workerPooling.size",
Version: "2.4.7",
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 3*time.Second, Params.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond))

assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt())
assert.Equal(t, true, Params.SkipGrowingSegmentBF.GetAsBool())

assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue())

assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())
Expand Down

0 comments on commit ef5e8e9

Please sign in to comment.