Skip to content

Commit

Permalink
enhance: [10kcp] Reduce memory usage of BF in DataNode and QueryNode (#…
Browse files Browse the repository at this point in the history
…38133)

1. DataNode: Skip generating BF during the insert phase (BF will be
regenerated during the sync phase).
2. QueryNode: Skip generating or maintaining BF for growing segments;
deletion checks will be handled in the segcore.

issue: #37630

pr: #38129

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Dec 2, 2024
1 parent 0930430 commit 338ccc9
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 15 deletions.
2 changes: 1 addition & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ queryNode:
# Enable memory mapping (mmap) to optimize the handling of growing raw data.
# By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized.
# However, this optimization may come at the cost of a slight decrease in query latency for the affected data segments.
growingMmapEnabled: false
growingMmapEnabled: true
fixedFileSizeForMmapAlloc: 1 # tmp file size for mmap chunk manager
maxDiskUsagePercentageForMmapAlloc: 50 # disk percentage used in mmap chunk manager
lazyload:
Expand Down
23 changes: 12 additions & 11 deletions internal/datanode/writebuffer/l0_write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,24 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg

if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
// In Skip BF mode, datanode no longer maintains bloom filters.
// So, here we skip filtering delete entries.
// So, here we skip generating BF (growing segment's BF will be regenerated during the sync phase)
// and also skip filtering delete entries by bf.
wb.dispatchDeleteMsgsWithoutFilter(deleteMsgs, startPos, endPos)
} else {
// distribute delete msg
// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
}

// update pk oracle
for _, inData := range groups {
// segment shall always exists after buffer insert
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
for _, segment := range segments {
for _, fieldData := range inData.pkField {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
if err != nil {
return err
// update pk oracle
for _, inData := range groups {
// segment shall always exists after buffer insert
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
for _, segment := range segments {
for _, fieldData := range inData.pkField {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
if err != nil {
return err
}
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions internal/datanode/writebuffer/l0_write_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ func (s *L0WriteBufferSuite) TestBufferData() {
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))

seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
Expand Down
15 changes: 15 additions & 0 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type baseSegment struct {
bloomFilterSet *pkoracle.BloomFilterSet
loadInfo *atomic.Pointer[querypb.SegmentLoadInfo]
isLazyLoad bool
skipGrowingBF bool // Skip generating or maintaining BF for growing segments; deletion checks will be handled in segcore.
channel metautil.Channel

resourceUsageCache *atomic.Pointer[ResourceUsage]
Expand All @@ -114,6 +115,7 @@ func newBaseSegment(collection *Collection, segmentType SegmentType, version int
bloomFilterSet: pkoracle.NewBloomFilterSet(loadInfo.GetSegmentID(), loadInfo.GetPartitionID(), segmentType),
channel: channel,
isLazyLoad: isLazyLoad(collection, segmentType),
skipGrowingBF: segmentType == SegmentTypeGrowing && paramtable.Get().QueryNodeCfg.SkipGrowingSegmentBF.GetAsBool(),

resourceUsageCache: atomic.NewPointer[ResourceUsage](nil),
needUpdatedVersion: atomic.NewInt64(0),
Expand Down Expand Up @@ -183,17 +185,30 @@ func (s *baseSegment) LoadInfo() *querypb.SegmentLoadInfo {
}

func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) {
if s.skipGrowingBF {
return
}
s.bloomFilterSet.UpdateBloomFilter(pks)
}

// MayPkExist returns true if the given PK exists in the PK range and being positive through the bloom filter,
// false otherwise,
// may returns true even the PK doesn't exist actually
func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool {
if s.skipGrowingBF {
return true
}
return s.bloomFilterSet.MayPkExist(pk)
}

func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
if s.skipGrowingBF {
allPositive := make([]bool, lc.Size())
for i := 0; i < lc.Size(); i++ {
allPositive[i] = true
}
return allPositive
}
return s.bloomFilterSet.BatchPkExist(lc)
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2427,6 +2427,8 @@ type queryNodeConfig struct {
DefaultSegmentFilterRatio ParamItem `refreshable:"false"`
UseStreamComputing ParamItem `refreshable:"false"`

// BF
SkipGrowingSegmentBF ParamItem `refreshable:"true"`
BloomFilterApplyParallelFactor ParamItem `refreshable:"true"`

QueryStreamBatchSize ParamItem `refreshable:"false"`
Expand Down Expand Up @@ -2698,7 +2700,7 @@ This defaults to true, indicating that Milvus creates temporary index for growin
p.GrowingMmapEnabled = ParamItem{
Key: "queryNode.mmap.growingMmapEnabled",
Version: "2.4.6",
DefaultValue: "false",
DefaultValue: "true",
FallbackKeys: []string{"queryNode.growingMmapEnabled"},
Doc: `Enable memory mapping (mmap) to optimize the handling of growing raw data.
By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized.
Expand Down Expand Up @@ -3114,6 +3116,14 @@ user-task-polling:
}
p.BloomFilterApplyParallelFactor.Init(base.mgr)

p.SkipGrowingSegmentBF = ParamItem{
Key: "queryNode.skipGrowingSegmentBF",
Version: "2.5",
DefaultValue: "true",
Doc: "indicates whether skipping the creation, maintenance, or checking of Bloom Filters for growing segments",
}
p.SkipGrowingSegmentBF.Init(base.mgr)

p.QueryStreamBatchSize = ParamItem{
Key: "queryNode.queryStreamBatchSize",
Version: "2.4.1",
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 3*time.Second, Params.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond))

assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt())
assert.Equal(t, true, Params.SkipGrowingSegmentBF.GetAsBool())

assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue())

assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())
Expand Down

0 comments on commit 338ccc9

Please sign in to comment.