diff --git a/internal/flushcommon/writebuffer/l0_write_buffer.go b/internal/flushcommon/writebuffer/l0_write_buffer.go index 07237f43f7705..8020997e4e48c 100644 --- a/internal/flushcommon/writebuffer/l0_write_buffer.go +++ b/internal/flushcommon/writebuffer/l0_write_buffer.go @@ -165,23 +165,24 @@ func (wb *l0WriteBuffer) BufferData(insertData []*InsertData, deleteMsgs []*msgs if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() || streamingutil.IsStreamingServiceEnabled() { // In streaming service mode, flushed segments no longer maintain a bloom filter. - // So, here we skip filtering delete entries by bf. + // So, here we skip generating BF (growing segment's BF will be regenerated during the sync phase) + // and also skip filtering delete entries by bf. wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos) } else { // distribute delete msg // bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data wb.dispatchDeleteMsgs(insertData, deleteMsgs, startPos, endPos) - } - // update pk oracle - for _, inData := range insertData { - // segment shall always exists after buffer insert - segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID)) - for _, segment := range segments { - for _, fieldData := range inData.pkField { - err := segment.GetBloomFilterSet().UpdatePKRange(fieldData) - if err != nil { - return err + // update pk oracle + for _, inData := range insertData { + // segment shall always exist after buffer insert + segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID)) + for _, segment := range segments { + for _, fieldData := range inData.pkField { + err := segment.GetBloomFilterSet().UpdatePKRange(fieldData) + if err != nil { + return err + } } } } diff --git a/internal/flushcommon/writebuffer/l0_write_buffer_test.go b/internal/flushcommon/writebuffer/l0_write_buffer_test.go index 53c911544a342..aa79ddaf2c90c 100644 --- 
a/internal/flushcommon/writebuffer/l0_write_buffer_test.go +++ b/internal/flushcommon/writebuffer/l0_write_buffer_test.go @@ -15,9 +15,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/flushcommon/metacache" - "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle" "github.com/milvus-io/milvus/internal/flushcommon/syncmgr" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/metrics" @@ -185,8 +183,6 @@ func (s *L0WriteBufferSuite) TestBufferData() { pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64) delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) - seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, pkoracle.NewBloomFilterSet(), nil) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once() s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 055b029063918..b10ba0a9e2315 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -90,6 +90,7 @@ type baseSegment struct { bloomFilterSet *pkoracle.BloomFilterSet loadInfo *atomic.Pointer[querypb.SegmentLoadInfo] isLazyLoad bool + skipGrowingBF bool // Skip generating or maintaining BF for growing segments; deletion checks will be handled in segcore. 
channel metautil.Channel bm25Stats map[int64]*storage.BM25Stats @@ -113,6 +114,7 @@ func newBaseSegment(collection *Collection, segmentType SegmentType, version int bm25Stats: make(map[int64]*storage.BM25Stats), channel: channel, isLazyLoad: isLazyLoad(collection, segmentType), + skipGrowingBF: segmentType == SegmentTypeGrowing && paramtable.Get().QueryNodeCfg.SkipGrowingSegmentBF.GetAsBool(), resourceUsageCache: atomic.NewPointer[ResourceUsage](nil), needUpdatedVersion: atomic.NewInt64(0), @@ -186,6 +188,9 @@ func (s *baseSegment) LoadInfo() *querypb.SegmentLoadInfo { } func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { + if s.skipGrowingBF { + return + } s.bloomFilterSet.UpdateBloomFilter(pks) } @@ -207,10 +212,20 @@ func (s *baseSegment) GetBM25Stats() map[int64]*storage.BM25Stats { // false otherwise, // may returns true even the PK doesn't exist actually func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool { + if s.skipGrowingBF { + return true + } return s.bloomFilterSet.MayPkExist(pk) } func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool { + if s.skipGrowingBF { + allPositive := make([]bool, lc.Size()) + for i := 0; i < lc.Size(); i++ { + allPositive[i] = true + } + return allPositive + } return s.bloomFilterSet.BatchPkExist(lc) } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 64cc7ee008f68..a819d1f7a0e71 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2491,7 +2491,10 @@ type queryNodeConfig struct { UseStreamComputing ParamItem `refreshable:"false"` QueryStreamBatchSize ParamItem `refreshable:"false"` QueryStreamMaxBatchSize ParamItem `refreshable:"false"` - BloomFilterApplyParallelFactor ParamItem `refreshable:"true"` + + // BF + SkipGrowingSegmentBF ParamItem `refreshable:"true"` + BloomFilterApplyParallelFactor ParamItem `refreshable:"true"` // worker WorkerPoolingSize ParamItem 
`refreshable:"false"` @@ -3214,6 +3217,14 @@ user-task-polling: } p.BloomFilterApplyParallelFactor.Init(base.mgr) + p.SkipGrowingSegmentBF = ParamItem{ + Key: "queryNode.skipGrowingSegmentBF", + Version: "2.5", + DefaultValue: "true", + Doc: "indicates whether to skip the creation, maintenance, or checking of Bloom Filters for growing segments", + } + p.SkipGrowingSegmentBF.Init(base.mgr) + p.WorkerPoolingSize = ParamItem{ Key: "queryNode.workerPooling.size", Version: "2.4.7", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 204200a0fbad6..e5deb2a5b0c31 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -473,6 +473,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 3*time.Second, Params.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond)) assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt()) + assert.Equal(t, true, Params.SkipGrowingSegmentBF.GetAsBool()) + assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue()) assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())