enhance: Reduce mutex contention in datacoord meta (#38219)
1. Use a secondary index to avoid retrieving all segments in `GetSegmentsChanPart` (see the first sketch below).
2. Batch `SetAllocations` calls to reduce the number of times the meta lock is acquired (see the second sketch below).
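
As a reading aid, here is a minimal, self-contained sketch of the secondary-index idea. The `SegmentsInfo` and `coll2Segments` names mirror the diff below, but the method names and bodies are illustrative assumptions, not the committed implementation.

package main

import "fmt"

type UniqueID = int64

type SegmentInfo struct {
	ID           UniqueID
	CollectionID UniqueID
}

// SegmentsInfo keeps the primary ID->segment map plus a per-collection
// secondary index, so collection-scoped reads touch only one collection.
type SegmentsInfo struct {
	segments      map[UniqueID]*SegmentInfo
	coll2Segments map[UniqueID]map[UniqueID]*SegmentInfo
}

// SetSegment maintains the primary map and the secondary index together.
func (s *SegmentsInfo) SetSegment(info *SegmentInfo) {
	s.segments[info.ID] = info
	byColl, ok := s.coll2Segments[info.CollectionID]
	if !ok {
		byColl = make(map[UniqueID]*SegmentInfo)
		s.coll2Segments[info.CollectionID] = byColl
	}
	byColl[info.ID] = info
}

// GetSegmentsOfCollection runs in O(segments of one collection) rather than
// O(all segments), shortening the time spent holding the meta lock.
func (s *SegmentsInfo) GetSegmentsOfCollection(collID UniqueID, keep func(*SegmentInfo) bool) []*SegmentInfo {
	out := make([]*SegmentInfo, 0, len(s.coll2Segments[collID]))
	for _, seg := range s.coll2Segments[collID] {
		if keep == nil || keep(seg) {
			out = append(out, seg)
		}
	}
	return out
}

func main() {
	s := &SegmentsInfo{
		segments:      make(map[UniqueID]*SegmentInfo),
		coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
	}
	s.SetSegment(&SegmentInfo{ID: 1, CollectionID: 2})
	s.SetSegment(&SegmentInfo{ID: 3, CollectionID: 1111})
	fmt.Println(len(s.GetSegmentsOfCollection(2, nil))) // prints 1
}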

issue: #37630
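
The batching half of the change lands in files not shown on this page. The following self-contained sketch shows the pattern under stated assumptions: the `SetSegmentsAllocations` name and the `Allocation` fields are hypothetical, and only the one-lock-acquisition-per-batch idea reflects the commit.

package main

import (
	"fmt"
	"sync"
)

type UniqueID = int64

// Allocation is a simplified stand-in for datacoord's allocation record.
type Allocation struct {
	SegmentID  UniqueID
	NumOfRows  int64
	ExpireTime uint64
}

type meta struct {
	mu          sync.RWMutex
	allocations map[UniqueID][]*Allocation
}

// SetAllocations updates one segment; a caller looping over N segments
// would lock and unlock the mutex N times.
func (m *meta) SetAllocations(segmentID UniqueID, allocs []*Allocation) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.allocations[segmentID] = allocs
}

// SetSegmentsAllocations applies the whole batch under a single lock
// acquisition, the contention-reducing pattern of this commit.
// (Method name and signature are hypothetical.)
func (m *meta) SetSegmentsAllocations(batch map[UniqueID][]*Allocation) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for segmentID, allocs := range batch {
		m.allocations[segmentID] = allocs
	}
}

func main() {
	m := &meta{allocations: make(map[UniqueID][]*Allocation)}
	m.SetAllocations(1, []*Allocation{{SegmentID: 1, NumOfRows: 100, ExpireTime: 42}})
	m.SetSegmentsAllocations(map[UniqueID][]*Allocation{
		2: {{SegmentID: 2, NumOfRows: 200, ExpireTime: 42}},
		3: {{SegmentID: 3, NumOfRows: 300, ExpireTime: 42}},
	})
	fmt.Println(len(m.allocations)) // prints 3
}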

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored Jan 14, 2025
1 parent 5bf1b2b commit 272d95a
Showing 8 changed files with 170 additions and 98 deletions.
7 changes: 3 additions & 4 deletions internal/datacoord/compaction_policy_clustering.go
@@ -120,15 +120,14 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Context,
 		return nil, 0, err
 	}
 
-	partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return segment.CollectionID == collectionID &&
-			isSegmentHealthy(segment) &&
+	partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
 			!segment.GetIsInvisible()
-	})
+	}))
 
 	views := make([]CompactionView, 0)
 	// partSegments is list of chanPartSegments, which is channel-partition organized segments
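For readers outside this codebase: `chanPartSegments` is the per-(collection, partition, channel) grouping returned by `GetSegmentsChanPart`. A sketch of its shape, with field names assumed from how the callers consume the groups:

// Sketch only: the grouping type implied by GetSegmentsChanPart. Field names
// are assumptions based on how the trigger code consumes the groups.
type chanPartSegments struct {
	collectionID UniqueID // int64 in the real package
	partitionID  UniqueID
	channelName  string
	segments     []*SegmentInfo
}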
7 changes: 3 additions & 4 deletions internal/datacoord/compaction_policy_single.go
@@ -87,15 +87,14 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
 		return nil, 0, err
 	}
 
-	partSegments := policy.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return segment.CollectionID == collectionID &&
-			isSegmentHealthy(segment) &&
+	partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
 			segment.GetLevel() == datapb.SegmentLevel_L2 && // only support L2 for now
 			!segment.GetIsInvisible()
-	})
+	}))
 
 	views := make([]CompactionView, 0)
 	for _, group := range partSegments {
9 changes: 9 additions & 0 deletions internal/datacoord/compaction_policy_single_test.go
@@ -128,6 +128,15 @@ func (s *SingleCompactionPolicySuite) TestL2SingleCompaction() {
 	segments[103] = buildTestSegment(101, collID, datapb.SegmentLevel_L2, 100, 10000, 1)
 	segmentsInfo := &SegmentsInfo{
 		segments: segments,
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+				collID: {
+					101: segments[101],
+					102: segments[102],
+					103: segments[103],
+				},
+			},
+		},
 	}
 
 	compactionTaskMeta := newTestCompactionTaskMeta(s.T())
16 changes: 13 additions & 3 deletions internal/datacoord/compaction_trigger.go
@@ -291,9 +291,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
 		zap.Int64("signal.collectionID", signal.collectionID),
 		zap.Int64("signal.partitionID", signal.partitionID),
 		zap.Int64("signal.segmentID", signal.segmentID))
-	partSegments := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
-		return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
-			isSegmentHealthy(segment) &&
+	filter := SegmentFilterFunc(func(segment *SegmentInfo) bool {
+		return isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
 			!segment.GetIsImporting() && // not importing now
@@ -302,6 +301,17 @@
 			!segment.GetIsInvisible()
 	}) // partSegments is list of chanPartSegments, which is channel-partition organized segments
 
+	partSegments := make([]*chanPartSegments, 0)
+	// get all segments if signal.collection == 0, otherwise get collection segments
+	if signal.collectionID != 0 {
+		partSegments = GetSegmentsChanPart(t.meta, signal.collectionID, filter)
+	} else {
+		collections := t.meta.GetCollections()
+		for _, collection := range collections {
+			partSegments = append(partSegments, GetSegmentsChanPart(t.meta, collection.ID, filter)...)
+		}
+	}
+
 	if len(partSegments) == 0 {
 		log.Info("the length of SegmentsChanPart is 0, skip to handle compaction")
 		return nil
211 changes: 134 additions & 77 deletions internal/datacoord/compaction_trigger_test.go
@@ -122,24 +122,34 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
 		},
 	}
 
+	segInfo := &datapb.SegmentInfo{
+		ID:             1,
+		CollectionID:   collectionID,
+		PartitionID:    1,
+		LastExpireTime: 100,
+		NumOfRows:      100,
+		MaxRowNum:      300,
+		InsertChannel:  "ch1",
+		State:          commonpb.SegmentState_Flushed,
+		Binlogs:        binlogs,
+		Deltalogs:      deltaLogs,
+		IsSorted:       true,
+	}
 	m := &meta{
 		catalog:    catalog,
 		channelCPs: newChannelCps(),
 		segments: &SegmentsInfo{
 			segments: map[int64]*SegmentInfo{
 				1: {
-					SegmentInfo: &datapb.SegmentInfo{
-						ID:             1,
-						CollectionID:   collectionID,
-						PartitionID:    1,
-						LastExpireTime: 100,
-						NumOfRows:      100,
-						MaxRowNum:      300,
-						InsertChannel:  "ch1",
-						State:          commonpb.SegmentState_Flushed,
-						Binlogs:        binlogs,
-						Deltalogs:      deltaLogs,
-						IsSorted:       true,
-					},
+					SegmentInfo: segInfo,
 				},
 			},
+			secondaryIndexes: segmentInfoIndexes{
+				coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+					collectionID: {
+						1: {
+							SegmentInfo: segInfo,
+						},
+					},
+				},
+			},
@@ -214,6 +224,76 @@ func Test_compactionTrigger_force(t *testing.T) {
 
 	mock0Allocator := newMock0Allocator(t)
 
+	seg1 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             1,
+			CollectionID:   2,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			Binlogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 1},
+					},
+				},
+			},
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 1},
+					},
+				},
+			},
+			IsSorted: true,
+		},
+	}
+
+	seg2 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             2,
+			CollectionID:   2,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			Binlogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 2},
+					},
+				},
+			},
+			Deltalogs: []*datapb.FieldBinlog{
+				{
+					Binlogs: []*datapb.Binlog{
+						{EntriesNum: 5, LogID: 2},
+					},
+				},
+			},
+			IsSorted: true,
+		},
+	}
+
+	seg3 := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:             3,
+			CollectionID:   1111,
+			PartitionID:    1,
+			LastExpireTime: 100,
+			NumOfRows:      100,
+			MaxRowNum:      300,
+			InsertChannel:  "ch1",
+			State:          commonpb.SegmentState_Flushed,
+			IsSorted:       true,
+		},
+	}
+
 	tests := []struct {
 		name   string
 		fields fields
@@ -230,71 +310,18 @@
 				channelCPs: newChannelCps(),
 				segments: &SegmentsInfo{
 					segments: map[int64]*SegmentInfo{
-						1: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             1,
-								CollectionID:   2,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								Binlogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 1},
-										},
-									},
-								},
-								Deltalogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 1},
-										},
-									},
-								},
-								IsSorted: true,
-							},
-						},
-						2: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             2,
-								CollectionID:   2,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								Binlogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 2},
-										},
-									},
-								},
-								Deltalogs: []*datapb.FieldBinlog{
-									{
-										Binlogs: []*datapb.Binlog{
-											{EntriesNum: 5, LogID: 2},
-										},
-									},
-								},
-								IsSorted: true,
-							},
-						},
-						3: {
-							SegmentInfo: &datapb.SegmentInfo{
-								ID:             3,
-								CollectionID:   1111,
-								PartitionID:    1,
-								LastExpireTime: 100,
-								NumOfRows:      100,
-								MaxRowNum:      300,
-								InsertChannel:  "ch1",
-								State:          commonpb.SegmentState_Flushed,
-								IsSorted:       true,
-							},
-						},
+						1: seg1,
+						2: seg2,
+						3: seg3,
 					},
+					secondaryIndexes: segmentInfoIndexes{
+						coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
+							2: {
+								seg1.GetID(): seg1,
+								seg2.GetID(): seg2,
+							},
+							1111: {
+								seg3.GetID(): seg3,
+							},
+						},
+					},
@@ -617,7 +644,13 @@ func Test_compactionTrigger_force(t *testing.T) {
 		t.Run(tt.name+" with DiskANN index", func(t *testing.T) {
 			for _, segment := range tt.fields.meta.segments.GetSegments() {
 				// Collection 1000 means it has DiskANN index
+				delete(tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()], segment.GetID())
 				segment.CollectionID = 1000
+				_, ok := tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()]
+				if !ok {
+					tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()] = make(map[UniqueID]*SegmentInfo)
+				}
+				tt.fields.meta.segments.secondaryIndexes.coll2Segments[segment.GetCollectionID()][segment.GetID()] = segment
 			}
 			tr := &compactionTrigger{
 				meta: tt.fields.meta,
@@ -725,6 +758,9 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 	vecFieldID := int64(201)
 	segmentInfos := &SegmentsInfo{
 		segments: make(map[UniqueID]*SegmentInfo),
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
+		},
 	}
 
 	indexMeta := newSegmentIndexMeta(nil)
@@ -751,6 +787,7 @@
 		},
 	}
 
+	segmentInfos.secondaryIndexes.coll2Segments[2] = make(map[UniqueID]*SegmentInfo)
 	nSegments := 50
 	for i := UniqueID(0); i < UniqueID(nSegments); i++ {
 		info := &SegmentInfo{
@@ -794,6 +831,7 @@
 		})
 
 		segmentInfos.segments[i] = info
+		segmentInfos.secondaryIndexes.coll2Segments[2][i] = info
 	}
 
 	mock0Allocator := newMockAllocator(t)
@@ -1110,15 +1148,28 @@ func mockSegment(segID, rows, deleteRows, sizeInMB int64) *datapb.SegmentInfo {
 
 func mockSegmentsInfo(sizeInMB ...int64) *SegmentsInfo {
 	segments := make(map[int64]*SegmentInfo, len(sizeInMB))
+	collectionID := int64(2)
+	channel := "ch1"
+	coll2Segments := make(map[UniqueID]map[UniqueID]*SegmentInfo)
+	coll2Segments[collectionID] = make(map[UniqueID]*SegmentInfo)
+	channel2Segments := make(map[string]map[UniqueID]*SegmentInfo)
+	channel2Segments[channel] = make(map[UniqueID]*SegmentInfo)
 	for i, size := range sizeInMB {
 		segId := int64(i + 1)
-		segments[segId] = &SegmentInfo{
+		info := &SegmentInfo{
 			SegmentInfo:   mockSegment(segId, size, 1, size),
 			lastFlushTime: time.Now().Add(-100 * time.Minute),
 		}
+		segments[segId] = info
+		coll2Segments[collectionID][segId] = info
+		channel2Segments[channel][segId] = info
 	}
 	return &SegmentsInfo{
 		segments: segments,
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments:    coll2Segments,
+			channel2Segments: channel2Segments,
+		},
 	}
 }
 
@@ -1564,6 +1615,10 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
 
 	segmentInfos := &SegmentsInfo{
 		segments: make(map[UniqueID]*SegmentInfo),
+		secondaryIndexes: segmentInfoIndexes{
+			coll2Segments:    map[UniqueID]map[UniqueID]*SegmentInfo{2: {}},
+			channel2Segments: map[string]map[UniqueID]*SegmentInfo{"ch1": {}},
+		},
 	}
 
 	size := []int64{
@@ -1636,6 +1691,8 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
 		})
 
 		segmentInfos.segments[i] = info
+		segmentInfos.secondaryIndexes.coll2Segments[2][i] = info
+		segmentInfos.secondaryIndexes.channel2Segments["ch1"][i] = info
 	}
 
 	mock0Allocator := newMockAllocator(t)