diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index 4eec668eda1ec..100520aff4833 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -24,6 +24,7 @@ import (
     "github.com/samber/lo"
     "go.uber.org/zap"
+    "google.golang.org/protobuf/proto"

     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -37,6 +38,7 @@ import (
     "github.com/milvus-io/milvus/pkg/util/commonpbutil"
     "github.com/milvus-io/milvus/pkg/util/merr"
     "github.com/milvus-io/milvus/pkg/util/paramtable"
+    "github.com/milvus-io/milvus/pkg/util/timerecord"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )

@@ -91,7 +93,9 @@ func (dh *distHandler) start(ctx context.Context) {
 }

 func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) {
+    tr := timerecord.NewTimeRecorder("")
     resp, err := dh.getDistribution(ctx)
+    d1 := tr.RecordSpan()
     if err != nil {
         node := dh.nodeManager.Get(dh.nodeID)
         *failures = *failures + 1
@@ -100,11 +104,15 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask
             fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
         }
         fields = append(fields, zap.Error(err))
-        log.RatedWarn(30.0, "failed to get data distribution", fields...)
+        log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 60).
+            RatedWarn(30.0, "failed to get data distribution", fields...)
     } else {
         *failures = 0
         dh.handleDistResp(resp, dispatchTask)
     }
+    log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120).
+        RatedInfo(120.0, "pull and handle distribution done",
+            zap.Int("respSize", proto.Size(resp)), zap.Duration("pullDur", d1), zap.Duration("handleDur", tr.RecordSpan()))
 }

 func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse, dispatchTask bool) {
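Note: the pullDist change above splits one loop iteration into two measured spans (the RPC, then the response handling) by calling RecordSpan twice on the same recorder. A minimal, self-contained sketch of that span-timing pattern follows; spanRecorder is a hypothetical stand-in for pkg/util/timerecord.TimeRecorder, not its actual implementation.

package main

import (
    "fmt"
    "time"
)

// spanRecorder mimics the TimeRecorder usage in pullDist: RecordSpan returns
// the time elapsed since the previous checkpoint and resets the checkpoint,
// so consecutive calls yield disjoint durations.
type spanRecorder struct{ last time.Time }

func newSpanRecorder() *spanRecorder { return &spanRecorder{last: time.Now()} }

func (r *spanRecorder) RecordSpan() time.Duration {
    now := time.Now()
    d := now.Sub(r.last)
    r.last = now
    return d
}

func main() {
    tr := newSpanRecorder()
    time.Sleep(10 * time.Millisecond) // stands in for the getDistribution RPC
    pullDur := tr.RecordSpan()
    time.Sleep(5 * time.Millisecond) // stands in for handleDistResp
    handleDur := tr.RecordSpan()
    fmt.Printf("pullDur=%v handleDur=%v\n", pullDur, handleDur)
}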
diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go
index 2aab218711b82..54e7bebbe8887 100644
--- a/internal/querycoordv2/meta/target.go
+++ b/internal/querycoordv2/meta/target.go
@@ -25,42 +25,69 @@ import (
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/proto/datapb"
     "github.com/milvus-io/milvus/pkg/proto/querypb"
+    "github.com/milvus-io/milvus/pkg/util/lock"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )

 // CollectionTarget collection target is immutable,
 type CollectionTarget struct {
-    segments   map[int64]*datapb.SegmentInfo
-    dmChannels map[string]*DmChannel
-    partitions typeutil.Set[int64] // stores target partitions info
-    version    int64
+    segments           map[int64]*datapb.SegmentInfo
+    channel2Segments   map[string][]*datapb.SegmentInfo
+    partition2Segments map[int64][]*datapb.SegmentInfo
+    dmChannels         map[string]*DmChannel
+    partitions         typeutil.Set[int64] // stores target partitions info
+    version            int64

     // record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info.
     lackSegmentInfo bool
 }

 func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
+    channel2Segments := make(map[string][]*datapb.SegmentInfo, len(dmChannels))
+    partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs))
+    for _, segment := range segments {
+        channel := segment.GetInsertChannel()
+        if _, ok := channel2Segments[channel]; !ok {
+            channel2Segments[channel] = make([]*datapb.SegmentInfo, 0)
+        }
+        channel2Segments[channel] = append(channel2Segments[channel], segment)
+        partitionID := segment.GetPartitionID()
+        if _, ok := partition2Segments[partitionID]; !ok {
+            partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0)
+        }
+        partition2Segments[partitionID] = append(partition2Segments[partitionID], segment)
+    }
     return &CollectionTarget{
-        segments:   segments,
-        dmChannels: dmChannels,
-        partitions: typeutil.NewSet(partitionIDs...),
-        version:    time.Now().UnixNano(),
+        segments:           segments,
+        channel2Segments:   channel2Segments,
+        partition2Segments: partition2Segments,
+        dmChannels:         dmChannels,
+        partitions:         typeutil.NewSet(partitionIDs...),
+        version:            time.Now().UnixNano(),
     }
 }

 func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget {
     segments := make(map[int64]*datapb.SegmentInfo)
     dmChannels := make(map[string]*DmChannel)
+    channel2Segments := make(map[string][]*datapb.SegmentInfo)
+    partition2Segments := make(map[int64][]*datapb.SegmentInfo)
     var partitions []int64

     lackSegmentInfo := false
     for _, t := range target.GetChannelTargets() {
+        if _, ok := channel2Segments[t.GetChannelName()]; !ok {
+            channel2Segments[t.GetChannelName()] = make([]*datapb.SegmentInfo, 0)
+        }
         for _, partition := range t.GetPartitionTargets() {
+            if _, ok := partition2Segments[partition.GetPartitionID()]; !ok {
+                partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments()))
+            }
             for _, segment := range partition.GetSegments() {
                 if segment.GetNumOfRows() <= 0 {
                     lackSegmentInfo = true
                 }
-                segments[segment.GetID()] = &datapb.SegmentInfo{
+                info := &datapb.SegmentInfo{
                     ID:            segment.GetID(),
                     Level:         segment.GetLevel(),
                     CollectionID:  target.GetCollectionID(),
@@ -68,6 +95,9 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
                     InsertChannel: t.GetChannelName(),
                     NumOfRows:     segment.GetNumOfRows(),
                 }
+                segments[segment.GetID()] = info
+                channel2Segments[t.GetChannelName()] = append(channel2Segments[t.GetChannelName()], info)
+                partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info)
             }
             partitions = append(partitions, partition.GetPartitionID())
         }
@@ -88,11 +118,13 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
     }

     return &CollectionTarget{
-        segments:        segments,
-        dmChannels:      dmChannels,
-        partitions:      typeutil.NewSet(partitions...),
-        version:         target.GetVersion(),
-        lackSegmentInfo: lackSegmentInfo,
+        segments:           segments,
+        channel2Segments:   channel2Segments,
+        partition2Segments: partition2Segments,
+        dmChannels:         dmChannels,
+        partitions:         typeutil.NewSet(partitions...),
+        version:            target.GetVersion(),
+        lackSegmentInfo:    lackSegmentInfo,
     }
 }

@@ -153,6 +185,14 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo {
     return p.segments
 }

+func (p *CollectionTarget) GetChannelSegments(channel string) []*datapb.SegmentInfo {
+    return p.channel2Segments[channel]
+}
+
+func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo {
+    return p.partition2Segments[partitionID]
+}
+
 func (p *CollectionTarget) GetTargetVersion() int64 {
     return p.version
 }
@@ -179,28 +219,33 @@ func (p *CollectionTarget) Ready() bool {
 }

 type target struct {
+    keyLock *lock.KeyLock[int64] // guards updateCollectionTarget
     // just maintain target at collection level
-    collectionTargetMap map[int64]*CollectionTarget
+    collectionTargetMap *typeutil.ConcurrentMap[int64, *CollectionTarget]
 }

 func newTarget() *target {
     return &target{
-        collectionTargetMap: make(map[int64]*CollectionTarget),
+        keyLock:             lock.NewKeyLock[int64](),
+        collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
     }
 }

 func (t *target) updateCollectionTarget(collectionID int64, target *CollectionTarget) {
-    if t.collectionTargetMap[collectionID] != nil && target.GetTargetVersion() <= t.collectionTargetMap[collectionID].GetTargetVersion() {
+    t.keyLock.Lock(collectionID)
+    defer t.keyLock.Unlock(collectionID)
+    if old, ok := t.collectionTargetMap.Get(collectionID); ok && old != nil && target.GetTargetVersion() <= old.GetTargetVersion() {
         return
     }

-    t.collectionTargetMap[collectionID] = target
+    t.collectionTargetMap.Insert(collectionID, target)
 }

 func (t *target) removeCollectionTarget(collectionID int64) {
-    delete(t.collectionTargetMap, collectionID)
+    t.collectionTargetMap.Remove(collectionID)
 }

 func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
-    return t.collectionTargetMap[collectionID]
+    ret, _ := t.collectionTargetMap.Get(collectionID)
+    return ret
 }
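Note: channel2Segments and partition2Segments are inverted indexes built once when the immutable CollectionTarget is constructed, so per-channel and per-partition reads become a single map lookup instead of a scan over every segment. A minimal sketch of the idea, using a simplified segment type rather than datapb.SegmentInfo:

package main

import "fmt"

// segment is a simplified stand-in for datapb.SegmentInfo.
type segment struct {
    ID          int64
    PartitionID int64
    Channel     string
}

// buildIndexes groups segments by channel and by partition once, so later
// per-channel/per-partition lookups are O(1) map hits instead of full scans.
func buildIndexes(segments map[int64]*segment) (map[string][]*segment, map[int64][]*segment) {
    byChannel := make(map[string][]*segment)
    byPartition := make(map[int64][]*segment)
    for _, s := range segments {
        byChannel[s.Channel] = append(byChannel[s.Channel], s)
        byPartition[s.PartitionID] = append(byPartition[s.PartitionID], s)
    }
    return byChannel, byPartition
}

func main() {
    segs := map[int64]*segment{
        1: {ID: 1, PartitionID: 10, Channel: "dml_0"},
        2: {ID: 2, PartitionID: 10, Channel: "dml_1"},
        3: {ID: 3, PartitionID: 11, Channel: "dml_0"},
    }
    byChannel, byPartition := buildIndexes(segs)
    fmt.Println(len(byChannel["dml_0"]), len(byPartition[10])) // 2 2
}

Because the target is immutable after construction, these derived maps never need locking of their own.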
diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go
index 89b8710e1efc4..8759285439401 100644
--- a/internal/querycoordv2/meta/target_manager.go
+++ b/internal/querycoordv2/meta/target_manager.go
@@ -72,9 +72,8 @@ type TargetManagerInterface interface {
 }

 type TargetManager struct {
-    rwMutex sync.RWMutex
-    broker  Broker
-    meta    *Meta
+    broker Broker
+    meta   *Meta

     // all read segment/channel operation happens on current -> only current target are visible to outer
     // all add segment/channel operation happens on next -> changes can only happen on next target
@@ -96,8 +95,6 @@ func NewTargetManager(broker Broker, meta *Meta) *TargetManager {
 // WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update,
 // which may make the current target not available
 func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
     log := log.With(zap.Int64("collectionID", collectionID))

     log.Debug("start to update current target for collection")
@@ -152,8 +149,6 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
         return err
     }

-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
     partitions := mgr.meta.GetPartitionsByCollection(collectionID)
     partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 {
         return partition.PartitionID
@@ -183,7 +178,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
     }

     for _, infos := range channelInfos {
-        merged := mgr.mergeDmChannelInfo(infos)
+        merged := mergeDmChannelInfo(infos)
         dmChannels[merged.GetChannelName()] = merged
     }

@@ -193,7 +188,9 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
     }

     allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs)
+
     mgr.next.updateCollectionTarget(collectionID, allocatedTarget)
+
     log.Debug("finish to update next targets for collection",
         zap.Int64("collectionID", collectionID),
         zap.Int64s("PartitionIDs", partitionIDs))
@@ -201,7 +198,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
     return nil
 }

-func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel {
+func mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel {
     var dmChannel *DmChannel

     for _, info := range infos {
@@ -223,8 +220,6 @@ func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmCh

 // RemoveCollection removes all channels and segments in the given collection
 func (mgr *TargetManager) RemoveCollection(collectionID int64) {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
     log.Info("remove collection from targets",
         zap.Int64("collectionID", collectionID))

@@ -245,9 +240,6 @@ func (mgr *TargetManager) RemoveCollection(collectionID int64) {
 // RemovePartition removes all segment in the given partition,
 // NOTE: this doesn't remove any channel even the given one is the only partition
 func (mgr *TargetManager) RemovePartition(collectionID int64, partitionIDs ...int64) {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
-
     log := log.With(zap.Int64("collectionID", collectionID),
         zap.Int64s("PartitionIDs", partitionIDs))

@@ -354,9 +346,6 @@ func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID in
 func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64,
     scope TargetScope,
 ) typeutil.UniqueSet {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)

     for _, t := range targets {
@@ -377,9 +366,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
     channelName string,
     scope TargetScope,
 ) typeutil.UniqueSet {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         segments := typeutil.NewUniqueSet()
@@ -400,9 +386,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
 func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64,
     scope TargetScope,
 ) map[int64]*datapb.SegmentInfo {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)

     for _, t := range targets {
@@ -416,17 +399,11 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64,
     channelName string,
     scope TargetScope,
 ) map[int64]*datapb.SegmentInfo {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
-        ret := make(map[int64]*datapb.SegmentInfo)
-        for k, v := range t.GetAllSegments() {
-            if v.GetInsertChannel() == channelName {
-                ret[k] = v
-            }
-        }
+        ret := lo.KeyBy(t.GetChannelSegments(channelName), func(s *datapb.SegmentInfo) int64 {
+            return s.GetID()
+        })

         if len(ret) > 0 {
             return ret
@@ -440,9 +417,6 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64,
     channelName string,
     scope TargetScope,
 ) []int64 {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         if channel, ok := t.dmChannels[channelName]; ok {
@@ -457,16 +431,11 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
     partitionID int64,
     scope TargetScope,
 ) map[int64]*datapb.SegmentInfo {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         segments := make(map[int64]*datapb.SegmentInfo)
-        for _, s := range t.GetAllSegments() {
-            if s.GetPartitionID() == partitionID {
-                segments[s.GetID()] = s
-            }
+        for _, s := range t.GetPartitionSegments(partitionID) {
+            segments[s.GetID()] = s
         }

         if len(segments) > 0 {
@@ -478,9 +447,6 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
 }

 func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope TargetScope) map[string]*DmChannel {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)

     for _, t := range targets {
@@ -491,9 +457,6 @@ func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope Ta
 }

 func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         if ch, ok := t.GetAllDmChannels()[channel]; ok {
@@ -504,9 +467,6 @@ func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope
 }

 func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         if s, ok := t.GetAllSegments()[id]; ok {
@@ -518,9 +478,6 @@ func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope T
 }

 func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(scope, collectionID)
     for _, t := range targets {
         if t.GetTargetVersion() > 0 {
@@ -532,9 +489,6 @@ func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope T
 }

 func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-
     targets := mgr.getCollectionTarget(CurrentTarget, collectionID)

     return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0
@@ -547,8 +501,6 @@ func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool {
 }

 func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
     if mgr.current != nil {
         // use pool here to control maximal writer used by save target
         pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2)
@@ -572,13 +524,14 @@ func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
             })
         }
         tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
-        for id, target := range mgr.current.collectionTargetMap {
+        mgr.current.collectionTargetMap.Range(func(id int64, target *CollectionTarget) bool {
             tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg()))
             if len(tasks) >= batchSize {
                 submit(tasks)
                 tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
             }
-        }
+            return true
+        })
         if len(tasks) > 0 {
             submit(tasks)
         }
@@ -587,9 +540,6 @@ func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
 }

 func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
-
     targets, err := catalog.GetCollectionTargets()
     if err != nil {
         log.Warn("failed to recover collection target from etcd", zap.Error(err))
@@ -618,8 +568,6 @@ func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {

 // if segment isn't l0 segment, and exist in current/next target, then it can be moved
 func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool {
-    mgr.rwMutex.Lock()
-    defer mgr.rwMutex.Unlock()
     current := mgr.current.getCollectionTarget(collectionID)
     if current != nil && current.segments[segmentID] != nil && current.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 {
         return true
@@ -634,9 +582,7 @@ func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool
 }

 func (mgr *TargetManager) IsCurrentTargetReady(collectionID int64) bool {
-    mgr.rwMutex.RLock()
-    defer mgr.rwMutex.RUnlock()
-    target, ok := mgr.current.collectionTargetMap[collectionID]
+    target, ok := mgr.current.collectionTargetMap.Get(collectionID)
     if !ok {
         return false
     }
diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go
index 9d45ac2a0c209..19a032abe5b95 100644
--- a/internal/querycoordv2/meta/target_manager_test.go
+++ b/internal/querycoordv2/meta/target_manager_test.go
@@ -412,33 +412,38 @@ func (suite *TargetManagerSuite) TestGetTarget() {
     current := &CollectionTarget{}
     next := &CollectionTarget{}

+    t1 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+    t2 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+    t3 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+    t4 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+    t1.Insert(1000, current)
+    t2.Insert(1000, next)
+    t3.Insert(1000, current)
+    t4.Insert(1000, current)
+
     bothMgr := &TargetManager{
         current: &target{
-            collectionTargetMap: map[int64]*CollectionTarget{
-                1000: current,
-            },
+            collectionTargetMap: t1,
         },
         next: &target{
-            collectionTargetMap: map[int64]*CollectionTarget{
-                1000: next,
-            },
+            collectionTargetMap: t2,
         },
     }
     currentMgr := &TargetManager{
         current: &target{
-            collectionTargetMap: map[int64]*CollectionTarget{
-                1000: current,
-            },
+            collectionTargetMap: t3,
+        },
+        next: &target{
+            collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
         },
-        next: &target{},
     }
     nextMgr := &TargetManager{
         next: &target{
-            collectionTargetMap: map[int64]*CollectionTarget{
-                1000: current,
-            },
+            collectionTargetMap: t4,
+        },
+        current: &target{
+            collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
         },
-        current: &target{},
     }

     cases := []testCase{
@@ -647,7 +652,7 @@ func BenchmarkTargetManager(b *testing.B) {
     collectionNum := 10000
     for i := 0; i < collectionNum; i++ {
-        mgr.current.collectionTargetMap[int64(i)] = NewCollectionTarget(segments, channels, nil)
+        mgr.current.collectionTargetMap.Insert(int64(i), NewCollectionTarget(segments, channels, nil))
     }
     b.ResetTimer()
     for i := 0; i < b.N; i++ {
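Note: dropping the TargetManager-wide RWMutex works because reads now hit either immutable CollectionTarget values or the ConcurrentMap, but updateCollectionTarget still needs its version check-and-set to be atomic per collection; that is what the KeyLock provides, without serializing unrelated collections. A simplified sketch of that per-key locking idea follows; this keyLock is an illustration, not the actual pkg/util/lock.KeyLock (which also reclaims unused entries).

package main

import (
    "fmt"
    "sync"
)

// keyLock hands out one mutex per key, so writers contend only when they
// touch the same key (here: the same collection ID).
type keyLock[K comparable] struct {
    mu    sync.Mutex
    locks map[K]*sync.Mutex
}

func newKeyLock[K comparable]() *keyLock[K] {
    return &keyLock[K]{locks: make(map[K]*sync.Mutex)}
}

func (kl *keyLock[K]) Lock(key K) {
    kl.mu.Lock()
    l, ok := kl.locks[key]
    if !ok {
        l = &sync.Mutex{}
        kl.locks[key] = l
    }
    kl.mu.Unlock()
    l.Lock()
}

func (kl *keyLock[K]) Unlock(key K) {
    kl.mu.Lock()
    l := kl.locks[key]
    kl.mu.Unlock()
    l.Unlock()
}

func main() {
    kl := newKeyLock[int64]()
    versions := map[int64]int64{}
    var wg sync.WaitGroup
    for v := int64(1); v <= 10; v++ {
        wg.Add(1)
        go func(v int64) {
            defer wg.Done()
            kl.Lock(1000) // serialize the check-and-set per collection
            defer kl.Unlock(1000)
            if v > versions[1000] { // only newer targets win, as in updateCollectionTarget
                versions[1000] = v
            }
        }(v)
    }
    wg.Wait()
    fmt.Println(versions[1000]) // 10
}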
diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go
index d9c1e4340e424..fb0b3d3b18eb9 100644
--- a/internal/querycoordv2/observers/collection_observer.go
+++ b/internal/querycoordv2/observers/collection_observer.go
@@ -240,9 +240,13 @@ func (ob *CollectionObserver) readyToObserve(collectionID int64) bool {
 func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
     loading := false

+    observeTaskNum := 0
+    observeStart := time.Now()
     ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
         loading = true
+        observeTaskNum++

+        start := time.Now()
         collection := ob.meta.CollectionManager.GetCollection(task.CollectionID)
         if collection == nil {
             return true
@@ -296,9 +300,14 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
             ob.loadTasks.Remove(traceID)
         }

+        log.Info("observe collection done", zap.Int64("collectionID", task.CollectionID), zap.Duration("dur", time.Since(start)))
         return true
     })

+    if observeTaskNum > 0 {
+        log.Info("observe all collections done", zap.Int("num", observeTaskNum), zap.Duration("dur", time.Since(observeStart)))
+    }
+
     // trigger check logic when loading collections/partitions
     if loading {
         ob.checkerController.Check()
@@ -325,11 +334,6 @@ func (ob *CollectionObserver) observeChannelStatus(ctx context.Context, collecti
 }

 func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool {
-    log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With(
-        zap.Int64("collectionID", partition.GetCollectionID()),
-        zap.Int64("partitionID", partition.GetPartitionID()),
-    )
-
     segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget)

     targetNum := len(segmentTargets) + channelTargetNum
@@ -338,7 +342,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
         return false
     }

-    log.RatedInfo(10, "partition targets",
+    log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).RatedInfo(10, "partition targets",
+        zap.Int64("collectionID", partition.GetCollectionID()),
+        zap.Int64("partitionID", partition.GetPartitionID()),
         zap.Int("segmentTargetNum", len(segmentTargets)),
         zap.Int("channelTargetNum", channelTargetNum),
         zap.Int("totalTargetNum", targetNum),
@@ -355,11 +361,6 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
         group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, partition.GetCollectionID(), nodes)
         loadedCount += len(group)
     }
-    if loadedCount > 0 {
-        log.Info("partition load progress",
-            zap.Int("subChannelCount", subChannelCount),
-            zap.Int("loadSegmentCount", loadedCount-subChannelCount))
-    }
     loadPercentage = int32(loadedCount * 100 / (targetNum * int(replicaNum)))

     if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 {
@@ -370,30 +371,37 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
     ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
     if loadPercentage == 100 {
         if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
-            log.Warn("failed to manual check current target, skip update load status")
+            log.Ctx(ctx).Warn("failed to manual check current target, skip update load status",
+                zap.Int64("collectionID", partition.GetCollectionID()),
+                zap.Int64("partitionID", partition.GetPartitionID()))
             return false
         }
         delete(ob.partitionLoadedCount, partition.GetPartitionID())
     }
     err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(partition.PartitionID, loadPercentage)
     if err != nil {
-        log.Warn("failed to update partition load percentage")
+        log.Ctx(ctx).Warn("failed to update partition load percentage",
+            zap.Int64("collectionID", partition.GetCollectionID()),
+            zap.Int64("partitionID", partition.GetPartitionID()))
     }
-    log.Info("partition load status updated",
+    log.Ctx(ctx).Info("partition load status updated",
+        zap.Int64("collectionID", partition.GetCollectionID()),
+        zap.Int64("partitionID", partition.GetPartitionID()),
         zap.Int32("partitionLoadPercentage", loadPercentage),
+        zap.Int("subChannelCount", subChannelCount),
+        zap.Int("loadSegmentCount", loadedCount-subChannelCount),
     )
     eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
     return true
 }

 func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) {
-    log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
-
     collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(collectionID)
     if err != nil {
-        log.Warn("failed to update collection load percentage")
+        log.Ctx(ctx).Warn("failed to update collection load percentage", zap.Int64("collectionID", collectionID))
     }
-    log.Info("collection load status updated",
+    log.Ctx(ctx).Info("collection load status updated",
+        zap.Int64("collectionID", collectionID),
         zap.Int32("collectionLoadPercentage", collectionPercentage),
     )
     if collectionPercentage == 100 {
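Note: a recurring pattern in this patch is building the rate-grouped logger per call site (log.Ctx(ctx).WithRateGroup(name, n, window).RatedInfo(...)) instead of binding one logger up front; the rate group caps how often a hot-path line can fire. A generic analogue of the concept using golang.org/x/time/rate is sketched below; this is an illustration of the idea, not the milvus log package's implementation.

package main

import (
    "fmt"
    "sync"
    "time"

    "golang.org/x/time/rate"
)

// rateGroups sketches the "rate group" idea: each named group owns a token
// bucket, and a log line is emitted only if its group has budget left. This
// mirrors the intent of WithRateGroup/RatedInfo, not its actual code.
var (
    mu     sync.Mutex
    groups = map[string]*rate.Limiter{}
)

func ratedLog(group string, perSecond float64, burst int, msg string) {
    mu.Lock()
    lim, ok := groups[group]
    if !ok {
        lim = rate.NewLimiter(rate.Limit(perSecond), burst)
        groups[group] = lim
    }
    mu.Unlock()
    if lim.Allow() {
        fmt.Printf("%s %s\n", time.Now().Format(time.RFC3339), msg)
    }
}

func main() {
    for i := 0; i < 100; i++ {
        // only the first call gets through; the rest are suppressed for ~1 min
        ratedLog("distHandler.pullDist", 1.0/60.0, 1, "pull and handle distribution done")
    }
}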
diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go
index d68e0d38b7c60..31f4c80377465 100644
--- a/internal/querycoordv2/task/action.go
+++ b/internal/querycoordv2/task/action.go
@@ -24,7 +24,6 @@ import (
     "github.com/milvus-io/milvus/internal/querycoordv2/meta"
     "github.com/milvus-io/milvus/pkg/proto/querypb"
-    "github.com/milvus-io/milvus/pkg/util/funcutil"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )

@@ -115,49 +114,7 @@ func (action *SegmentAction) Scope() querypb.DataScope {
 }

 func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
-    if action.Type() == ActionTypeGrow {
-        // rpc finished
-        if !action.rpcReturned.Load() {
-            return false
-        }
-
-        // segment found in leader view
-        views := distMgr.LeaderViewManager.GetByFilter(
-            meta.WithChannelName2LeaderView(action.Shard()),
-            meta.WithSegment2LeaderView(action.segmentID, false))
-        if len(views) == 0 {
-            return false
-        }
-
-        // segment found in dist
-        segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID()))
-        return len(segmentInTargetNode) > 0
-    } else if action.Type() == ActionTypeReduce {
-        // FIXME: Now shard leader's segment view is a map of segment ID to node ID,
-        // loading segment replaces the node ID with the new one,
-        // which confuses the condition of finishing,
-        // the leader should return a map of segment ID to list of nodes,
-        // now, we just always commit the release task to executor once.
-        // NOTE: DO NOT create a task containing release action and the action is not the last action
-        sealed := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()))
-        views := distMgr.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(action.Node()))
-        growing := lo.FlatMap(views, func(view *meta.LeaderView, _ int) []int64 {
-            return lo.Keys(view.GrowingSegments)
-        })
-        segments := make([]int64, 0, len(sealed)+len(growing))
-        for _, segment := range sealed {
-            segments = append(segments, segment.GetID())
-        }
-        segments = append(segments, growing...)
-        if !funcutil.SliceContain(segments, action.SegmentID()) {
-            return true
-        }
-        return action.rpcReturned.Load()
-    } else if action.Type() == ActionTypeUpdate {
-        return action.rpcReturned.Load()
-    }
-
-    return true
+    return action.rpcReturned.Load()
 }

 func (action *SegmentAction) String() string {
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index 8fb604dd9d635..3332153a81328 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -36,7 +36,9 @@ import (
     "github.com/milvus-io/milvus/pkg/proto/querypb"
     "github.com/milvus-io/milvus/pkg/util/funcutil"
     "github.com/milvus-io/milvus/pkg/util/hardware"
+    "github.com/milvus-io/milvus/pkg/util/lock"
     "github.com/milvus-io/milvus/pkg/util/merr"
+    "github.com/milvus-io/milvus/pkg/util/timerecord"
     . "github.com/milvus-io/milvus/pkg/util/typeutil"
 )

@@ -89,6 +91,7 @@ type replicaChannelIndex struct {
 }

 type taskQueue struct {
+    mu sync.RWMutex
     // TaskPriority -> TaskID -> Task
     buckets []map[int64]Task
 }
@@ -104,6 +107,8 @@ func newTaskQueue() *taskQueue {
 }

 func (queue *taskQueue) Len() int {
+    queue.mu.RLock()
+    defer queue.mu.RUnlock()
     taskNum := 0
     for _, tasks := range queue.buckets {
         taskNum += len(tasks)
@@ -113,17 +118,23 @@
 }

 func (queue *taskQueue) Add(task Task) {
+    queue.mu.Lock()
+    defer queue.mu.Unlock()
     bucket := queue.buckets[task.Priority()]
     bucket[task.ID()] = task
 }

 func (queue *taskQueue) Remove(task Task) {
+    queue.mu.Lock()
+    defer queue.mu.Unlock()
     bucket := queue.buckets[task.Priority()]
     delete(bucket, task.ID())
 }

 // Range iterates all tasks in the queue ordered by priority from high to low
 func (queue *taskQueue) Range(fn func(task Task) bool) {
+    queue.mu.RLock()
+    defer queue.mu.RUnlock()
     for priority := len(queue.buckets) - 1; priority >= 0; priority-- {
         for _, task := range queue.buckets[priority] {
             if !fn(task) {
@@ -150,9 +161,8 @@ type Scheduler interface {
 }

 type taskScheduler struct {
-    rwmutex     sync.RWMutex
     ctx         context.Context
-    executors   map[int64]*Executor // NodeID -> Executor
+    executors   *ConcurrentMap[int64, *Executor] // NodeID -> Executor
     idAllocator func() UniqueID

     distMgr   *meta.DistributionManager
@@ -162,9 +172,11 @@ type taskScheduler struct {
     cluster   session.Cluster
     nodeMgr   *session.NodeManager

-    tasks        UniqueSet
-    segmentTasks map[replicaSegmentIndex]Task
-    channelTasks map[replicaChannelIndex]Task
+    scheduleMu   sync.Mutex           // guards schedule()
+    collKeyLock  *lock.KeyLock[int64] // guards Add()
+    tasks        *ConcurrentMap[UniqueID, struct{}]
+    segmentTasks *ConcurrentMap[replicaSegmentIndex, Task]
+    channelTasks *ConcurrentMap[replicaChannelIndex, Task]
     processQueue *taskQueue
     waitQueue    *taskQueue
 }
@@ -180,7 +192,7 @@ func NewScheduler(ctx context.Context,
     id := time.Now().UnixMilli()
     return &taskScheduler{
         ctx:       ctx,
-        executors: make(map[int64]*Executor),
+        executors: NewConcurrentMap[int64, *Executor](),
         idAllocator: func() UniqueID {
             id++
             return id
@@ -193,9 +205,10 @@ func NewScheduler(ctx context.Context,
         cluster: cluster,
         nodeMgr: nodeMgr,

-        tasks:        make(UniqueSet),
-        segmentTasks: make(map[replicaSegmentIndex]Task),
-        channelTasks: make(map[replicaChannelIndex]Task),
+        collKeyLock:  lock.NewKeyLock[int64](),
+        tasks:        NewConcurrentMap[UniqueID, struct{}](),
+        segmentTasks: NewConcurrentMap[replicaSegmentIndex, Task](),
+        channelTasks: NewConcurrentMap[replicaChannelIndex, Task](),
         processQueue: newTaskQueue(),
         waitQueue:    newTaskQueue(),
     }
@@ -204,30 +217,22 @@ func NewScheduler(ctx context.Context,
 func (scheduler *taskScheduler) Start() {}

 func (scheduler *taskScheduler) Stop() {
-    scheduler.rwmutex.Lock()
-    defer scheduler.rwmutex.Unlock()
-
-    for nodeID, executor := range scheduler.executors {
+    scheduler.executors.Range(func(nodeID int64, executor *Executor) bool {
         executor.Stop()
-        delete(scheduler.executors, nodeID)
-    }
+        return true
+    })

-    for _, task := range scheduler.segmentTasks {
+    scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
         scheduler.remove(task)
-    }
-    for _, task := range scheduler.channelTasks {
+        return true
+    })
+    scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
         scheduler.remove(task)
-    }
+        return true
+    })
 }

 func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
-    scheduler.rwmutex.Lock()
-    defer scheduler.rwmutex.Unlock()
-
-    if _, exist := scheduler.executors[nodeID]; exist {
-        return
-    }
-
     executor := NewExecutor(scheduler.meta,
         scheduler.distMgr,
         scheduler.broker,
@@ -235,27 +240,24 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
         scheduler.cluster,
         scheduler.nodeMgr)

-    scheduler.executors[nodeID] = executor
+    if _, exist := scheduler.executors.GetOrInsert(nodeID, executor); exist {
+        return
+    }
     executor.Start(scheduler.ctx)
     log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
 }

 func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
-    scheduler.rwmutex.Lock()
-    defer scheduler.rwmutex.Unlock()
-
-    executor, ok := scheduler.executors[nodeID]
+    executor, ok := scheduler.executors.GetAndRemove(nodeID)
     if ok {
         executor.Stop()
-        delete(scheduler.executors, nodeID)
-        log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
+        log.Ctx(scheduler.ctx).Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
     }
 }

 func (scheduler *taskScheduler) Add(task Task) error {
-    scheduler.rwmutex.Lock()
-    defer scheduler.rwmutex.Unlock()
-
+    scheduler.collKeyLock.Lock(task.CollectionID())
+    defer scheduler.collKeyLock.Unlock(task.CollectionID())
     err := scheduler.preAdd(task)
     if err != nil {
         task.Cancel(err)
@@ -264,19 +266,19 @@ func (scheduler *taskScheduler) Add(task Task) error {

     task.SetID(scheduler.idAllocator())
     scheduler.waitQueue.Add(task)
-    scheduler.tasks.Insert(task.ID())
+    scheduler.tasks.Insert(task.ID(), struct{}{})

     switch task := task.(type) {
     case *SegmentTask:
         index := NewReplicaSegmentIndex(task)
-        scheduler.segmentTasks[index] = task
+        scheduler.segmentTasks.Insert(index, task)

     case *ChannelTask:
         index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-        scheduler.channelTasks[index] = task
+        scheduler.channelTasks.Insert(index, task)

     case *LeaderTask:
         index := NewReplicaLeaderIndex(task)
-        scheduler.segmentTasks[index] = task
+        scheduler.segmentTasks.Insert(index, task)
     }

     scheduler.updateTaskMetrics()
@@ -286,21 +288,39 @@
 }

 func (scheduler *taskScheduler) updateTaskMetrics() {
-    segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
+    segmentGrowNum, segmentReduceNum, segmentUpdateNum, segmentMoveNum := 0, 0, 0, 0
+    leaderGrowNum, leaderReduceNum, leaderUpdateNum := 0, 0, 0
     channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
-    for _, task := range scheduler.segmentTasks {
-        taskType := GetTaskType(task)
-        switch taskType {
-        case TaskTypeGrow:
-            segmentGrowNum++
-        case TaskTypeReduce:
-            segmentReduceNum++
-        case TaskTypeMove:
+    scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
+        switch {
+        case len(task.Actions()) > 1:
             segmentMoveNum++
+        case task.Actions()[0].Type() == ActionTypeGrow:
+            if _, ok := task.Actions()[0].(*SegmentAction); ok {
+                segmentGrowNum++
+            }
+            if _, ok := task.Actions()[0].(*LeaderAction); ok {
+                leaderGrowNum++
+            }
+        case task.Actions()[0].Type() == ActionTypeReduce:
+            if _, ok := task.Actions()[0].(*SegmentAction); ok {
+                segmentReduceNum++
+            }
+            if _, ok := task.Actions()[0].(*LeaderAction); ok {
+                leaderReduceNum++
+            }
+        case task.Actions()[0].Type() == ActionTypeUpdate:
+            if _, ok := task.Actions()[0].(*SegmentAction); ok {
+                segmentUpdateNum++
+            }
+            if _, ok := task.Actions()[0].(*LeaderAction); ok {
+                leaderUpdateNum++
+            }
         }
-    }
+        return true
+    })

-    for _, task := range scheduler.channelTasks {
+    scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
         taskType := GetTaskType(task)
         switch taskType {
         case TaskTypeGrow:
@@ -310,11 +330,18 @@ func (scheduler *taskScheduler) updateTaskMetrics() {
         case TaskTypeMove:
             channelMoveNum++
         }
-    }
+        return true
+    })

     metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentGrowTaskLabel).Set(float64(segmentGrowNum))
     metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentReduceTaskLabel).Set(float64(segmentReduceNum))
     metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentMoveTaskLabel).Set(float64(segmentMoveNum))
+    metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentUpdateTaskLabel).Set(float64(segmentUpdateNum))
+
+    metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderGrowTaskLabel).Set(float64(leaderGrowNum))
+    metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderReduceTaskLabel).Set(float64(leaderReduceNum))
+    metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderUpdateTaskLabel).Set(float64(leaderUpdateNum))
+
     metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum))
     metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum))
    metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum))
@@ -326,7 +353,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
     switch task := task.(type) {
     case *SegmentTask:
         index := NewReplicaSegmentIndex(task)
-        if old, ok := scheduler.segmentTasks[index]; ok {
+        if old, ok := scheduler.segmentTasks.Get(index); ok {
             if task.Priority() > old.Priority() {
                 log.Info("replace old task, the new one with higher priority",
                     zap.Int64("oldID", old.ID()),
@@ -359,7 +386,7 @@
     case *ChannelTask:
         index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-        if old, ok := scheduler.channelTasks[index]; ok {
+        if old, ok := scheduler.channelTasks.Get(index); ok {
             if task.Priority() > old.Priority() {
                 log.Info("replace old task, the new one with higher priority",
                     zap.Int64("oldID", old.ID()),
@@ -392,7 +419,7 @@
     }
     case *LeaderTask:
         index := NewReplicaLeaderIndex(task)
-        if old, ok := scheduler.segmentTasks[index]; ok {
+        if old, ok := scheduler.segmentTasks.Get(index); ok {
             if task.Priority() > old.Priority() {
                 log.Info("replace old task, the new one with higher priority",
                     zap.Int64("oldID", old.ID()),
@@ -471,46 +498,42 @@ func (scheduler *taskScheduler) Dispatch(node int64) {
         log.Info("scheduler stopped")

     default:
-        scheduler.rwmutex.Lock()
-        defer scheduler.rwmutex.Unlock()
+        scheduler.scheduleMu.Lock()
+        defer scheduler.scheduleMu.Unlock()
         scheduler.schedule(node)
     }
 }

 func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int {
-    scheduler.rwmutex.RLock()
-    defer scheduler.rwmutex.RUnlock()
-
     targetActions := make(map[int64][]Action)
-    for _, task := range scheduler.segmentTasks { // Map key: replicaSegmentIndex
+    scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
         taskCollID := task.CollectionID()
         if collectionID != -1 && collectionID != taskCollID {
-            continue
+            return true
         }
         actions := filterActions(task.Actions(), nodeID)
         if len(actions) > 0 {
             targetActions[taskCollID] = append(targetActions[taskCollID], actions...)
         }
-    }
+        return true
+    })

     return scheduler.calculateTaskDelta(targetActions)
 }

 func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
-    scheduler.rwmutex.RLock()
-    defer scheduler.rwmutex.RUnlock()
-
     targetActions := make(map[int64][]Action)
-    for _, task := range scheduler.channelTasks { // Map key: replicaChannelIndex
+    scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
         taskCollID := task.CollectionID()
         if collectionID != -1 && collectionID != taskCollID {
-            continue
+            return true
         }
         actions := filterActions(task.Actions(), nodeID)
         if len(actions) > 0 {
             targetActions[taskCollID] = append(targetActions[taskCollID], actions...)
         }
-    }
+        return true
+    })

     return scheduler.calculateTaskDelta(targetActions)
 }
@@ -555,10 +578,7 @@ func (scheduler *taskScheduler) calculateTaskDelta(targetActions map[int64][]Act
 }

 func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
-    scheduler.rwmutex.RLock()
-    defer scheduler.rwmutex.RUnlock()
-
-    executor, ok := scheduler.executors[nodeID]
+    executor, ok := scheduler.executors.Get(nodeID)
     if !ok {
         return nil
     }
@@ -581,16 +601,13 @@ func WithTaskTypeFilter(taskType Type) TaskFilter {
 }

 func (scheduler *taskScheduler) GetChannelTaskNum(filters ...TaskFilter) int {
-    scheduler.rwmutex.RLock()
-    defer scheduler.rwmutex.RUnlock()
-
     if len(filters) == 0 {
-        return len(scheduler.channelTasks)
+        return scheduler.channelTasks.Len()
     }

     // rewrite this with for loop
     counter := 0
-    for _, task := range scheduler.channelTasks {
+    scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
         allMatch := true
         for _, filter := range filters {
             if !filter(task) {
@@ -601,21 +618,19 @@ func (scheduler *taskScheduler) GetChannelTaskNum(filters ...TaskFilter) int {
         if allMatch {
             counter++
         }
-    }
+        return true
+    })

     return counter
 }

 func (scheduler *taskScheduler) GetSegmentTaskNum(filters ...TaskFilter) int {
-    scheduler.rwmutex.RLock()
-    defer scheduler.rwmutex.RUnlock()
-
     if len(filters) == 0 {
-        return len(scheduler.segmentTasks)
+        return scheduler.segmentTasks.Len()
     }

     // rewrite this with for loop
     counter := 0
-    for _, task := range scheduler.segmentTasks {
+    scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
         allMatch := true
         for _, filter := range filters {
             if !filter(task) {
@@ -626,7 +641,8 @@ func (scheduler *taskScheduler) GetSegmentTaskNum(filters ...TaskFilter) int {
         if allMatch {
             counter++
         }
-    }
+        return true
+    })

     return counter
 }
@@ -639,17 +655,19 @@ func (scheduler *taskScheduler) schedule(node int64) {
         return
     }

-    log := log.With(
+    tr := timerecord.NewTimeRecorder("")
+    log := log.Ctx(scheduler.ctx).With(
         zap.Int64("nodeID", node),
     )

     scheduler.tryPromoteAll()
+    promoteDur := tr.RecordSpan()

     log.Debug("process tasks related to node",
         zap.Int("processingTaskNum", scheduler.processQueue.Len()),
         zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
-        zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
-        zap.Int("channelTaskNum", len(scheduler.channelTasks)),
+        zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
+        zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
     )

     // Process tasks
@@ -665,6 +683,7 @@ func (scheduler *taskScheduler) schedule(node int64) {
         return true
     })
+    preprocessDur := tr.RecordSpan()

     // The scheduler doesn't limit the number of tasks,
     // to commit tasks to executors as soon as possible, to reach higher merge possibility
@@ -675,22 +694,29 @@ func (scheduler *taskScheduler) schedule(node int64) {
         }
         return nil
     }, "process")
+    processDur := tr.RecordSpan()

     for _, task := range toRemove {
         scheduler.remove(task)
     }

+    scheduler.updateTaskMetrics()
+
     log.Info("processed tasks",
         zap.Int("toProcessNum", len(toProcess)),
         zap.Int32("committedNum", commmittedNum.Load()),
         zap.Int("toRemoveNum", len(toRemove)),
+        zap.Duration("promoteDur", promoteDur),
+        zap.Duration("preprocessDur", preprocessDur),
+        zap.Duration("processDur", processDur),
+        zap.Duration("totalDur", tr.ElapseSpan()),
     )

     log.Info("process tasks related to node done",
         zap.Int("processingTaskNum", scheduler.processQueue.Len()),
         zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
-        zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
-        zap.Int("channelTaskNum", len(scheduler.channelTasks)),
+        zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
+        zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
     )
 }
@@ -731,10 +757,6 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
 // return true if the task should be executed,
 // false otherwise
 func (scheduler *taskScheduler) preProcess(task Task) bool {
-    log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With(
-        zap.Int64("collectionID", task.CollectionID()),
-        zap.Int64("taskID", task.ID()),
-    )
     if task.Status() != TaskStatusStarted {
         return false
     }
@@ -757,7 +779,9 @@
             }

             if !ready {
-                log.RatedInfo(30, "Blocking reduce action in balance channel task")
+                log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).RatedInfo(30, "Blocking reduce action in balance channel task",
+                    zap.Int64("collectionID", task.CollectionID()),
+                    zap.Int64("taskID", task.ID()))
                 break
             }
         }
@@ -788,7 +812,7 @@ func (scheduler *taskScheduler) process(task Task) bool {
     )

     actions, step := task.Actions(), task.Step()
-    executor, ok := scheduler.executors[actions[step].Node()]
+    executor, ok := scheduler.executors.Get(actions[step].Node())
     if !ok {
         log.Warn("no executor for QueryNode",
             zap.Int("step", step),
@@ -809,19 +833,18 @@ func (scheduler *taskScheduler) check(task Task) error {
 }

 func (scheduler *taskScheduler) RemoveByNode(node int64) {
-    scheduler.rwmutex.Lock()
-    defer scheduler.rwmutex.Unlock()
-
-    for _, task := range scheduler.segmentTasks {
+    scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
         if scheduler.isRelated(task, node) {
             scheduler.remove(task)
         }
-    }
-    for _, task := range scheduler.channelTasks {
+        return true
+    })
+    scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
         if scheduler.isRelated(task, node) {
             scheduler.remove(task)
         }
-    }
+        return true
+    })
 }

 func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) {
@@ -857,7 +880,7 @@ func (scheduler *taskScheduler) remove(task Task) {
     switch task := task.(type) {
     case *SegmentTask:
         index := NewReplicaSegmentIndex(task)
-        delete(scheduler.segmentTasks, index)
+        scheduler.segmentTasks.Remove(index)
         log = log.With(zap.Int64("segmentID", task.SegmentID()))

         if task.Status() == TaskStatusFailed &&
             task.Err() != nil &&
@@ -867,16 +890,15 @@ func (scheduler *taskScheduler) remove(task Task) {

     case *ChannelTask:
         index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-        delete(scheduler.channelTasks, index)
+        scheduler.channelTasks.Remove(index)
         log = log.With(zap.String("channel", task.Channel()))

     case *LeaderTask:
         index := NewReplicaLeaderIndex(task)
-        delete(scheduler.segmentTasks, index)
+        scheduler.segmentTasks.Remove(index)
         log = log.With(zap.Int64("segmentID", task.SegmentID()))
     }

-    scheduler.updateTaskMetrics()
     log.Info("task removed")

     if scheduler.meta.Exist(task.CollectionID()) {
@@ -922,14 +944,18 @@ func (scheduler *taskScheduler) getTaskMetricsLabel(task Task) string {
     return metrics.UnknownTaskLabel
 }

-func (scheduler *taskScheduler) checkStale(task Task) error {
-    log := log.With(
+func WrapTaskLog(task Task, fields ...zap.Field) []zap.Field {
+    res := []zap.Field{
         zap.Int64("taskID", task.ID()),
         zap.Int64("collectionID", task.CollectionID()),
         zap.Int64("replicaID", task.ReplicaID()),
         zap.String("source", task.Source().String()),
-    )
+    }
+    res = append(res, fields...)
+    return res
+}

+func (scheduler *taskScheduler) checkStale(task Task) error {
     switch task := task.(type) {
     case *SegmentTask:
         if err := scheduler.checkSegmentTaskStale(task); err != nil {
@@ -956,7 +982,9 @@
             zap.Int("step", step))

         if scheduler.nodeMgr.Get(action.Node()) == nil {
-            log.Warn("the task is stale, the target node is offline")
+            log.Warn("the task is stale, the target node is offline", WrapTaskLog(task,
+                zap.Int64("nodeID", action.Node()),
+                zap.Int("step", step))...)
             return merr.WrapErrNodeNotFound(action.Node())
         }
     }
@@ -965,38 +993,30 @@
 }

 func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
-    log := log.With(
-        zap.Int64("taskID", task.ID()),
-        zap.Int64("collectionID", task.CollectionID()),
-        zap.Int64("replicaID", task.ReplicaID()),
-        zap.String("source", task.Source().String()),
-    )
-
     for _, action := range task.Actions() {
         switch action.Type() {
         case ActionTypeGrow:
             if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
-                log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID))
+                log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.Int64("segment", task.segmentID))...)
                 return merr.WrapErrNodeOffline(action.Node())
             }
             taskType := GetTaskType(task)
             segment := scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst)
             if segment == nil {
-                log.Warn("task stale due to the segment to load not exists in targets",
-                    zap.Int64("segment", task.segmentID),
-                    zap.String("taskType", taskType.String()),
-                )
+                log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets",
+                    WrapTaskLog(task, zap.Int64("segment", task.segmentID),
+                        zap.String("taskType", taskType.String()))...)
                 return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment")
             }

             replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node())
             if replica == nil {
-                log.Warn("task stale due to replica not found")
+                log.Ctx(task.Context()).Warn("task stale due to replica not found", WrapTaskLog(task)...)
                 return merr.WrapErrReplicaNotFound(task.CollectionID(), "by collectionID")
             }
             _, ok := scheduler.distMgr.GetShardLeader(replica, segment.GetInsertChannel())
             if !ok {
-                log.Warn("task stale due to leader not found")
+                log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task)...)
                 return merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "failed to get shard delegator")
             }

@@ -1008,23 +1028,16 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
 }

 func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error {
-    log := log.With(
-        zap.Int64("taskID", task.ID()),
-        zap.Int64("collectionID", task.CollectionID()),
-        zap.Int64("replicaID", task.ReplicaID()),
-        zap.String("source", task.Source().String()),
-    )
-
     for _, action := range task.Actions() {
         switch action.Type() {
         case ActionTypeGrow:
             if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
-                log.Warn("task stale due to node offline", zap.String("channel", task.Channel()))
+                log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.String("channel", task.Channel()))...)
                 return merr.WrapErrNodeOffline(action.Node())
             }
             if scheduler.targetMgr.GetDmChannel(task.collectionID, task.Channel(), meta.NextTargetFirst) == nil {
-                log.Warn("the task is stale, the channel to subscribe not exists in targets",
-                    zap.String("channel", task.Channel()))
+                log.Ctx(task.Context()).Warn("the task is stale, the channel to subscribe not exists in targets",
+                    WrapTaskLog(task, zap.String("channel", task.Channel()))...)
                 return merr.WrapErrChannelReduplicate(task.Channel(), "target doesn't contain this channel")
             }

@@ -1036,48 +1049,41 @@ func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error {
 }

 func (scheduler *taskScheduler) checkLeaderTaskStale(task *LeaderTask) error {
-    log := log.With(
-        zap.Int64("taskID", task.ID()),
-        zap.Int64("collectionID", task.CollectionID()),
-        zap.Int64("replicaID", task.ReplicaID()),
-        zap.String("source", task.Source().String()),
-        zap.Int64("leaderID", task.leaderID),
-    )
-
     for _, action := range task.Actions() {
         switch action.Type() {
         case ActionTypeGrow:
             if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.(*LeaderAction).GetLeaderID()); ok {
-                log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID))
+                log.Ctx(task.Context()).Warn("task stale due to node offline",
+                    WrapTaskLog(task, zap.Int64("leaderID", task.leaderID), zap.Int64("segment", task.segmentID))...)
                 return merr.WrapErrNodeOffline(action.Node())
             }

             taskType := GetTaskType(task)
             segment := scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst)
             if segment == nil {
-                log.Warn("task stale due to the segment to load not exists in targets",
-                    zap.Int64("segment", task.segmentID),
-                    zap.String("taskType", taskType.String()),
-                )
+                log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets",
+                    WrapTaskLog(task, zap.Int64("leaderID", task.leaderID),
+                        zap.Int64("segment", task.segmentID),
+                        zap.String("taskType", taskType.String()))...)
                 return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment")
             }

             replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node())
             if replica == nil {
-                log.Warn("task stale due to replica not found")
+                log.Ctx(task.Context()).Warn("task stale due to replica not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
                 return merr.WrapErrReplicaNotFound(task.CollectionID(), "by collectionID")
             }

             view := scheduler.distMgr.GetLeaderShardView(task.leaderID, task.Shard())
             if view == nil {
-                log.Warn("task stale due to leader not found")
+                log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
                 return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator")
             }

         case ActionTypeReduce:
             view := scheduler.distMgr.GetLeaderShardView(task.leaderID, task.Shard())
             if view == nil {
-                log.Warn("task stale due to leader not found")
+                log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
                 return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator")
             }
         }
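Note: the scheduler change replaces one coarse rwmutex over all task state with typed concurrent maps plus two narrow locks (scheduleMu for schedule(), a per-collection KeyLock for Add()), so read paths like GetSegmentTaskDelta no longer block scheduling. A simplified analogue of the ConcurrentMap API used above, built on sync.Map, is sketched below; the real typeutil.ConcurrentMap keeps a length counter, while this sketch's Len is O(n).

package main

import (
    "fmt"
    "sync"
)

// concurrentMap is a typed wrapper over sync.Map, mirroring the subset of
// the typeutil.ConcurrentMap API the scheduler relies on.
type concurrentMap[K comparable, V any] struct{ inner sync.Map }

func (m *concurrentMap[K, V]) Insert(k K, v V) { m.inner.Store(k, v) }

func (m *concurrentMap[K, V]) Get(k K) (V, bool) {
    var zero V
    val, ok := m.inner.Load(k)
    if !ok {
        return zero, false
    }
    return val.(V), true
}

func (m *concurrentMap[K, V]) GetOrInsert(k K, v V) (V, bool) {
    actual, loaded := m.inner.LoadOrStore(k, v)
    return actual.(V), loaded
}

func (m *concurrentMap[K, V]) GetAndRemove(k K) (V, bool) {
    var zero V
    val, loaded := m.inner.LoadAndDelete(k)
    if !loaded {
        return zero, false
    }
    return val.(V), true
}

func (m *concurrentMap[K, V]) Range(fn func(k K, v V) bool) {
    m.inner.Range(func(k, v any) bool { return fn(k.(K), v.(V)) })
}

func (m *concurrentMap[K, V]) Len() int {
    n := 0
    m.inner.Range(func(_, _ any) bool { n++; return true })
    return n
}

func main() {
    var executors concurrentMap[int64, string]
    executors.Insert(1, "executor-1")
    if _, exist := executors.GetOrInsert(1, "executor-dup"); exist {
        fmt.Println("executor already registered") // mirrors AddExecutor's early return
    }
    if e, ok := executors.GetAndRemove(1); ok {
        fmt.Println("stopped", e) // mirrors RemoveExecutor
    }
    fmt.Println("len:", executors.Len())
}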
diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go
index abfa538b7269f..6b940550751b8 100644
--- a/internal/querycoordv2/task/task.go
+++ b/internal/querycoordv2/task/task.go
@@ -125,12 +125,14 @@ type baseTask struct {
     span trace.Span

     // startTs
-    startTs time.Time
+    startTs atomic.Time
 }

 func newBaseTask(ctx context.Context, source Source, collectionID typeutil.UniqueID, replica *meta.Replica, shard string, taskTag string) *baseTask {
     ctx, cancel := context.WithCancel(ctx)
     ctx, span := otel.Tracer(typeutil.QueryCoordRole).Start(ctx, taskTag)
+    startTs := atomic.Time{}
+    startTs.Store(time.Now())

     return &baseTask{
         source:       source,
@@ -145,7 +147,7 @@ func newBaseTask(ctx context.Context, source Source, collectionID typeutil.Uniqu
         doneCh:   make(chan struct{}),
         canceled: atomic.NewBool(false),
         span:     span,
-        startTs:  time.Now(),
+        startTs:  startTs,
     }
 }

@@ -208,11 +210,11 @@ func (task *baseTask) Index() string {
 }

 func (task *baseTask) RecordStartTs() {
-    task.startTs = time.Now()
+    task.startTs.Store(time.Now())
 }

 func (task *baseTask) GetTaskLatency() int64 {
-    return time.Since(task.startTs).Milliseconds()
+    return time.Since(task.startTs.Load()).Milliseconds()
 }

 func (task *baseTask) Err() error {
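Note: once the scheduler's big lock is gone, baseTask.startTs can be written by RecordStartTs while GetTaskLatency reads it from another goroutine, so the plain time.Time becomes atomic.Time (go.uber.org/atomic, the same package already used for canceled). A small sketch of the pattern, assuming only that Store/Load behave as in that package:

package main

import (
    "fmt"
    "sync"
    "time"

    "go.uber.org/atomic"
)

// raceFreeTask shows the baseTask pattern: startTs is written by one
// goroutine (RecordStartTs) while another reads it (GetTaskLatency); a
// plain time.Time here would be flagged by `go test -race`.
type raceFreeTask struct{ startTs atomic.Time }

func (t *raceFreeTask) RecordStartTs() { t.startTs.Store(time.Now()) }

func (t *raceFreeTask) GetTaskLatency() int64 {
    return time.Since(t.startTs.Load()).Milliseconds()
}

func main() {
    task := &raceFreeTask{}
    task.RecordStartTs()

    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); task.RecordStartTs() }()      // scheduler goroutine
    go func() { defer wg.Done(); _ = task.GetTaskLatency() }() // metrics goroutine
    wg.Wait()
    fmt.Println("latency(ms):", task.GetTaskLatency())
}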
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index 83aeafb7120ec..3fddedf71691d 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -765,26 +765,14 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() {
         suite.NoError(err)
     }

-    growings := map[int64]*meta.Segment{}
-    for _, segment := range suite.releaseSegments[1:] {
-        growings[segment] = utils.CreateTestSegment(suite.collection, 1, segment, targetNode, 1, "")
-    }
-    suite.dist.LeaderViewManager.Update(targetNode, &meta.LeaderView{
-        ID:              targetNode,
-        GrowingSegments: growings,
-    })
-
     segmentsNum := len(suite.releaseSegments)
     suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)

-    // Process tasks
+    // Process tasks and Release done
     suite.dispatchAndWait(targetNode)
-    suite.AssertTaskNum(segmentsNum-1, 0, 0, segmentsNum-1)
-
-    // Release done
-    suite.dist.LeaderViewManager.Update(targetNode)
+    suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)

-    // Process tasks done
+    // Tasks removed
     suite.dispatchAndWait(targetNode)
     suite.AssertTaskNum(0, 0, 0, 0)

@@ -1084,7 +1072,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
             CollectionID: suite.collection,
         },
     }, nil)
-    for _, segment := range suite.loadSegments {
+    for _, segment := range suite.loadSegments[1:] {
         suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{
             {
                 ID:            segment,
@@ -1105,13 +1093,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
     }))
     suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
     tasks := []Task{}
-    segments := make([]*datapb.SegmentInfo, 0)
     for _, segment := range suite.loadSegments {
-        segments = append(segments, &datapb.SegmentInfo{
-            ID:            segment,
-            PartitionID:   1,
-            InsertChannel: channel.GetChannelName(),
-        })
         task, err := NewSegmentTask(
             ctx,
             timeout,
@@ -1125,33 +1107,8 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
         err = suite.scheduler.Add(task)
         suite.NoError(err)
     }
-    suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
-    suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, partition))
-    suite.target.UpdateCollectionNextTarget(suite.collection)
-    segmentsNum := len(suite.loadSegments)
-    suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)

-    // Process tasks
-    suite.dispatchAndWait(targetNode)
-    suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
-
-    // Process tasks done
-    // Dist contains channels, first task stale
-    view := &meta.LeaderView{
-        ID:           targetNode,
-        CollectionID: suite.collection,
-        Segments:     map[int64]*querypb.SegmentDist{},
-        Channel:      channel.ChannelName,
-    }
-    for _, segment := range suite.loadSegments[1:] {
-        view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
-    }
-    distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
-        return meta.SegmentFromInfo(info)
-    })
-    suite.dist.LeaderViewManager.Update(targetNode, view)
-    suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
-    segments = make([]*datapb.SegmentInfo, 0)
+    segments := make([]*datapb.SegmentInfo, 0)
     for _, segment := range suite.loadSegments[1:] {
         segments = append(segments, &datapb.SegmentInfo{
             ID:            segment,
@@ -1159,13 +1116,16 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
             InsertChannel: channel.GetChannelName(),
         })
     }
-    bakExpectations := suite.broker.ExpectedCalls
-    suite.broker.AssertExpectations(suite.T())
-    suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
     suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
     suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, 2))
     suite.target.UpdateCollectionNextTarget(suite.collection)
+
+    // process done
+    suite.dispatchAndWait(targetNode)
+    suite.AssertTaskNum(1, 0, 0, 1)
+
+    // task removed
     suite.dispatchAndWait(targetNode)
     suite.AssertTaskNum(0, 0, 0, 0)

@@ -1178,7 +1138,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
             suite.NoError(task.Err())
         }
     }
-    suite.broker.ExpectedCalls = bakExpectations
 }

 func (suite *TaskSuite) TestChannelTaskReplace() {
@@ -1491,10 +1450,10 @@ func (suite *TaskSuite) AssertTaskNum(process, wait, channel, segment int) {

     suite.Equal(process, scheduler.processQueue.Len())
     suite.Equal(wait, scheduler.waitQueue.Len())
-    suite.Len(scheduler.segmentTasks, segment)
-    suite.Len(scheduler.channelTasks, channel)
-    suite.Equal(len(scheduler.tasks), process+wait)
-    suite.Equal(len(scheduler.tasks), segment+channel)
+    suite.Equal(scheduler.segmentTasks.Len(), segment)
+    suite.Equal(scheduler.channelTasks.Len(), channel)
+    suite.Equal(scheduler.tasks.Len(), process+wait)
+    suite.Equal(scheduler.tasks.Len(), segment+channel)
 }

 func (suite *TaskSuite) dispatchAndWait(node int64) {
@@ -1506,13 +1465,14 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
         count = 0
         keys = make([]any, 0)

-        for _, executor := range suite.scheduler.executors {
+        suite.scheduler.executors.Range(func(_ int64, executor *Executor) bool {
             executor.executingTasks.Range(func(taskIndex string) bool {
                 keys = append(keys, taskIndex)
                 count++
                 return true
             })
-        }
+            return true
+        })

         if count == 0 {
             return
diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go
index 0361c83777352..e0821b5c7fae4 100644
--- a/internal/querycoordv2/utils/util.go
+++ b/internal/querycoordv2/utils/util.go
@@ -48,7 +48,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
 func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error {
     log := log.Ctx(context.TODO()).
         WithRateGroup("utils.CheckLeaderAvailable", 1, 60).
-        With(zap.Int64("leaderID", leader.ID))
+        With(zap.Int64("leaderID", leader.ID), zap.Int64("collectionID", leader.CollectionID))
     info := nodeMgr.Get(leader.ID)

     // Check whether leader is online
diff --git a/pkg/metrics/querycoord_metrics.go b/pkg/metrics/querycoord_metrics.go
index 67aa181421d72..d66abea2cd129 100644
--- a/pkg/metrics/querycoord_metrics.go
+++ b/pkg/metrics/querycoord_metrics.go
@@ -36,6 +36,7 @@ const (

     LeaderGrowTaskLabel   = "leader_grow"
     LeaderReduceTaskLabel = "leader_reduce"
+    LeaderUpdateTaskLabel = "leader_update"

     UnknownTaskLabel = "unknown"