From 272d95ad79194f7e63a90c4352cdc8aac04f0484 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Wed, 15 Jan 2025 01:15:02 +0800
Subject: [PATCH] enhance: Reduce mutex contention in datacoord meta (#38219)

1. Use a secondary index to avoid retrieving all segments in
   `GetSegmentsChanPart`.
2. Batch `SetAllocations` calls to reduce the number of times the meta
   lock is acquired.

issue: https://github.com/milvus-io/milvus/issues/37630

---------

Signed-off-by: bigsheeper
---
 .../datacoord/compaction_policy_clustering.go |   7 +-
 .../datacoord/compaction_policy_single.go     |   7 +-
 .../compaction_policy_single_test.go          |   9 +
 internal/datacoord/compaction_trigger.go      |  16 +-
 internal/datacoord/compaction_trigger_test.go | 211 +++++++++++-------
 internal/datacoord/meta.go                    |  12 +-
 internal/datacoord/meta_test.go               |   4 +-
 internal/datacoord/segment_info.go            |   2 +-
 8 files changed, 170 insertions(+), 98 deletions(-)

diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go
index 9e206f4c76b41..fd0b0295ad06b 100644
--- a/internal/datacoord/compaction_policy_clustering.go
+++ b/internal/datacoord/compaction_policy_clustering.go
@@ -120,15 +120,14 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
 		return nil, 0, err
 	}
 
-	partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return segment.CollectionID == collectionID &&
-			isSegmentHealthy(segment) &&
+	partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
 			!segment.GetIsInvisible()
-	})
+	}))
 
 	views := make([]CompactionView, 0)
 	// partSegments is list of chanPartSegments, which is channel-partition organized segments
diff --git a/internal/datacoord/compaction_policy_single.go b/internal/datacoord/compaction_policy_single.go
index 76ed80b2ea3a4..edf1c4dbc2af0 100644
--- a/internal/datacoord/compaction_policy_single.go
+++ b/internal/datacoord/compaction_policy_single.go
@@ -87,15 +87,14 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
 		return nil, 0, err
 	}
 
-	partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return segment.CollectionID == collectionID &&
-			isSegmentHealthy(segment) &&
+	partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() == datapb.SegmentLevel_L2 && // only support L2 for now
 			!segment.GetIsInvisible()
-	})
+	}))
 
 	views := make([]CompactionView, 0)
 	for _, group := range partSegments {
diff --git a/internal/datacoord/compaction_policy_single_test.go b/internal/datacoord/compaction_policy_single_test.go
index 66dc4f3bab8fa..064ef6afb8049 100644
--- a/internal/datacoord/compaction_policy_single_test.go
+++ b/internal/datacoord/compaction_policy_single_test.go
@@ -128,6 +128,15 @@ func (s *SingleCompactionPolicySuite) TestL2SingleCompaction() {
 	segments[103] = buildTestSegment(101, collID, datapb.SegmentLevel_L2, 100, 10000, 1)
 	segmentsInfo := &SegmentsInfo{
 		segments: segments,
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+				collID: {
+					101: segments[101],
+					102: segments[102],
+					103: segments[103],
+				},
+			},
+		},
 	}
 
 	compactionTaskMeta := newTestCompactionTaskMeta(s.T())
diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index c49ce0f9a7c48..121920a52706c 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -291,9 +291,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
 		zap.Int64("signal.collectionID", signal.collectionID),
 		zap.Int64("signal.partitionID", signal.partitionID),
 		zap.Int64("signal.segmentID", signal.segmentID))
-	partSegments := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
-			isSegmentHealthy(segment) &&
+	filter := SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
@@ -302,6 +301,17 @@
 			!segment.GetIsInvisible()
 	})
 	// partSegments is list of chanPartSegments, which is channel-partition organized segments
+	partSegments := make([]*chanPartSegments, 0)
+	// get all segments if signal.collection == 0, otherwise get collection segments
+	if signal.collectionID != 0 {
+		partSegments = GetSegmentsChanPart(t.meta, signal.collectionID, filter)
+	} else {
+		collections := t.meta.GetCollections()
+		for _, collection := range collections {
+			partSegments = append(partSegments, GetSegmentsChanPart(t.meta, collection.ID, filter)...)
+		}
+	}
+
 	if len(partSegments) == 0 {
 		log.Info("the length of SegmentsChanPart is 0, skip to handle compaction")
 		return nil
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index c93491e339405..b0f2197b6eb5f 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -122,24 +122,34 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
 		},
 	}
 
+	segInfo := &datapb.SegmentInfo{
+		ID:             1,
+		CollectionID:   collectionID,
+		PartitionID:    1,
+		LastExpireTime: 100,
+		NumOfRows:      100,
+		MaxRowNum:      300,
+		InsertChannel:  "ch1",
+		State:          commonpb.SegmentState_Flushed,
+		Binlogs:        binlogs,
+		Deltalogs:      deltaLogs,
+		IsSorted:       true,
+	}
 	m := &meta{
 		catalog:    catalog,
 		channelCPs: newChannelCps(),
 		segments: &SegmentsInfo{
 			segments: map[int64]*SegmentInfo{
 				1: {
-					SegmentInfo: &datapb.SegmentInfo{
-						ID:             1,
-						CollectionID:   collectionID,
-						PartitionID:    1,
-						LastExpireTime: 100,
-						NumOfRows:      100,
-						MaxRowNum:      300,
-						InsertChannel:  "ch1",
-						State:          commonpb.SegmentState_Flushed,
-						Binlogs:        binlogs,
-						Deltalogs:      deltaLogs,
-						IsSorted:       true,
+					SegmentInfo: segInfo,
+				},
+			},
+			secondaryIndexes: segmentInfoIndexes{
+				coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+					collectionID: {
+						1: {
+							SegmentInfo: segInfo,
+						},
 					},
 				},
 			},
@@ -214,6 +224,76 @@ func Test_compactionTrigger_force(t *testing.T) {
 
 	mock0Allocator := newMock0Allocator(t)
 
+	seg1 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             1,
+			CollectionID:   2,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			Binlogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 1},
+					},
+				},
+			},
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 1},
+					},
+				},
+			},
+			IsSorted: true,
+		},
+	}
+
+	seg2 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             2,
+			CollectionID:   2,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			Binlogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 2},
+					},
+				},
+			},
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 2},
+					},
+				},
+			},
+			IsSorted: true,
+		},
+	}
+
+	seg3 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             3,
+			CollectionID:   1111,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			IsSorted:       true,
+		},
+	}
+
 	tests := []struct {
 		name    string
 		fields  fields
@@ -230,71 +310,18 @@
 					channelCPs: newChannelCps(),
 					segments: &SegmentsInfo{
 						segments: map[int64]*SegmentInfo{
-							1: {
-								SegmentInfo: &datapb.SegmentInfo{
-									ID:             1,
-									CollectionID:   2,
-									PartitionID:    1,
-									LastExpireTime: 100,
-									NumOfRows:      100,
-									MaxRowNum:      300,
-									InsertChannel:  "ch1",
-									State:          commonpb.SegmentState_Flushed,
-									Binlogs: []*datapb.FieldBinlog{
-										{
-											Binlogs: []*datapb.Binlog{
-												{EntriesNum: 5, LogID: 1},
-											},
-										},
-									},
-									Deltalogs: []*datapb.FieldBinlog{
-										{
-											Binlogs: []*datapb.Binlog{
-												{EntriesNum: 5, LogID: 1},
-											},
-										},
-									},
-									IsSorted: true,
-								},
-							},
-							2: {
-								SegmentInfo: &datapb.SegmentInfo{
-									ID:             2,
-									CollectionID:   2,
-									PartitionID:    1,
-									LastExpireTime: 100,
-									NumOfRows:      100,
-									MaxRowNum:      300,
-									InsertChannel:  "ch1",
-									State:          commonpb.SegmentState_Flushed,
-									Binlogs: []*datapb.FieldBinlog{
-										{
-											Binlogs: []*datapb.Binlog{
-												{EntriesNum: 5, LogID: 2},
-											},
-										},
-									},
-									Deltalogs: []*datapb.FieldBinlog{
-										{
-											Binlogs: []*datapb.Binlog{
-												{EntriesNum: 5, LogID: 2},
-											},
-										},
-									},
-									IsSorted: true,
+							1: seg1,
+							2: seg2,
+							3: seg3,
+						},
+						secondaryIndexes: segmentInfoIndexes{
+							coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+								2: {
+									seg1.GetID(): seg1,
+									seg2.GetID(): seg2,
 								},
-							},
-							3: {
-								SegmentInfo: &datapb.SegmentInfo{
-									ID:             3,
-									CollectionID:   1111,
-									PartitionID:    1,
-									LastExpireTime: 100,
-									NumOfRows:      100,
-									MaxRowNum:      300,
-									InsertChannel:  "ch1",
-									State:          commonpb.SegmentState_Flushed,
-									IsSorted:       true,
+								1111: {
+									seg3.GetID(): seg3,
 								},
 							},
 						},
@@ -617,7 +644,13 @@
 		t.Run(tt.name+" with DiskANN index", func(t *testing.T) {
 			for _, segment := range tt.fields.meta.segments.GetSegments() {
 				// Collection 1000 means it has DiskANN index
+				delete(tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()], segment.GetID())
 				segment.CollectionID = 1000
+				_, ok := tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()]
+				if !ok {
+					tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()] = make(map[UniqueID]*SegmentInfo)
+				}
+				tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()][segment.GetID()] = segment
 			}
 			tr := &compactionTrigger{
 				meta:      tt.fields.meta,
@@ -725,6 +758,9 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 	vecFieldID := int64(201)
 	segmentInfos := &SegmentsInfo{
 		segments: make(map[UniqueID]*SegmentInfo),
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
+		},
 	}
 
 	indexMeta := newSegmentIndexMeta(nil)
@@ -751,6 +787,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 		},
 	}
 
+	segmentInfos.secondaryIndexes.coll2Segments[2] = make(map[UniqueID]*SegmentInfo)
 	nSegments := 50
 	for i := UniqueID(0); i < UniqueID(nSegments); i++ {
 		info := &SegmentInfo{
@@ -794,6 +831,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 		})
 
 		segmentInfos.segments[i] = info
+		segmentInfos.secondaryIndexes.coll2Segments[2][i] = info
 	}
 
 	mock0Allocator := newMockAllocator(t)
@@ -1110,15 +1148,28 @@ func mockSegment(segID, rows, deleteRows, sizeInMB int64) *datapb.SegmentInfo {
 
 func mockSegmentsInfo(sizeInMB ...int64) *SegmentsInfo {
 	segments := make(map[int64]*SegmentInfo, len(sizeInMB))
+	collectionID := int64(2)
+	channel := "ch1"
+	coll2Segments := make(map[UniqueID]map[UniqueID]*SegmentInfo)
+	coll2Segments[collectionID] = make(map[UniqueID]*SegmentInfo)
+	channel2Segments := make(map[string]map[UniqueID]*SegmentInfo)
+	channel2Segments[channel] = make(map[UniqueID]*SegmentInfo)
 	for i, size := range sizeInMB {
 		segId := int64(i + 1)
-		segments[segId] = &SegmentInfo{
+		info := &SegmentInfo{
 			SegmentInfo:   mockSegment(segId, size, 1, size),
 			lastFlushTime: time.Now().Add(-100 * time.Minute),
 		}
+		segments[segId] = info
+		coll2Segments[collectionID][segId] = info
+		channel2Segments[channel][segId] = info
 	}
 
 	return &SegmentsInfo{
 		segments: segments,
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments:    coll2Segments,
+			channel2Segments: channel2Segments,
+		},
 	}
 }
@@ -1564,6 +1615,10 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
 
 	segmentInfos := &SegmentsInfo{
 		segments: make(map[UniqueID]*SegmentInfo),
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments:    map[UniqueID]map[UniqueID]*SegmentInfo{2: {}},
+			channel2Segments: map[string]map[UniqueID]*SegmentInfo{"ch1": {}},
+		},
 	}
 
 	size := []int64{
@@ -1636,6 +1691,8 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
 		})
 
 		segmentInfos.segments[i] = info
+		segmentInfos.secondaryIndexes.coll2Segments[2][i] = info
+		segmentInfos.secondaryIndexes.channel2Segments["ch1"][i] = info
 	}
 
 	mock0Allocator := newMockAllocator(t)
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index c897b0c8b8a70..6b5fe127e442e 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -350,9 +350,8 @@ func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo {
 }
 
 // GetSegmentsChanPart returns segments organized in Channel-Partition dimension with selector applied
-func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegments {
-	m.RLock()
-	defer m.RUnlock()
+// TODO: Move this function to the compaction module after reorganizing the DataCoord modules.
+func GetSegmentsChanPart(m *meta, collectionID int64, filters ...SegmentFilter) []*chanPartSegments {
 	type dim struct {
 		partitionID int64
 		channelName string
@@ -360,10 +359,9 @@ func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegm
 
 	mDimEntry := make(map[dim]*chanPartSegments)
 
-	for _, si := range m.segments.segments {
-		if !selector(si) {
-			continue
-		}
+	filters = append(filters, WithCollection(collectionID))
+	candidates := m.SelectSegments(context.Background(), filters...)
+	for _, si := range candidates {
 		d := dim{si.PartitionID, si.InsertChannel}
 		entry, ok := mDimEntry[d]
 		if !ok {
diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go
index 41f137d8142a4..f0d002deb20f2 100644
--- a/internal/datacoord/meta_test.go
+++ b/internal/datacoord/meta_test.go
@@ -609,7 +609,7 @@ func TestMeta_Basic(t *testing.T) {
 	})
 
 	t.Run("Test GetSegmentsChanPart", func(t *testing.T) {
-		result := meta.GetSegmentsChanPart(func(*SegmentInfo) bool { return true })
+		result := GetSegmentsChanPart(meta, collID, SegmentFilterFunc(func(segment *SegmentInfo) bool { return true }))
 		assert.Equal(t, 2, len(result))
 		for _, entry := range result {
 			assert.Equal(t, "c1", entry.channelName)
@@ -620,7 +620,7 @@ func TestMeta_Basic(t *testing.T) {
 				assert.Equal(t, 1, len(entry.segments))
 			}
 		}
-		result = meta.GetSegmentsChanPart(func(seg *SegmentInfo) bool { return seg.GetCollectionID() == 10 })
+		result = GetSegmentsChanPart(meta, 10)
 		assert.Equal(t, 0, len(result))
 	})
diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go
index b4cc6e0a7144e..dcd08e8ccec5c 100644
--- a/internal/datacoord/segment_info.go
+++ b/internal/datacoord/segment_info.go
@@ -144,7 +144,7 @@ func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*Segmen
 
 	// apply criterion
 	candidates := s.getCandidates(criterion)
-	var result []*SegmentInfo
+	result := make([]*SegmentInfo, 0, len(candidates))
 	for _, segment := range candidates {
 		if criterion.Match(segment) {
 			result = append(result, segment)
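
The first optimization in the commit message rests on the `coll2Segments` secondary index that the updated tests populate alongside the primary `segments` map. The sketch below shows that pattern in isolation; `segmentsInfo` and `SegmentInfo` here are cut-down stand-ins, not the real `internal/datacoord` types. Every write keeps the per-collection bucket in sync, so a per-collection read touches only that bucket instead of filtering every segment in the cluster.

```go
package main

import "fmt"

type UniqueID = int64

// Cut-down stand-in; the real datacoord SegmentInfo carries far more state.
type SegmentInfo struct {
	ID           UniqueID
	CollectionID UniqueID
}

// segmentsInfo mirrors the shape used by the patch: a flat primary map plus
// a secondary index keyed by collection ID.
type segmentsInfo struct {
	segments      map[UniqueID]*SegmentInfo
	coll2Segments map[UniqueID]map[UniqueID]*SegmentInfo
}

func newSegmentsInfo() *segmentsInfo {
	return &segmentsInfo{
		segments:      make(map[UniqueID]*SegmentInfo),
		coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
	}
}

// set updates the primary map and keeps the secondary index in sync,
// much as the updated tests in the patch do by hand.
func (s *segmentsInfo) set(info *SegmentInfo) {
	s.segments[info.ID] = info
	byColl, ok := s.coll2Segments[info.CollectionID]
	if !ok {
		byColl = make(map[UniqueID]*SegmentInfo)
		s.coll2Segments[info.CollectionID] = byColl
	}
	byColl[info.ID] = info
}

// segmentsOfCollection reads one collection's bucket in O(bucket size)
// instead of scanning the whole segments map under the meta lock.
func (s *segmentsInfo) segmentsOfCollection(collID UniqueID) []*SegmentInfo {
	result := make([]*SegmentInfo, 0, len(s.coll2Segments[collID]))
	for _, seg := range s.coll2Segments[collID] {
		result = append(result, seg)
	}
	return result
}

func main() {
	s := newSegmentsInfo()
	s.set(&SegmentInfo{ID: 1, CollectionID: 2})
	s.set(&SegmentInfo{ID: 2, CollectionID: 2})
	s.set(&SegmentInfo{ID: 3, CollectionID: 1111})
	fmt.Println(len(s.segmentsOfCollection(2))) // 2
}
```

The trade-off is the usual one for secondary indexes: writers pay a little extra bookkeeping to keep the buckets consistent (the DiskANN test hunk above shows exactly that when it rewrites `CollectionID`), in exchange for readers holding the meta lock for a much shorter time.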
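
The call sites now wrap their predicates in `SegmentFilterFunc` and pass them to `GetSegmentsChanPart`, which appends `WithCollection(collectionID)` before delegating to `m.SelectSegments`. The sketch below illustrates that composition under simplifying assumptions: the `SegmentFilter` interface, `WithCollection`, and `selectSegments` here are minimal stand-ins, whereas the real implementation matches filters against a criterion struct so the collection filter can be served from the secondary index rather than by predicate scanning.

```go
package main

import "fmt"

type SegmentInfo struct {
	ID           int64
	CollectionID int64
	Level        string
}

// SegmentFilter is a simplified stand-in for the interface the call sites use.
type SegmentFilter interface {
	Match(*SegmentInfo) bool
}

// SegmentFilterFunc adapts a plain predicate into a SegmentFilter, matching
// the SegmentFilterFunc(...) wrapping visible in the diff.
type SegmentFilterFunc func(*SegmentInfo) bool

func (f SegmentFilterFunc) Match(s *SegmentInfo) bool { return f(s) }

// WithCollection restricts a query to one collection; GetSegmentsChanPart
// appends it so the lookup can start from the per-collection bucket.
func WithCollection(collID int64) SegmentFilter {
	return SegmentFilterFunc(func(s *SegmentInfo) bool { return s.CollectionID == collID })
}

func matchesAll(s *SegmentInfo, filters []SegmentFilter) bool {
	for _, f := range filters {
		if !f.Match(s) {
			return false
		}
	}
	return true
}

// selectSegments keeps only the segments that pass every filter.
func selectSegments(all []*SegmentInfo, filters ...SegmentFilter) []*SegmentInfo {
	out := make([]*SegmentInfo, 0, len(all))
	for _, s := range all {
		if matchesAll(s, filters) {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	segs := []*SegmentInfo{
		{ID: 1, CollectionID: 2, Level: "L1"},
		{ID: 2, CollectionID: 2, Level: "L0"},
		{ID: 3, CollectionID: 7, Level: "L1"},
	}
	notL0 := SegmentFilterFunc(func(s *SegmentInfo) bool { return s.Level != "L0" })
	fmt.Println(len(selectSegments(segs, WithCollection(2), notL0))) // 1
}
```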
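
The commit message's second item, batching `SetAllocations`, is not visible in the hunks above, so the following is only a sketch of the locking pattern it describes, under stated assumptions: `metaTable`, `Allocation`, and both method names here are hypothetical, not the DataCoord API. The point is simply that one Lock/Unlock pair covering a whole batch replaces one pair per segment.

```go
package main

import (
	"fmt"
	"sync"
)

// Allocation is a hypothetical stand-in for datacoord's allocation record.
type Allocation struct {
	SegmentID int64
	NumOfRows int64
}

// metaTable is a hypothetical mutex-guarded map, standing in for meta.
type metaTable struct {
	mu          sync.RWMutex
	allocations map[int64][]*Allocation
}

// SetAllocation takes the lock once per segment; called in a loop, the
// lock itself becomes the point of contention.
func (m *metaTable) SetAllocation(segmentID int64, allocs []*Allocation) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.allocations[segmentID] = allocs
}

// SetAllocationsBatch amortizes the cost: a single Lock/Unlock pair covers
// the whole update, which is the idea behind batching SetAllocations.
func (m *metaTable) SetAllocationsBatch(batch map[int64][]*Allocation) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for segmentID, allocs := range batch {
		m.allocations[segmentID] = allocs
	}
}

func main() {
	m := &metaTable{allocations: make(map[int64][]*Allocation)}
	m.SetAllocationsBatch(map[int64][]*Allocation{
		1: {{SegmentID: 1, NumOfRows: 100}},
		2: {{SegmentID: 2, NumOfRows: 200}},
	})
	fmt.Println(len(m.allocations)) // 2
}
```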