diff --git a/internal/flushcommon/writebuffer/l0_write_buffer.go b/internal/flushcommon/writebuffer/l0_write_buffer.go index 07237f43f7705..cc5fa750fb40f 100644 --- a/internal/flushcommon/writebuffer/l0_write_buffer.go +++ b/internal/flushcommon/writebuffer/l0_write_buffer.go @@ -171,17 +171,17 @@ func (wb *l0WriteBuffer) BufferData(insertData []*InsertData, deleteMsgs []*msgs // distribute delete msg // bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data wb.dispatchDeleteMsgs(insertData, deleteMsgs, startPos, endPos) - } - // update pk oracle - for _, inData := range insertData { - // segment shall always exists after buffer insert - segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID)) - for _, segment := range segments { - for _, fieldData := range inData.pkField { - err := segment.GetBloomFilterSet().UpdatePKRange(fieldData) - if err != nil { - return err + // update pk oracle + for _, inData := range insertData { + // segment shall always exists after buffer insert + segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID)) + for _, segment := range segments { + for _, fieldData := range inData.pkField { + err := segment.GetBloomFilterSet().UpdatePKRange(fieldData) + if err != nil { + return err + } } } } diff --git a/internal/flushcommon/writebuffer/l0_write_buffer_test.go b/internal/flushcommon/writebuffer/l0_write_buffer_test.go index 53c911544a342..aa79ddaf2c90c 100644 --- a/internal/flushcommon/writebuffer/l0_write_buffer_test.go +++ b/internal/flushcommon/writebuffer/l0_write_buffer_test.go @@ -15,9 +15,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/flushcommon/metacache" - "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle" "github.com/milvus-io/milvus/internal/flushcommon/syncmgr" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/metrics" @@ -185,8 +183,6 @@ func (s *L0WriteBufferSuite) TestBufferData() { pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64) delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) - seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, pkoracle.NewBloomFilterSet(), nil) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once() s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 055b029063918..06a2b6d8479e7 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -90,6 +90,7 @@ type baseSegment struct { bloomFilterSet *pkoracle.BloomFilterSet loadInfo *atomic.Pointer[querypb.SegmentLoadInfo] isLazyLoad bool + skipGrowingBF bool channel metautil.Channel bm25Stats map[int64]*storage.BM25Stats @@ -113,6 +114,7 @@ func newBaseSegment(collection *Collection, segmentType SegmentType, version int bm25Stats: make(map[int64]*storage.BM25Stats), channel: channel, isLazyLoad: isLazyLoad(collection, segmentType), + skipGrowingBF: segmentType == SegmentTypeGrowing && paramtable.Get().QueryNodeCfg.SkipGrowingSegmentBF.GetAsBool(), resourceUsageCache: atomic.NewPointer[ResourceUsage](nil), needUpdatedVersion: atomic.NewInt64(0), @@ -186,6 +188,9 @@ func (s *baseSegment) LoadInfo() *querypb.SegmentLoadInfo { } func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { + if s.skipGrowingBF { + return + } s.bloomFilterSet.UpdateBloomFilter(pks) } @@ -207,10 +212,20 @@ func (s *baseSegment) GetBM25Stats() map[int64]*storage.BM25Stats { // false otherwise, // may returns true even the PK doesn't exist actually func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool { + if s.skipGrowingBF { + return true + } return s.bloomFilterSet.MayPkExist(pk) } func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool { + if s.skipGrowingBF { + allPositive := make([]bool, lc.Size()) + for i := 0; i < lc.Size(); i++ { + allPositive[i] = true + } + return allPositive + } return s.bloomFilterSet.BatchPkExist(lc) } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 64cc7ee008f68..3d476c3b36efa 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2491,7 +2491,10 @@ type queryNodeConfig struct { UseStreamComputing ParamItem `refreshable:"false"` QueryStreamBatchSize ParamItem `refreshable:"false"` QueryStreamMaxBatchSize ParamItem `refreshable:"false"` - BloomFilterApplyParallelFactor ParamItem `refreshable:"true"` + + // BF + SkipGrowingSegmentBF ParamItem `refreshable:"true"` + BloomFilterApplyParallelFactor ParamItem `refreshable:"true"` // worker WorkerPoolingSize ParamItem `refreshable:"false"` @@ -3214,6 +3217,15 @@ user-task-polling: } p.BloomFilterApplyParallelFactor.Init(base.mgr) + p.SkipGrowingSegmentBF = ParamItem{ + Key: "queryNode.skipGrowingSegmentBF", + Version: "2.5", + DefaultValue: "true", + Doc: "indicates whether skipping the creation, maintenance, or checking of Bloom Filters for growing segments", + Export: true, + } + p.SkipGrowingSegmentBF.Init(base.mgr) + p.WorkerPoolingSize = ParamItem{ Key: "queryNode.workerPooling.size", Version: "2.4.7", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 204200a0fbad6..e5deb2a5b0c31 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -473,6 +473,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 3*time.Second, Params.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond)) assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt()) + assert.Equal(t, true, Params.SkipGrowingSegmentBF.GetAsBool()) + assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue()) assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())