enhance: clean failed/timeout compaction task in compactionHandler #34991

Closed
39 changes: 39 additions & 0 deletions internal/datacoord/compaction.go
@@ -84,6 +84,9 @@ type compactionPlanHandler struct {
executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task

cleaningGuard lock.RWMutex
cleaningTasks map[int64]CompactionTask // planID -> task

meta CompactionMeta
allocator allocator.Allocator
sessions session.DataNodeManager
@@ -193,6 +196,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
analyzeScheduler: analyzeScheduler,
handler: handler,
}
@@ -406,6 +410,7 @@ func (c *compactionPlanHandler) loopCheck() {
if err != nil {
log.Info("fail to update compaction", zap.Error(err))
}
c.cleanFailedTasks()
}
}
}
@@ -648,6 +653,11 @@ func assignNodeID(slots map[int64]int64, t CompactionTask) int64 {
return nodeID
}

// checkCompaction retrieves executing tasks and calls each task's Process() method
// to evaluate its state and progress through the state machine.
// Completed tasks are removed from executingTasks.
// Tasks that fail or timeout are moved from executingTasks to cleaningTasks,
// where task-specific clean logic is performed asynchronously.
func (c *compactionPlanHandler) checkCompaction() error {
// Snapshot the executing tasks before calling GetCompactionState on the DataNodes to prevent false failures,
// since the DataCoord might add new tasks while GetCompactionState is in flight.
@@ -689,9 +699,38 @@ func (c *compactionPlanHandler) checkCompaction() error {
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Done).Inc()
}
c.executingGuard.Unlock()

// collect the tasks that need to be cleaned
c.cleaningGuard.Lock()
for _, t := range finishedTasks {
if t.GetTaskProto().GetState() == datapb.CompactionTaskState_failed || t.GetTaskProto().GetState() == datapb.CompactionTaskState_timeout {
c.cleaningTasks[t.GetTaskProto().GetPlanID()] = t
}
}
c.cleaningGuard.Unlock()

return nil
}

// cleanFailedTasks runs each task's own Clean logic,
// while compactionPlanHandler.Clean performs garbage collection for tasks that have already been cleaned.
func (c *compactionPlanHandler) cleanFailedTasks() {
c.cleaningGuard.RLock()
cleanedTasks := make([]CompactionTask, 0)
for _, t := range c.cleaningTasks {
clean := t.Clean()
if clean {
cleanedTasks = append(cleanedTasks, t)
}
}
c.cleaningGuard.RUnlock()
c.cleaningGuard.Lock()
for _, t := range cleanedTasks {
delete(c.cleaningTasks, t.GetTaskProto().GetPlanID())
}
c.cleaningGuard.Unlock()
}
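
For illustration only (this block is an editor's sketch, not part of the diff): the hand-off implemented by checkCompaction and cleanFailedTasks above reduces to the pattern below — terminal failed/timeout tasks are parked in a separate map and their Clean() is retried on every loop tick until it succeeds. The types and the single shared lock are hypothetical simplifications; the real handler uses separate executingGuard and cleaningGuard locks.

package sketch

import "sync"

// task is a hypothetical stand-in for CompactionTask.
type task interface {
	PlanID() int64
	Terminated() bool // failed or timed out
	Clean() bool      // true once clean-up has succeeded
}

type handler struct {
	mu        sync.RWMutex
	executing map[int64]task
	cleaning  map[int64]task
}

// check moves terminal tasks from executing to cleaning.
func (h *handler) check() {
	h.mu.Lock()
	defer h.mu.Unlock()
	for id, t := range h.executing {
		if t.Terminated() {
			delete(h.executing, id)
			h.cleaning[id] = t
		}
	}
}

// cleanFailed retries Clean on parked tasks; only tasks whose Clean succeeded
// are dropped, so a failed Clean is retried on the next tick.
func (h *handler) cleanFailed() {
	h.mu.RLock()
	done := make([]int64, 0)
	for _, t := range h.cleaning {
		if t.Clean() {
			done = append(done, t.PlanID())
		}
	}
	h.mu.RUnlock()

	h.mu.Lock()
	for _, id := range done {
		delete(h.cleaning, id)
	}
	h.mu.Unlock()
}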

func pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1
10 changes: 10 additions & 0 deletions internal/datacoord/compaction_task.go
@@ -23,7 +23,17 @@ import (
)

type CompactionTask interface {
// Process drives the task's state machine
//
// Returns:
// - <bool>: whether the task's state machine has ended.
//
// Notes:
//
// `end` does not imply success; the terminal state may be completed, failed, or timeout.
Process() bool
// Clean performs the clean-up logic for a failed or timed-out task
Clean() bool
BuildCompactionRequest() (*datapb.CompactionPlan, error)
GetSlotUsage() int64
GetLabel() string
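
For illustration only (editor's sketch, not part of the diff): a minimal, hypothetical type showing the Process/Clean contract documented above. Process returns true once the state machine has ended, whether or not the task succeeded; Clean reports whether clean-up succeeded so the handler knows whether to retry it. The noopTask name and its fields are invented for this sketch and do not implement the full CompactionTask interface.

package sketch

// noopTask illustrates the Process/Clean contract only.
type noopTask struct {
	state string // e.g. "executing", "completed", "failed", "timeout", "cleaned"
}

// Process advances the state machine one step and reports whether it ended.
// Returning true does not imply success: the terminal state may be
// completed, failed, or timeout.
func (t *noopTask) Process() bool {
	switch t.state {
	case "completed", "failed", "timeout", "cleaned":
		return true
	default:
		// a real task would advance its state here
		return false
	}
}

// Clean performs clean-up for a failed or timed-out task. Returning false
// keeps the task in cleaningTasks so the handler retries on the next tick.
func (t *noopTask) Clean() bool {
	if err := t.doClean(); err != nil {
		return false
	}
	t.state = "cleaned"
	return true
}

// doClean stands in for the task-specific clean-up work.
func (t *noopTask) doClean() error { return nil }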
64 changes: 46 additions & 18 deletions internal/datacoord/compaction_task_clustering.go
@@ -85,6 +85,8 @@ func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.A
return task
}

// Note: returning true means this state machine has exited.
// Return true ONLY for the completed, failed, or timeout states.
func (t *clusteringCompactionTask) Process() bool {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
lastState := t.GetTaskProto().GetState().String()
@@ -125,15 +127,22 @@ func (t *clusteringCompactionTask) Process() bool {
if err != nil {
log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
}
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
}
log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned
return t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout
}
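
As an aside (editor's sketch, not part of the diff): the terminal-state check that Process now performs could be factored into a small helper. The isTerminal name below is hypothetical, but the states are the ones used in this file, assuming the existing datapb import.

// isTerminal reports whether a clustering compaction task's state machine has
// ended; a terminal state does not imply success.
func isTerminal(s datapb.CompactionTaskState) bool {
	switch s {
	case datapb.CompactionTaskState_completed,
		datapb.CompactionTaskState_cleaned,
		datapb.CompactionTaskState_failed,
		datapb.CompactionTaskState_timeout:
		return true
	default:
		return false
	}
}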

// retryableProcess handles the task's state transitions and returns an error if anything does not go as expected.
// The outer Process sets the state and retry count according to the error type (retryable or non-retryable).
func (t *clusteringCompactionTask) retryableProcess() error {
if t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned {
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
return nil
}

@@ -162,15 +171,25 @@ func (t *clusteringCompactionTask) retryableProcess() error {
return t.processIndexing()
case datapb.CompactionTaskState_statistic:
return t.processStats()

case datapb.CompactionTaskState_timeout:
return t.processFailedOrTimeout()
case datapb.CompactionTaskState_failed:
return t.processFailedOrTimeout()
case datapb.CompactionTaskState_timeout:
return t.processFailedOrTimeout()
}
return nil
}

func (t *clusteringCompactionTask) Clean() bool {
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
log.Info("clean task")
err := t.doClean()
if err != nil {
log.Warn("clean task fail", zap.Error(err))
return false
}
return true
}

func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
@@ -280,12 +299,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return t.processMetaSaved()
case datapb.CompactionTaskState_executing:
if t.checkTimeout() {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err == nil {
return t.processFailedOrTimeout()
} else {
return err
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
}
return nil
case datapb.CompactionTaskState_failed:
@@ -523,12 +537,17 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
}

func (t *clusteringCompactionTask) processFailedOrTimeout() error {
log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("state", t.GetTaskProto().GetState().String()))
return nil
}

func (t *clusteringCompactionTask) doClean() error {
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.String("state", t.GetTaskProto().GetState().String()))

if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
log.Warn("clusteringCompactionTask unable to drop compaction plan", zap.Error(err))
}
isInputDropped := false
for _, segID := range t.GetTaskProto().GetInputSegments() {
@@ -544,8 +563,8 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
// L1 : L1 ->(process)-> L2 ->(clean)-> L1
// L2 : L2 ->(process)-> L2 ->(clean)-> L2
for _, segID := range t.GetTaskProto().GetInputSegments() {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
@@ -583,8 +602,6 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
}
}

t.resetSegmentCompacting()

// drop partition stats if uploaded
partitionStatsInfo := &datapb.PartitionStatsInfo{
CollectionID: t.GetTaskProto().GetCollectionID(),
@@ -596,9 +613,20 @@
err := t.meta.CleanPartitionStatsInfo(context.TODO(), partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID()))
}

return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
if err != nil {
log.Warn("clusteringCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err))
return err
}

// resetSegmentCompacting must be the last step of Clean, so that it is called at most once;
// otherwise a retried Clean could unlock segments that other compaction tasks have locked.
t.resetSegmentCompacting()
log.Info("clusteringCompactionTask clean done")
return nil
}
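
For illustration only (editor's sketch, not part of the diff): the ordering constraint noted above — persist the cleaned state first, release the segment-compacting flags last — reduces to the shape below. The cleanable interface and its methods are hypothetical.

// cleanable is a hypothetical view of the steps doClean performs.
type cleanable interface {
	dropPlanOnDataNode() error
	revertOrDropResultSegments() error
	saveState(state string) error
	resetSegmentCompacting()
}

func doCleanSketch(t cleanable) error {
	if err := t.dropPlanOnDataNode(); err != nil {
		// best effort: log and continue
	}
	if err := t.revertOrDropResultSegments(); err != nil {
		return err // cleanFailedTasks will retry on the next tick
	}
	if err := t.saveState("cleaned"); err != nil {
		return err
	}
	// Release the compacting flags last so a retried Clean can never unlock
	// segments that another compaction task has locked in the meantime.
	t.resetSegmentCompacting()
	return nil
}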

func (t *clusteringCompactionTask) doAnalyze() error {
48 changes: 39 additions & 9 deletions internal/datacoord/compaction_task_clustering_test.go
@@ -109,14 +109,43 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChange
s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
task := s.generateBasicTask(false)

task.processPipelining()
err := task.processPipelining()
s.NoError(err)

seg11 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg11.Level)
seg21 := s.meta.GetSegment(context.TODO(), 102)
s.Equal(datapb.SegmentLevel_L2, seg21.Level)
s.Equal(int64(10000), seg21.PartitionStatsVersion)

task.updateAndSaveTaskMeta(setResultSegments([]int64{103, 104}))
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 103,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
LastLevel: datapb.SegmentLevel_L1,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 104,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
LastLevel: datapb.SegmentLevel_L1,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})

s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)

err = task.doClean()
s.NoError(err)

s.Run("v2.4.x", func() {
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
@@ -164,6 +193,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
task.GetTaskProto().ResultSegments = []int64{103, 104}

task.processFailedOrTimeout()
task.Clean()

seg12 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
@@ -253,6 +283,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
task.GetTaskProto().ResultSegments = []int64{105, 106}

task.processFailedOrTimeout()
task.Clean()

seg12 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
@@ -336,7 +367,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
s.Equal(false, task.Process())
s.Equal(int32(3), task.GetTaskProto().RetryTimes)
s.Equal(datapb.CompactionTaskState_pipelining, task.GetTaskProto().GetState())
s.Equal(false, task.Process())
s.True(task.Process())
s.Equal(int32(0), task.GetTaskProto().RetryTimes)
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
}
@@ -345,7 +376,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
s.Run("process pipelining fail, segment not found", func() {
task := s.generateBasicTask(false)
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
s.Equal(false, task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})

@@ -570,11 +601,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
},
},
}, nil).Once()
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()

time.Sleep(time.Second * 1)
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_cleaned, task.GetTaskProto().GetState())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_timeout, task.GetTaskProto().GetState())
})
}

@@ -675,7 +705,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
s.Run("analyze task not found", func() {
task := s.generateBasicTask(false)
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})

@@ -691,7 +721,7 @@
State: indexpb.JobState_JobStateFailed,
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})

@@ -708,7 +738,7 @@
CentroidsFile: "",
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})
