From 338ccc9ff93b6c5efb3826c242ddb332163fc233 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Mon, 2 Dec 2024 14:41:19 +0800 Subject: [PATCH] enhance: [10kcp] Reduce memory usage of BF in DataNode and QueryNode (#38133) 1. DataNode: Skip generating BF during the insert phase (BF will be regenerated during the sync phase). 2. QueryNode: Skip generating or maintaining BF for growing segments; deletion checks will be handled in the segcore. issue: https://github.com/milvus-io/milvus/issues/37630 pr: https://github.com/milvus-io/milvus/pull/38129 --------- Signed-off-by: bigsheeper --- configs/milvus.yaml | 2 +- .../datanode/writebuffer/l0_write_buffer.go | 23 ++++++++++--------- .../writebuffer/l0_write_buffer_test.go | 2 -- internal/querynodev2/segments/segment.go | 15 ++++++++++++ pkg/util/paramtable/component_param.go | 12 +++++++++- pkg/util/paramtable/component_param_test.go | 2 ++ 6 files changed, 41 insertions(+), 15 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index ea2dc4f7b2726..8d593121381d5 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -420,7 +420,7 @@ queryNode: # Enable memory mapping (mmap) to optimize the handling of growing raw data. # By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized. # However, this optimization may come at the cost of a slight decrease in query latency for the affected data segments. - growingMmapEnabled: false + growingMmapEnabled: true fixedFileSizeForMmapAlloc: 1 # tmp file size for mmap chunk manager maxDiskUsagePercentageForMmapAlloc: 50 # disk percentage used in mmap chunk manager lazyload: diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go index 1a9966020f5a4..3ce9563bfbfc7 100644 --- a/internal/datanode/writebuffer/l0_write_buffer.go +++ b/internal/datanode/writebuffer/l0_write_buffer.go @@ -165,23 +165,24 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() { // In Skip BF mode, datanode no longer maintains bloom filters. - // So, here we skip filtering delete entries. + // So, here we skip generating BF (growing segment's BF will be regenerated during the sync phase) + // and also skip filtering delete entries by bf. wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos) } else { // distribute delete msg // bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos) - } - // update pk oracle - for _, inData := range groups { - // segment shall always exists after buffer insert - segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID)) - for _, segment := range segments { - for _, fieldData := range inData.pkField { - err := segment.GetBloomFilterSet().UpdatePKRange(fieldData) - if err != nil { - return err + // update pk oracle + for _, inData := range groups { + // segment shall always exists after buffer insert + segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID)) + for _, segment := range segments { + for _, fieldData := range inData.pkField { + err := segment.GetBloomFilterSet().UpdatePKRange(fieldData) + if err != nil { + return err + } } } } diff --git a/internal/datanode/writebuffer/l0_write_buffer_test.go b/internal/datanode/writebuffer/l0_write_buffer_test.go index 5d79a25b40203..7c39fabf926f3 100644 --- a/internal/datanode/writebuffer/l0_write_buffer_test.go +++ b/internal/datanode/writebuffer/l0_write_buffer_test.go @@ -181,8 +181,6 @@ func (s *L0WriteBufferSuite) TestBufferData() { pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64) delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) - seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once() s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 00509135cb8d2..24a4e41db6f70 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -94,6 +94,7 @@ type baseSegment struct { bloomFilterSet *pkoracle.BloomFilterSet loadInfo *atomic.Pointer[querypb.SegmentLoadInfo] isLazyLoad bool + skipGrowingBF bool // Skip generating or maintaining BF for growing segments; deletion checks will be handled in segcore. channel metautil.Channel resourceUsageCache *atomic.Pointer[ResourceUsage] @@ -114,6 +115,7 @@ func newBaseSegment(collection *Collection, segmentType SegmentType, version int bloomFilterSet: pkoracle.NewBloomFilterSet(loadInfo.GetSegmentID(), loadInfo.GetPartitionID(), segmentType), channel: channel, isLazyLoad: isLazyLoad(collection, segmentType), + skipGrowingBF: segmentType == SegmentTypeGrowing && paramtable.Get().QueryNodeCfg.SkipGrowingSegmentBF.GetAsBool(), resourceUsageCache: atomic.NewPointer[ResourceUsage](nil), needUpdatedVersion: atomic.NewInt64(0), @@ -183,6 +185,9 @@ func (s *baseSegment) LoadInfo() *querypb.SegmentLoadInfo { } func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { + if s.skipGrowingBF { + return + } s.bloomFilterSet.UpdateBloomFilter(pks) } @@ -190,10 +195,20 @@ func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { // false otherwise, // may returns true even the PK doesn't exist actually func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool { + if s.skipGrowingBF { + return true + } return s.bloomFilterSet.MayPkExist(pk) } func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool { + if s.skipGrowingBF { + allPositive := make([]bool, lc.Size()) + for i := 0; i < lc.Size(); i++ { + allPositive[i] = true + } + return allPositive + } return s.bloomFilterSet.BatchPkExist(lc) } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index e1bf5075c157e..cf02043647711 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2427,6 +2427,8 @@ type queryNodeConfig struct { DefaultSegmentFilterRatio ParamItem `refreshable:"false"` UseStreamComputing ParamItem `refreshable:"false"` + // BF + SkipGrowingSegmentBF ParamItem `refreshable:"true"` BloomFilterApplyParallelFactor ParamItem `refreshable:"true"` QueryStreamBatchSize ParamItem `refreshable:"false"` @@ -2698,7 +2700,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin p.GrowingMmapEnabled = ParamItem{ Key: "queryNode.mmap.growingMmapEnabled", Version: "2.4.6", - DefaultValue: "false", + DefaultValue: "true", FallbackKeys: []string{"queryNode.growingMmapEnabled"}, Doc: `Enable memory mapping (mmap) to optimize the handling of growing raw data. By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized. @@ -3114,6 +3116,14 @@ user-task-polling: } p.BloomFilterApplyParallelFactor.Init(base.mgr) + p.SkipGrowingSegmentBF = ParamItem{ + Key: "queryNode.skipGrowingSegmentBF", + Version: "2.5", + DefaultValue: "true", + Doc: "indicates whether skipping the creation, maintenance, or checking of Bloom Filters for growing segments", + } + p.SkipGrowingSegmentBF.Init(base.mgr) + p.QueryStreamBatchSize = ParamItem{ Key: "queryNode.queryStreamBatchSize", Version: "2.4.1", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 4902b01847022..d979746d4c0eb 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -451,6 +451,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 3*time.Second, Params.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond)) assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt()) + assert.Equal(t, true, Params.SkipGrowingSegmentBF.GetAsBool()) + assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue()) assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())