Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix slow dist handle and slow observe #38566

Merged
merged 24 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d10908a
fix: Fix slow dist handle and slow observe
bigsheeper Dec 18, 2024
f4b2487
print observe and dist handler time
bigsheeper Dec 19, 2024
86abc77
fix conflicts
bigsheeper Dec 26, 2024
609039d
opt release
bigsheeper Dec 26, 2024
f0e0aae
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Dec 27, 2024
8802f4e
enhance: Add channel index in target, optimize logs
bigsheeper Dec 27, 2024
2ca3464
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Dec 30, 2024
4bbd59e
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Dec 30, 2024
dc67e81
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Dec 31, 2024
d59d1e7
fix
bigsheeper Dec 31, 2024
7580181
fix ut
bigsheeper Jan 1, 2025
52e51a3
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Jan 1, 2025
9e4c859
code format
bigsheeper Jan 1, 2025
19c5d33
enhance: Reducing the granularity of locks in the target manager
bigsheeper Jan 2, 2025
2ce9dd0
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Jan 2, 2025
c99949b
enhance: Remove scheduler and target manager mutex
bigsheeper Jan 3, 2025
bd3f5cd
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Jan 3, 2025
c71657b
fix
bigsheeper Jan 3, 2025
66e7e05
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Jan 3, 2025
beedd9e
fix conflicts
bigsheeper Jan 8, 2025
086bbf4
fix conflicts
bigsheeper Jan 14, 2025
9d4ee7f
update
bigsheeper Jan 14, 2025
16cac14
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Jan 15, 2025
e76bcab
fix data race
bigsheeper Jan 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion internal/querycoordv2/dist/dist_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/samber/lo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -91,7 +93,9 @@ func (dh *distHandler) start(ctx context.Context) {
}

func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) {
tr := timerecord.NewTimeRecorder("")
resp, err := dh.getDistribution(ctx)
d1 := tr.RecordSpan()
if err != nil {
node := dh.nodeManager.Get(dh.nodeID)
*failures = *failures + 1
Expand All @@ -100,11 +104,15 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask
fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
}
fields = append(fields, zap.Error(err))
log.RatedWarn(30.0, "failed to get data distribution", fields...)
log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 60).
RatedWarn(30.0, "failed to get data distribution", fields...)
} else {
*failures = 0
dh.handleDistResp(ctx, resp, dispatchTask)
}
log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120).
RatedInfo(120.0, "pull and handle distribution done",
zap.Int("respSize", proto.Size(resp)), zap.Duration("pullDur", d1), zap.Duration("handleDur", tr.RecordSpan()))
}

func (dh *distHandler) handleDistResp(ctx context.Context, resp *querypb.GetDataDistributionResponse, dispatchTask bool) {
Expand Down
93 changes: 71 additions & 22 deletions internal/querycoordv2/meta/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,50 +26,80 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// CollectionTarget collection target is immutable,
type CollectionTarget struct {
segments map[int64]*datapb.SegmentInfo
dmChannels map[string]*DmChannel
partitions typeutil.Set[int64] // stores target partitions info
version int64
segments map[int64]*datapb.SegmentInfo
channel2Segments map[string][]*datapb.SegmentInfo
partition2Segments map[int64][]*datapb.SegmentInfo
dmChannels map[string]*DmChannel
partitions typeutil.Set[int64] // stores target partitions info
version int64

// record target status; if the target was saved before milvus v2.4.19, it will lack segment info.
lackSegmentInfo bool
}

func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
channel2Segments := make(map[string][]*datapb.SegmentInfo, len(dmChannels))
partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs))
for _, segment := range segments {
channel := segment.GetInsertChannel()
if _, ok := channel2Segments[channel]; !ok {
channel2Segments[channel] = make([]*datapb.SegmentInfo, 0)
}
channel2Segments[channel] = append(channel2Segments[channel], segment)
partitionID := segment.GetPartitionID()
if _, ok := partition2Segments[partitionID]; !ok {
partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0)
}
partition2Segments[partitionID] = append(partition2Segments[partitionID], segment)
}
return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitionIDs...),
version: time.Now().UnixNano(),
segments: segments,
channel2Segments: channel2Segments,
partition2Segments: partition2Segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitionIDs...),
version: time.Now().UnixNano(),
}
}

func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget {
segments := make(map[int64]*datapb.SegmentInfo)
dmChannels := make(map[string]*DmChannel)
channel2Segments := make(map[string][]*datapb.SegmentInfo)
partition2Segments := make(map[int64][]*datapb.SegmentInfo)
var partitions []int64

lackSegmentInfo := false
for _, t := range target.GetChannelTargets() {
if _, ok := channel2Segments[t.GetChannelName()]; !ok {
channel2Segments[t.GetChannelName()] = make([]*datapb.SegmentInfo, 0)
}
for _, partition := range t.GetPartitionTargets() {
if _, ok := partition2Segments[partition.GetPartitionID()]; !ok {
partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments()))
}
for _, segment := range partition.GetSegments() {
if segment.GetNumOfRows() <= 0 {
lackSegmentInfo = true
}
segments[segment.GetID()] = &datapb.SegmentInfo{
info := &datapb.SegmentInfo{
ID: segment.GetID(),
Level: segment.GetLevel(),
CollectionID: target.GetCollectionID(),
PartitionID: partition.GetPartitionID(),
InsertChannel: t.GetChannelName(),
NumOfRows: segment.GetNumOfRows(),
}
segments[segment.GetID()] = info
channel2Segments[t.GetChannelName()] = append(channel2Segments[t.GetChannelName()], info)
partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info)
bigsheeper marked this conversation as resolved.
Show resolved Hide resolved
}
partitions = append(partitions, partition.GetPartitionID())
}
Expand All @@ -90,11 +120,13 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
}

return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
lackSegmentInfo: lackSegmentInfo,
segments: segments,
channel2Segments: channel2Segments,
partition2Segments: partition2Segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
lackSegmentInfo: lackSegmentInfo,
}
}

Expand Down Expand Up @@ -155,6 +187,14 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo {
return p.segments
}

// GetChannelSegments returns the segments whose insert channel is the given
// DM channel, as precomputed in channel2Segments. Returns nil for an unknown
// channel.
func (p *CollectionTarget) GetChannelSegments(channel string) []*datapb.SegmentInfo {
	segments := p.channel2Segments[channel]
	return segments
}

// GetPartitionSegments returns the segments belonging to the given partition,
// as precomputed in partition2Segments. Returns nil for an unknown partition.
func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo {
	segments := p.partition2Segments[partitionID]
	return segments
}

// GetTargetVersion returns the version of this collection target.
func (p *CollectionTarget) GetTargetVersion() int64 {
	version := p.version
	return version
}
Expand All @@ -181,34 +221,40 @@ func (p *CollectionTarget) Ready() bool {
}

type target struct {
keyLock *lock.KeyLock[int64] // guards updateCollectionTarget
// just maintain target at collection level
collectionTargetMap map[int64]*CollectionTarget
collectionTargetMap *typeutil.ConcurrentMap[int64, *CollectionTarget]
}

func newTarget() *target {
return &target{
collectionTargetMap: make(map[int64]*CollectionTarget),
keyLock: lock.NewKeyLock[int64](),
collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
}
}

func (t *target) updateCollectionTarget(collectionID int64, target *CollectionTarget) {
if t.collectionTargetMap[collectionID] != nil && target.GetTargetVersion() <= t.collectionTargetMap[collectionID].GetTargetVersion() {
t.keyLock.Lock(collectionID)
defer t.keyLock.Unlock(collectionID)
if old, ok := t.collectionTargetMap.Get(collectionID); ok && old != nil && target.GetTargetVersion() <= old.GetTargetVersion() {
return
}

t.collectionTargetMap[collectionID] = target
t.collectionTargetMap.Insert(collectionID, target)
}

func (t *target) removeCollectionTarget(collectionID int64) {
delete(t.collectionTargetMap, collectionID)
t.collectionTargetMap.Remove(collectionID)
}

func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
return t.collectionTargetMap[collectionID]
ret, _ := t.collectionTargetMap.Get(collectionID)
return ret
}

func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget {
return lo.MapToSlice(t.collectionTargetMap, func(k int64, v *CollectionTarget) *metricsinfo.QueryCoordTarget {
targets := make([]*metricsinfo.QueryCoordTarget, 0, t.collectionTargetMap.Len())
t.collectionTargetMap.Range(func(k int64, v *CollectionTarget) bool {
segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment {
return metrics.NewSegmentFrom(s)
})
Expand All @@ -217,10 +263,13 @@ func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget
return metrics.NewDMChannelFrom(ch.VchannelInfo)
})

return &metricsinfo.QueryCoordTarget{
qct := &metricsinfo.QueryCoordTarget{
CollectionID: k,
Segments: segments,
DMChannels: dmChannels,
}
targets = append(targets, qct)
return true
})
return targets
}
Loading
Loading