Skip to content

Commit

Permalink
fix: [2.5] Restore the compacting state for stats task during recovery (
Browse files Browse the repository at this point in the history
…#39460)

issue: #39333 

master pr: #39459

---------

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Jan 20, 2025
1 parent 15d60c6 commit 817b616
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 4 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (jm *statsJobManager) SubmitStatsTask(originSegmentID, targetSegmentID int6
}
if err = jm.mt.statsTaskMeta.AddStatsTask(t); err != nil {
if errors.Is(err, merr.ErrTaskDuplicate) {
log.Info("stats task already exists", zap.Int64("taskID", taskID),
log.RatedInfo(10, "stats task already exists", zap.Int64("taskID", taskID),
zap.Int64("collectionID", originSegment.GetCollectionID()),
zap.Int64("segmentID", originSegment.GetID()))
return nil
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/stats_task_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func (stm *statsTaskMeta) AddStatsTask(t *indexpb.StatsTask) error {
defer stm.Unlock()

for _, st := range stm.tasks {
if st.GetSegmentID() == t.GetSegmentID() && st.GetSubJobType() == t.GetSubJobType() {
if st.GetTaskID() == t.GetTaskID() || (st.GetSegmentID() == t.GetSegmentID() && st.GetSubJobType() == t.GetSubJobType() && st.GetState() != indexpb.JobState_JobStateFailed) {
msg := fmt.Sprintf("stats task already exist in meta of segment %d with subJobType: %s",
t.GetSegmentID(), t.GetSubJobType().String())
log.Warn(msg)
log.RatedWarn(10, msg, zap.Int64("taskID", t.GetTaskID()), zap.Int64("exist taskID", st.GetTaskID()))
return merr.WrapErrTaskDuplicate(indexpb.JobType_JobTypeStatsJob.String(), msg)
}
}
Expand Down
4 changes: 3 additions & 1 deletion internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
}

log.Ctx(ctx).Info("index task pre check successfully", zap.Int64("taskID", it.GetTaskID()),
zap.Int64("segID", segment.GetID()))
zap.Int64("segID", segment.GetID()),
zap.Int32("CurrentIndexVersion", it.req.GetCurrentIndexVersion()),
zap.Int32("CurrentScalarIndexVersion", it.req.GetCurrentScalarIndexVersion()))
return true
}

Expand Down
15 changes: 15 additions & 0 deletions internal/datacoord/task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,20 @@ func (s *taskScheduler) reloadFromMeta() {
allStatsTasks := s.meta.statsTaskMeta.GetAllTasks()
for taskID, t := range allStatsTasks {
if t.GetState() != indexpb.JobState_JobStateFinished && t.GetState() != indexpb.JobState_JobStateFailed {
if t.GetState() == indexpb.JobState_JobStateInProgress || t.GetState() == indexpb.JobState_JobStateRetry {
exist, canDo := s.meta.CheckAndSetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()})
if !exist || !canDo {
log.Ctx(s.ctx).Warn("segment is not exist or is compacting, skip stats, but this should not have happened, try to remove the stats task",
zap.Int64("taskID", taskID), zap.Bool("exist", exist), zap.Bool("canDo", canDo))
err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
if err == nil {
continue
}
log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
t.State = indexpb.JobState_JobStateFailed
t.FailReason = "segment is nto exist or is compacting"
}
}
s.enqueue(&statsTask{
taskID: taskID,
segmentID: t.GetSegmentID(),
Expand Down Expand Up @@ -465,6 +479,7 @@ func (s *taskScheduler) processInProgress(task Task) bool {
if exist {
task.QueryResult(s.ctx, client)
if task.GetState() == indexpb.JobState_JobStateFinished || task.GetState() == indexpb.JobState_JobStateFailed {
task.ResetTask(s.meta)
return s.processFinished(task)
}
return true
Expand Down
106 changes: 106 additions & 0 deletions internal/datacoord/task_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
buildID = UniqueID(600)
nodeID = UniqueID(700)
partitionKeyID = UniqueID(800)
statsTaskID = UniqueID(900)
)

func createIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta {
Expand Down Expand Up @@ -1924,3 +1925,108 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
})
scheduler_isolation.Stop()
}

// Test_reload verifies how the task scheduler restores stats-task state when
// it is rebuilt from persisted meta:
//   - an in-progress stats task marks its segment as compacting and is re-enqueued;
//   - a task whose segment is already compacting is dropped from meta and not scheduled;
//   - if dropping from meta fails, the task is kept but moved to the failed state.
func (s *taskSchedulerSuite) Test_reload() {
	// All three cases reload the same persisted in-progress stats task; build it once.
	newStatsTask := func() *indexpb.StatsTask {
		return &indexpb.StatsTask{
			CollectionID:    10000,
			PartitionID:     10001,
			SegmentID:       1000,
			InsertChannel:   "",
			TaskID:          statsTaskID,
			Version:         1,
			NodeID:          1,
			State:           indexpb.JobState_JobStateInProgress,
			FailReason:      "",
			TargetSegmentID: 2000,
			SubJobType:      indexpb.StatsSubJob_Sort,
			CanRecycle:      false,
		}
	}

	s.Run("normal case", func() {
		mockCatalog := catalogmocks.NewDataCoordCatalog(s.T())
		workerMgr := session.NewMockWorkerManager(s.T())
		mockHandler := NewNMockHandler(s.T())
		metaTable := createMeta(mockCatalog,
			withAnalyzeMeta(s.createAnalyzeMeta(mockCatalog)),
			withIndexMeta(createIndexMeta(mockCatalog)),
			withStatsTaskMeta(&statsTaskMeta{
				ctx:     context.Background(),
				catalog: mockCatalog,
				tasks:   map[int64]*indexpb.StatsTask{statsTaskID: newStatsTask()},
			}))

		sched := newTaskScheduler(context.Background(), metaTable, workerMgr, nil, nil, mockHandler, nil)
		s.NotNil(sched)
		// Recovery must flag the segment as compacting and re-enqueue the task.
		s.True(metaTable.segments.segments[1000].isCompacting)
		reloaded, found := sched.tasks[statsTaskID]
		s.True(found)
		s.NotNil(reloaded)
	})

	s.Run("segment is compacting", func() {
		mockCatalog := catalogmocks.NewDataCoordCatalog(s.T())
		mockCatalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil)
		workerMgr := session.NewMockWorkerManager(s.T())
		mockHandler := NewNMockHandler(s.T())
		metaTable := createMeta(mockCatalog,
			withAnalyzeMeta(s.createAnalyzeMeta(mockCatalog)),
			withIndexMeta(createIndexMeta(mockCatalog)),
			withStatsTaskMeta(&statsTaskMeta{
				ctx:     context.Background(),
				catalog: mockCatalog,
				tasks:   map[int64]*indexpb.StatsTask{statsTaskID: newStatsTask()},
			}))
		// Simulate a segment already being compacted when recovery runs.
		metaTable.segments.segments[1000].isCompacting = true

		sched := newTaskScheduler(context.Background(), metaTable, workerMgr, nil, nil, mockHandler, nil)
		s.NotNil(sched)
		// The conflicting task is dropped from meta, so it must not be scheduled.
		s.True(metaTable.segments.segments[1000].isCompacting)
		reloaded, found := sched.tasks[statsTaskID]
		s.False(found)
		s.Nil(reloaded)
	})

	s.Run("drop task failed", func() {
		mockCatalog := catalogmocks.NewDataCoordCatalog(s.T())
		mockCatalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(errors.New("mock error"))
		workerMgr := session.NewMockWorkerManager(s.T())
		mockHandler := NewNMockHandler(s.T())
		metaTable := createMeta(mockCatalog,
			withAnalyzeMeta(s.createAnalyzeMeta(mockCatalog)),
			withIndexMeta(createIndexMeta(mockCatalog)),
			withStatsTaskMeta(&statsTaskMeta{
				ctx:     context.Background(),
				catalog: mockCatalog,
				tasks:   map[int64]*indexpb.StatsTask{statsTaskID: newStatsTask()},
			}))
		metaTable.segments.segments[1000].isCompacting = true

		sched := newTaskScheduler(context.Background(), metaTable, workerMgr, nil, nil, mockHandler, nil)
		s.NotNil(sched)
		s.True(metaTable.segments.segments[1000].isCompacting)
		// Dropping from meta failed, so the task stays but in the failed state.
		reloaded, found := sched.tasks[statsTaskID]
		s.True(found)
		s.Equal(indexpb.JobState_JobStateFailed, reloaded.GetState())
	})
}

0 comments on commit 817b616

Please sign in to comment.