Skip to content

Commit

Permalink
fix: Remove frequently updating metric to avoid mutex contention
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper committed Jan 1, 2025
1 parent ccbe6fc commit d941e90
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 19 deletions.
26 changes: 9 additions & 17 deletions internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -55,6 +56,8 @@ type indexMeta struct {

// segmentID -> indexID -> segmentIndex
segmentIndexes map[UniqueID]map[UniqueID]*model.SegmentIndex

lastUpdateMetricTime atomic.Time
}

// NewMeta creates meta from provided `kv.TxnKV`
Expand Down Expand Up @@ -138,6 +141,10 @@ func (m *indexMeta) updateSegIndexMeta(segIdx *model.SegmentIndex, updateFunc fu
}

func (m *indexMeta) updateIndexTasksMetrics() {
if time.Since(m.lastUpdateMetricTime.Load()) < 120*time.Second {

Check failure on line 144 in internal/datacoord/index_meta.go

View workflow job for this annotation

GitHub Actions / Code Checker MacOS 13

undefined: time
return
}
defer m.lastUpdateMetricTime.Store(time.Now())

Check failure on line 147 in internal/datacoord/index_meta.go

View workflow job for this annotation

GitHub Actions / Code Checker MacOS 13

undefined: time
taskMetrics := make(map[UniqueID]map[commonpb.IndexState]int)
for _, segIdx := range m.buildID2SegmentIndex {
if segIdx.IsDeleted {
Expand Down Expand Up @@ -166,6 +173,7 @@ func (m *indexMeta) updateIndexTasksMetrics() {
}
}
}
log.Ctx(m.ctx).Info("update index metric", zap.Int("collectionNum", len(taskMetrics)))
}

func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool {
Expand Down Expand Up @@ -782,7 +790,7 @@ func (m *indexMeta) GetAllSegIndexes() map[int64]*model.SegmentIndex {

segIndexes := make(map[int64]*model.SegmentIndex, len(m.buildID2SegmentIndex))
for buildID, segIndex := range m.buildID2SegmentIndex {
segIndexes[buildID] = model.CloneSegmentIndex(segIndex)
segIndexes[buildID] = segIndex
}
return segIndexes
}
Expand Down Expand Up @@ -879,22 +887,6 @@ func (m *indexMeta) CheckCleanSegmentIndex(buildID UniqueID) (bool, *model.Segme
return true, nil
}

func (m *indexMeta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex {
m.RLock()
defer m.RUnlock()

metas := make([]*model.SegmentIndex, 0)
for _, segIndex := range m.buildID2SegmentIndex {
if segIndex.IsDeleted {
continue
}
if nodeID == segIndex.NodeID {
metas = append(metas, model.CloneSegmentIndex(segIndex))
}
}
return metas
}

func (m *indexMeta) getSegmentsIndexStates(collectionID UniqueID, segmentIDs []UniqueID) map[int64]map[int64]*indexpb.SegmentIndexState {
m.RLock()
defer m.RUnlock()
Expand Down
8 changes: 6 additions & 2 deletions internal/querycoordv2/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ type taskScheduler struct {
channelTasks map[replicaChannelIndex]Task
processQueue *taskQueue
waitQueue *taskQueue

lastUpdateMetricTime time.Time
}

func NewScheduler(ctx context.Context,
Expand Down Expand Up @@ -279,13 +281,14 @@ func (scheduler *taskScheduler) Add(task Task) error {
scheduler.segmentTasks[index] = task
}

scheduler.updateTaskMetrics()
log.Ctx(task.Context()).Info("task added", zap.String("task", task.String()))
task.RecordStartTs()
return nil
}

func (scheduler *taskScheduler) updateTaskMetrics() {
if time.Since(scheduler.lastUpdateMetricTime) < 30*time.Second {
return
}
segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
for _, task := range scheduler.segmentTasks {
Expand Down Expand Up @@ -318,6 +321,7 @@ func (scheduler *taskScheduler) updateTaskMetrics() {
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum))
scheduler.lastUpdateMetricTime = time.Now()
}

// check whether the task is valid to add,
Expand Down

0 comments on commit d941e90

Please sign in to comment.