Skip to content

Commit

Permalink
Refine index task scheduler policy
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Jan 14, 2025
1 parent da1b786 commit 5585e34
Show file tree
Hide file tree
Showing 25 changed files with 820 additions and 466 deletions.
4 changes: 4 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ indexCoord:
indexNode:
scheduler:
buildParallel: 1
slotCap: 16
enableDisk: true # enable index node build disk vector index
maxDiskUsagePercentage: 95
ip: # TCP/IP address of indexNode. If not specified, use the first unicastable address
Expand Down Expand Up @@ -631,6 +632,9 @@ dataCoord:
clusteringCompactionUsage: 16 # slot usage of clustering compaction job.
mixCompactionUsage: 8 # slot usage of mix compaction job.
l0DeleteCompactionUsage: 8 # slot usage of l0 compaction job.
indexTaskSlotUsage: 16 # slot usage of index task
statsTaskSlotUsage: 4 # slot usage of stats task
analyzeTaskSlotUsage: 16 # slot usage of analyze task
ip: # TCP/IP address of dataCoord. If not specified, use the first unicastable address
port: 13333 # TCP port of dataCoord
grpc:
Expand Down
47 changes: 47 additions & 0 deletions internal/datacoord/session/indexnode_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ type WorkerManager interface {
RemoveNode(nodeID typeutil.UniqueID)
StoppingNode(nodeID typeutil.UniqueID)
PickClient() (typeutil.UniqueID, types.IndexNodeClient)
QuerySlots() []WorkerSlots
ClientSupportDisk() bool
GetAllClients() map[typeutil.UniqueID]types.IndexNodeClient
GetClientByID(nodeID typeutil.UniqueID) (types.IndexNodeClient, bool)
}

type WorkerSlots struct {
NodeID int64
TotalSlots int64
AvailableSlots int64
}

// IndexNodeManager is used to manage the client of IndexNode.
type IndexNodeManager struct {
nodeClients map[typeutil.UniqueID]types.IndexNodeClient
Expand Down Expand Up @@ -115,6 +122,46 @@ func (nm *IndexNodeManager) AddNode(nodeID typeutil.UniqueID, address string) er
return nil
}

func (nm *IndexNodeManager) QuerySlots() []WorkerSlots {
nm.lock.Lock()
defer nm.lock.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), querySlotTimeout)
defer cancel()

nodeSlots := make([]WorkerSlots, 0)
mu := &sync.Mutex{}
wg := &sync.WaitGroup{}
for nodeID, client := range nm.nodeClients {
if _, ok := nm.stoppingNodes[nodeID]; !ok {
wg.Add(1)
go func(nodeID int64) {
defer wg.Done()
resp, err := client.GetJobStats(ctx, &workerpb.GetJobStatsRequest{})
if err != nil {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.Error(err))
return
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID),
zap.String("reason", resp.GetStatus().GetReason()))
return
}
mu.Lock()
defer mu.Unlock()
nodeSlots = append(nodeSlots, WorkerSlots{
NodeID: nodeID,
TotalSlots: resp.GetTotalSlots(),
AvailableSlots: resp.GetTaskSlots(),
})
}(nodeID)
}
}
wg.Wait()
log.Ctx(context.TODO()).Debug("query slot done", zap.Any("nodeSlots", nodeSlots))
return nodeSlots
}

func (nm *IndexNodeManager) PickClient() (typeutil.UniqueID, types.IndexNodeClient) {
nm.lock.Lock()
defer nm.lock.Unlock()
Expand Down
47 changes: 47 additions & 0 deletions internal/datacoord/session/mock_worker_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions internal/datacoord/task_analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ func (at *analyzeTask) UpdateMetaBuildingState(meta *meta) error {
return nil
}

func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler) (bool, int64) {
t := dependency.meta.analyzeMeta.GetTask(at.GetTaskID())
if t == nil {
log.Ctx(ctx).Info("task is nil, delete it", zap.Int64("taskID", at.GetTaskID()))
at.SetState(indexpb.JobState_JobStateNone, "analyze task is nil")
return false
return false, 0
}

at.req = &workerpb.AnalyzeRequest{
Expand Down Expand Up @@ -170,7 +170,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
log.Ctx(ctx).Warn("analyze stats task is processing, but segment is nil, delete the task",
zap.Int64("taskID", at.GetTaskID()), zap.Int64("segmentID", segID))
at.SetState(indexpb.JobState_JobStateFailed, fmt.Sprintf("segmentInfo with ID: %d is nil", segID))
return false
return false, 0
}

totalSegmentsRows += info.GetNumOfRows()
Expand All @@ -188,7 +188,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
log.Ctx(ctx).Warn("analyze task get collection info failed", zap.Int64("collectionID",
segments[0].GetCollectionID()), zap.Error(err))
at.SetState(indexpb.JobState_JobStateInit, err.Error())
return false
return false, 0
}

schema := collInfo.Schema
Expand All @@ -203,7 +203,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
at.SetState(indexpb.JobState_JobStateInit, err.Error())
return false
return false, 0
}
at.req.Dim = int64(dim)

Expand All @@ -212,7 +212,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
if numClusters < Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64() {
log.Ctx(ctx).Info("data size is too small, skip analyze task", zap.Float64("raw data size", totalSegmentsRawDataSize), zap.Int64("num clusters", numClusters), zap.Int64("minimum num clusters required", Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64()))
at.SetState(indexpb.JobState_JobStateFinished, "")
return false
return false, 0
}
if numClusters > Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.GetAsInt64() {
numClusters = Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.GetAsInt64()
Expand All @@ -223,8 +223,10 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
at.req.MinClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMinClusterSizeRatio.GetAsFloat()
at.req.MaxClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxClusterSizeRatio.GetAsFloat()
at.req.MaxClusterSize = Params.DataCoordCfg.ClusteringCompactionMaxClusterSize.GetAsSize()
taskSlot := Params.DataCoordCfg.AnalyzeTaskSlotUsage.GetAsInt64()
at.req.TaskSlot = taskSlot

return true
return true, taskSlot
}

func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
Expand Down
32 changes: 23 additions & 9 deletions internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,19 @@ func (it *indexBuildTask) UpdateMetaBuildingState(meta *meta) error {
return meta.indexMeta.BuildIndex(it.taskID)
}

func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskScheduler) (bool, int64) {
segIndex, exist := dependency.meta.indexMeta.GetIndexJob(it.taskID)
if !exist || segIndex == nil {
log.Ctx(ctx).Info("index task has not exist in meta table, remove task", zap.Int64("taskID", it.taskID))
it.SetState(indexpb.JobState_JobStateNone, "index task has not exist in meta table")
return false
return false, 0
}

segment := dependency.meta.GetSegment(ctx, segIndex.SegmentID)
if !isSegmentHealthy(segment) || !dependency.meta.indexMeta.IsIndexExist(segIndex.CollectionID, segIndex.IndexID) {
log.Ctx(ctx).Info("task is no need to build index, remove it", zap.Int64("taskID", it.taskID))
it.SetState(indexpb.JobState_JobStateNone, "task is no need to build index")
return false
return false, 0
}
indexParams := dependency.meta.indexMeta.GetIndexParams(segIndex.CollectionID, segIndex.IndexID)
indexType := GetIndexType(indexParams)
Expand All @@ -154,7 +154,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
it.SetStartTime(time.Now())
it.SetEndTime(time.Now())
it.SetState(indexpb.JobState_JobStateFinished, "fake finished index success")
return false
return false, 0
}

typeParams := dependency.meta.indexMeta.GetTypeParams(segIndex.CollectionID, segIndex.IndexID)
Expand All @@ -170,7 +170,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
if ret != nil {
log.Ctx(ctx).Warn("failed to update index build params defined in yaml", zap.Int64("taskID", it.taskID), zap.Error(ret))
it.SetState(indexpb.JobState_JobStateInit, ret.Error())
return false
return false, 0
}
}

Expand All @@ -180,14 +180,14 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
if err != nil {
log.Ctx(ctx).Warn("failed to append index build params", zap.Int64("taskID", it.taskID), zap.Error(err))
it.SetState(indexpb.JobState_JobStateInit, err.Error())
return false
return false, 0
}
}

collectionInfo, err := dependency.handler.GetCollection(ctx, segment.GetCollectionID())
if err != nil {
log.Ctx(ctx).Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
return false
return false, 0
}

schema := collectionInfo.Schema
Expand Down Expand Up @@ -215,7 +215,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
if collectionInfo == nil {
log.Ctx(ctx).Warn("get collection failed", zap.Int64("collID", segIndex.CollectionID), zap.Error(err))
it.SetState(indexpb.JobState_JobStateInit, err.Error())
return true
return true, 0
}
partitionKeyField, _ := typeutil.GetPartitionKeyFieldSchema(schema)
if partitionKeyField != nil && typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyField) {
Expand All @@ -235,6 +235,8 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
}
}

taskSlot := calculateIndexTaskSlot(segment.getSegmentSize())

it.req = &workerpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath),
Expand All @@ -256,11 +258,23 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
OptionalScalarFields: optionalFields,
Field: field,
PartitionKeyIsolation: partitionKeyIsolation,
TaskSlot: taskSlot,
}

log.Ctx(ctx).Info("index task pre check successfully", zap.Int64("taskID", it.GetTaskID()),
zap.Int64("segID", segment.GetID()))
return true
return true, taskSlot
}

func calculateIndexTaskSlot(segmentSize int64) int64 {
if segmentSize > 500*1024*1024 {
return max(Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64(), 1)
} else if segmentSize > 100*1024*1024 {
return max(Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()/2, 1)
} else if segmentSize > 10*1024*1024 {
return max(Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()/4, 1)
}
return max(Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()/8, 1)
}

func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
Expand Down
Loading

0 comments on commit 5585e34

Please sign in to comment.