fix: Fix slow dist handle and slow observe (#38566)
1. Provide partition- and channel-level indexing in the collection target (a simplified sketch follows this list).
2. Make `SegmentAction` not wait for distribution.
3. Remove the scheduler and target manager mutexes.
4. Optimize logging to reduce CPU overhead.
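
A minimal sketch of the indexing idea in point 1, using simplified stand-in types (segment, collectionTarget, and newCollectionTarget here are hypothetical, not the Milvus types): the per-channel and per-partition slices are built once, when the immutable target is constructed, so later lookups index a map instead of scanning every segment.

package main

import "fmt"

// segment is a simplified stand-in for datapb.SegmentInfo.
type segment struct {
	ID          int64
	PartitionID int64
	Channel     string
}

// collectionTarget carries the same two indexes the PR adds.
type collectionTarget struct {
	segments           map[int64]*segment
	channel2Segments   map[string][]*segment
	partition2Segments map[int64][]*segment
}

func newCollectionTarget(segments map[int64]*segment) *collectionTarget {
	t := &collectionTarget{
		segments:           segments,
		channel2Segments:   make(map[string][]*segment),
		partition2Segments: make(map[int64][]*segment),
	}
	// One pass at construction time; the target is immutable afterwards,
	// so the indexes never need invalidation.
	for _, s := range segments {
		t.channel2Segments[s.Channel] = append(t.channel2Segments[s.Channel], s)
		t.partition2Segments[s.PartitionID] = append(t.partition2Segments[s.PartitionID], s)
	}
	return t
}

func main() {
	t := newCollectionTarget(map[int64]*segment{
		1: {ID: 1, PartitionID: 10, Channel: "dml_0"},
		2: {ID: 2, PartitionID: 10, Channel: "dml_1"},
		3: {ID: 3, PartitionID: 11, Channel: "dml_0"},
	})
	// Map lookup per call instead of filtering all segments per call.
	fmt.Println(len(t.channel2Segments["dml_0"]), len(t.partition2Segments[10]))
}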

issue: #37630

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored Jan 15, 2025
1 parent 38881bf commit 657550c
Showing 11 changed files with 324 additions and 394 deletions.
internal/querycoordv2/dist/dist_handler.go (10 changes: 9 additions & 1 deletion)
@@ -24,6 +24,7 @@ import (
 
 	"github.com/samber/lo"
 	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -37,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -91,7 +93,9 @@ func (dh *distHandler) start(ctx context.Context) {
 }
 
 func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) {
+	tr := timerecord.NewTimeRecorder("")
 	resp, err := dh.getDistribution(ctx)
+	d1 := tr.RecordSpan()
 	if err != nil {
 		node := dh.nodeManager.Get(dh.nodeID)
 		*failures = *failures + 1
@@ -100,11 +104,15 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) {
 			fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
 		}
 		fields = append(fields, zap.Error(err))
-		log.RatedWarn(30.0, "failed to get data distribution", fields...)
+		log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 60).
+			RatedWarn(30.0, "failed to get data distribution", fields...)
 	} else {
 		*failures = 0
 		dh.handleDistResp(ctx, resp, dispatchTask)
 	}
+	log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120).
+		RatedInfo(120.0, "pull and handle distribution done",
+			zap.Int("respSize", proto.Size(resp)), zap.Duration("pullDur", d1), zap.Duration("handleDur", tr.RecordSpan()))
 }
 
 func (dh *distHandler) handleDistResp(ctx context.Context, resp *querypb.GetDataDistributionResponse, dispatchTask bool) {
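For context, a self-contained sketch of the measure-then-rate-limit pattern this hunk introduces, built on the standard library only (rateGroup, pullDist, pull, and handle are hypothetical stand-ins for Milvus's timerecord and log.WithRateGroup utilities shown above): each phase is timed separately, and the summary line is only emitted when the rate group admits it.

package main

import (
	"fmt"
	"sync"
	"time"
)

// rateGroup is a hypothetical stand-in for log.WithRateGroup/RatedInfo:
// it admits at most one message per interval.
type rateGroup struct {
	mu   sync.Mutex
	next time.Time
}

func (r *rateGroup) allow(interval time.Duration) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	now := time.Now()
	if now.Before(r.next) {
		return false
	}
	r.next = now.Add(interval)
	return true
}

var pullDistGroup rateGroup

func pullDist() {
	start := time.Now()
	pull()                       // stands in for dh.getDistribution(ctx)
	pullDur := time.Since(start) // first span, like d1 := tr.RecordSpan()
	mid := time.Now()
	handle()                      // stands in for dh.handleDistResp(...)
	handleDur := time.Since(mid)  // second span, like tr.RecordSpan()

	// Formatting cost is only paid when the group admits the message.
	if pullDistGroup.allow(120 * time.Second) {
		fmt.Printf("pull and handle distribution done pullDur=%v handleDur=%v\n", pullDur, handleDur)
	}
}

func pull()   { time.Sleep(5 * time.Millisecond) }
func handle() { time.Sleep(2 * time.Millisecond) }

func main() { pullDist() }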
internal/querycoordv2/meta/target.go (103 changes: 74 additions & 29 deletions)
@@ -26,50 +26,80 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/proto/querypb"
+	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // CollectionTarget collection target is immutable,
 type CollectionTarget struct {
-	segments   map[int64]*datapb.SegmentInfo
-	dmChannels map[string]*DmChannel
-	partitions typeutil.Set[int64] // stores target partitions info
-	version    int64
+	segments           map[int64]*datapb.SegmentInfo
+	channel2Segments   map[string][]*datapb.SegmentInfo
+	partition2Segments map[int64][]*datapb.SegmentInfo
+	dmChannels         map[string]*DmChannel
+	partitions         typeutil.Set[int64] // stores target partitions info
+	version            int64
 
 	// record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info.
 	lackSegmentInfo bool
 }
 
 func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
+	channel2Segments := make(map[string][]*datapb.SegmentInfo, len(dmChannels))
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs))
+	for _, segment := range segments {
+		channel := segment.GetInsertChannel()
+		if _, ok := channel2Segments[channel]; !ok {
+			channel2Segments[channel] = make([]*datapb.SegmentInfo, 0)
+		}
+		channel2Segments[channel] = append(channel2Segments[channel], segment)
+		partitionID := segment.GetPartitionID()
+		if _, ok := partition2Segments[partitionID]; !ok {
+			partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0)
+		}
+		partition2Segments[partitionID] = append(partition2Segments[partitionID], segment)
+	}
 	return &CollectionTarget{
-		segments:   segments,
-		dmChannels: dmChannels,
-		partitions: typeutil.NewSet(partitionIDs...),
-		version:    time.Now().UnixNano(),
+		segments:           segments,
+		channel2Segments:   channel2Segments,
+		partition2Segments: partition2Segments,
+		dmChannels:         dmChannels,
+		partitions:         typeutil.NewSet(partitionIDs...),
+		version:            time.Now().UnixNano(),
 	}
 }
 
 func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget {
 	segments := make(map[int64]*datapb.SegmentInfo)
 	dmChannels := make(map[string]*DmChannel)
+	channel2Segments := make(map[string][]*datapb.SegmentInfo)
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo)
 	var partitions []int64
 
 	lackSegmentInfo := false
 	for _, t := range target.GetChannelTargets() {
+		if _, ok := channel2Segments[t.GetChannelName()]; !ok {
+			channel2Segments[t.GetChannelName()] = make([]*datapb.SegmentInfo, 0)
+		}
 		for _, partition := range t.GetPartitionTargets() {
+			if _, ok := partition2Segments[partition.GetPartitionID()]; !ok {
+				partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments()))
+			}
 			for _, segment := range partition.GetSegments() {
 				if segment.GetNumOfRows() <= 0 {
 					lackSegmentInfo = true
 				}
-				segments[segment.GetID()] = &datapb.SegmentInfo{
+				info := &datapb.SegmentInfo{
 					ID:            segment.GetID(),
 					Level:         segment.GetLevel(),
 					CollectionID:  target.GetCollectionID(),
 					PartitionID:   partition.GetPartitionID(),
 					InsertChannel: t.GetChannelName(),
 					NumOfRows:     segment.GetNumOfRows(),
 				}
+				segments[segment.GetID()] = info
+				channel2Segments[t.GetChannelName()] = append(channel2Segments[t.GetChannelName()], info)
+				partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info)
 			}
 			partitions = append(partitions, partition.GetPartitionID())
 		}
@@ -90,11 +120,13 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
 	}
 
 	return &CollectionTarget{
-		segments:        segments,
-		dmChannels:      dmChannels,
-		partitions:      typeutil.NewSet(partitions...),
-		version:         target.GetVersion(),
-		lackSegmentInfo: lackSegmentInfo,
+		segments:           segments,
+		channel2Segments:   channel2Segments,
+		partition2Segments: partition2Segments,
+		dmChannels:         dmChannels,
+		partitions:         typeutil.NewSet(partitions...),
+		version:            target.GetVersion(),
+		lackSegmentInfo:    lackSegmentInfo,
 	}
 }

@@ -155,6 +187,14 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo {
 	return p.segments
 }
 
+func (p *CollectionTarget) GetChannelSegments(channel string) []*datapb.SegmentInfo {
+	return p.channel2Segments[channel]
+}
+
+func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo {
+	return p.partition2Segments[partitionID]
+}
+
 func (p *CollectionTarget) GetTargetVersion() int64 {
 	return p.version
 }
@@ -181,39 +221,43 @@ func (p *CollectionTarget) Ready() bool {
 }
 
 type target struct {
+	keyLock *lock.KeyLock[int64] // guards updateCollectionTarget
 	// just maintain target at collection level
-	collectionTargetMap map[int64]*CollectionTarget
+	collectionTargetMap *typeutil.ConcurrentMap[int64, *CollectionTarget]
 }
 
 func newTarget() *target {
 	return &target{
-		collectionTargetMap: make(map[int64]*CollectionTarget),
+		keyLock:             lock.NewKeyLock[int64](),
+		collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
 	}
 }
 
 func (t *target) updateCollectionTarget(collectionID int64, target *CollectionTarget) {
-	if t.collectionTargetMap[collectionID] != nil && target.GetTargetVersion() <= t.collectionTargetMap[collectionID].GetTargetVersion() {
+	t.keyLock.Lock(collectionID)
+	defer t.keyLock.Unlock(collectionID)
+	if old, ok := t.collectionTargetMap.Get(collectionID); ok && old != nil && target.GetTargetVersion() <= old.GetTargetVersion() {
 		return
 	}
 
-	t.collectionTargetMap[collectionID] = target
+	t.collectionTargetMap.Insert(collectionID, target)
 }
 
 func (t *target) removeCollectionTarget(collectionID int64) {
-	delete(t.collectionTargetMap, collectionID)
+	t.collectionTargetMap.Remove(collectionID)
 }
 
 func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
-	return t.collectionTargetMap[collectionID]
+	ret, _ := t.collectionTargetMap.Get(collectionID)
+	return ret
 }
 
 func (t *target) toQueryCoordCollectionTargets(collectionID int64) []*metricsinfo.QueryCoordTarget {
-	var ret []*metricsinfo.QueryCoordTarget
-	for k, v := range t.collectionTargetMap {
+	targets := make([]*metricsinfo.QueryCoordTarget, 0, t.collectionTargetMap.Len())
+	t.collectionTargetMap.Range(func(k int64, v *CollectionTarget) bool {
 		if collectionID > 0 && collectionID != k {
-			continue
+			return true
 		}
 
 		segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment {
 			return metrics.NewSegmentFrom(s)
 		})
@@ -222,12 +266,13 @@ func (t *target) toQueryCoordCollectionTargets(collectionID int64) []*metricsinfo.QueryCoordTarget {
 			return metrics.NewDMChannelFrom(ch.VchannelInfo)
 		})
 
-		ret = append(ret, &metricsinfo.QueryCoordTarget{
+		qct := &metricsinfo.QueryCoordTarget{
 			CollectionID: k,
 			Segments:     segments,
 			DMChannels:   dmChannels,
-		})
-	}
-
-	return ret
-}
+		}
+		targets = append(targets, qct)
+		return true
+	})
+	return targets
+}
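
The concurrency change above can be summarized with a standalone sketch (sync.Map plus a simplified, hypothetical per-key lock, standing in for Milvus's typeutil.ConcurrentMap and lock.KeyLock): readers take no global lock at all, and writers serialize only per collection, so the "newer version wins" check-and-insert stays atomic without a manager-wide mutex.

package main

import (
	"fmt"
	"sync"
)

type collectionTarget struct{ version int64 }

// keyLock is a hypothetical, simplified per-key lock.
type keyLock struct {
	mu    sync.Mutex
	locks map[int64]*sync.Mutex
}

func (k *keyLock) get(id int64) *sync.Mutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	if k.locks == nil {
		k.locks = make(map[int64]*sync.Mutex)
	}
	if _, ok := k.locks[id]; !ok {
		k.locks[id] = &sync.Mutex{}
	}
	return k.locks[id]
}

type target struct {
	keyLock keyLock
	targets sync.Map // collectionID -> *collectionTarget; lock-free reads
}

// update serializes writers per collection so the version
// check-and-insert is atomic for that collection only.
func (t *target) update(collectionID int64, ct *collectionTarget) {
	mu := t.keyLock.get(collectionID)
	mu.Lock()
	defer mu.Unlock()
	if old, ok := t.targets.Load(collectionID); ok && ct.version <= old.(*collectionTarget).version {
		return
	}
	t.targets.Store(collectionID, ct)
}

// get never blocks behind writers of other collections.
func (t *target) get(collectionID int64) *collectionTarget {
	if v, ok := t.targets.Load(collectionID); ok {
		return v.(*collectionTarget)
	}
	return nil
}

func main() {
	var t target
	t.update(1, &collectionTarget{version: 2})
	t.update(1, &collectionTarget{version: 1}) // stale, ignored
	fmt.Println(t.get(1).version)              // prints 2
}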