enhance: clean failed/timeout compaction task in compactionHandler
Signed-off-by: wayblink <[email protected]>
wayblink committed Aug 26, 2024
1 parent 9dc1311 commit e645c88
Showing 9 changed files with 228 additions and 24 deletions.
40 changes: 39 additions & 1 deletion internal/datacoord/compaction.go
@@ -80,6 +80,9 @@ type compactionPlanHandler struct {
executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task

cleaningGuard lock.RWMutex
cleaningTasks map[int64]CompactionTask // planID -> task

meta CompactionMeta
allocator allocator.Allocator
chManager ChannelManager
@@ -182,13 +185,14 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
) *compactionPlanHandler {
return &compactionPlanHandler{
queueTasks: make(map[int64]CompactionTask),
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
chManager: cm,
meta: meta,
sessions: sessions,
allocator: allocator,
stopCh: make(chan struct{}),
cluster: cluster,
taskNumber: atomic.NewInt32(0),
analyzeScheduler: analyzeScheduler,
handler: handler,
@@ -373,6 +377,7 @@ func (c *compactionPlanHandler) loopCheck() {
if err != nil {
log.Info("fail to update compaction", zap.Error(err))
}
c.cleanFailedTasks()
}
}
}
@@ -637,6 +642,11 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
}
}

// checkCompaction retrieves executing tasks and calls each task's Process() method
// to evaluate its state and progress through the state machine.
// Completed tasks are removed from executingTasks.
// Tasks that fail or time out are moved from executingTasks to cleaningTasks,
// where task-specific clean logic is performed asynchronously.
func (c *compactionPlanHandler) checkCompaction() error {
// Get executing tasks before GetCompactionState from DataNode to prevent false failures,
// since DataCoord might add new tasks while GetCompactionState is in flight.
@@ -672,9 +682,37 @@ func (c *compactionPlanHandler) checkCompaction() error {
}
c.executingGuard.Unlock()
c.taskNumber.Sub(int32(len(finishedTasks)))

// stage failed/timeout tasks for cleaning
c.cleaningGuard.Lock()
for _, t := range finishedTasks {
if t.GetState() == datapb.CompactionTaskState_failed || t.GetState() == datapb.CompactionTaskState_timeout {
c.cleaningTasks[t.GetPlanID()] = t
}
}
c.cleaningGuard.Unlock()
return nil
}
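The change above is a two-stage handoff: checkCompaction demotes terminal tasks out of executingTasks, and the cleaning pass below drains them. A minimal self-contained sketch of the pattern, using simplified stand-in types rather than the real datacoord structs:

package compactionsketch

import "sync"

type taskState int

const (
	stateExecuting taskState = iota
	stateCompleted
	stateFailed
	stateTimeout
)

type task struct {
	planID int64
	state  taskState
}

type handler struct {
	executingGuard sync.RWMutex
	executing      map[int64]*task

	cleaningGuard sync.RWMutex
	cleaning      map[int64]*task
}

// check mirrors checkCompaction: drop finished tasks from the executing
// set, then stage failed/timeout ones for asynchronous cleaning.
func (h *handler) check() {
	h.executingGuard.Lock()
	var finished []*task
	for id, t := range h.executing {
		if t.state != stateExecuting {
			finished = append(finished, t)
			delete(h.executing, id) // deleting during range is safe in Go
		}
	}
	h.executingGuard.Unlock()

	h.cleaningGuard.Lock()
	for _, t := range finished {
		if t.state == stateFailed || t.state == stateTimeout {
			h.cleaning[t.planID] = t
		}
	}
	h.cleaningGuard.Unlock()
}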

// cleanFailedTasks runs each failed/timeout task's own Clean logic,
// while compactionPlanHandler.Clean garbage-collects tasks that have already been cleaned
func (c *compactionPlanHandler) cleanFailedTasks() {
c.cleaningGuard.RLock()
cleanedTasks := make([]CompactionTask, 0)
for _, t := range c.cleaningTasks {
clean := t.Clean()
if clean {
cleanedTasks = append(cleanedTasks, t)
}
}
c.cleaningGuard.RUnlock()
c.cleaningGuard.Lock()
for _, t := range cleanedTasks {
delete(c.cleaningTasks, t.GetPlanID())
}
c.cleaningGuard.Unlock()
}
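A task stays in cleaningTasks until its Clean() returns true, so failed clean-ups are retried on every loopCheck tick. A hedged sketch of a Clean implementation that cooperates with that loop (myTask and revertMeta are hypothetical, not part of this patch):

// Sketch: return false on transient failure so cleanFailedTasks
// keeps the task staged and retries it on the next tick.
func (t *myTask) Clean() bool {
	if err := t.revertMeta(); err != nil { // hypothetical rollback helper
		log.Warn("clean task fail, will retry", zap.Error(err))
		return false
	}
	return true
}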

func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1
10 changes: 10 additions & 0 deletions internal/datacoord/compaction_task.go
@@ -24,7 +24,17 @@ import (
)

type CompactionTask interface {
// Process performs the task's state machine
//
// Returns:
// - <bool>: whether the task state machine ends.
//
// Notes:
//
// `end` doesn't mean the task succeeded; its final state may be completed, failed, or timeout.
Process() bool
// Clean performs the clean-up logic for a failed/timeout task
Clean() bool
BuildCompactionRequest() (*datapb.CompactionPlan, error)

GetTriggerID() UniqueID
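Read together, the contract is: the handler calls Process until it returns true, and if the task ended in failed/timeout it then calls Clean until that also returns true. A simplified driver sketch under those assumptions; in the real handler both calls happen on loopCheck ticks rather than in a busy loop:

// Sketch: driving one task through the Process/Clean contract.
func drive(t CompactionTask) {
	for !t.Process() {
		// state machine still running; real code waits for the next tick
	}
	switch t.GetState() {
	case datapb.CompactionTaskState_failed, datapb.CompactionTaskState_timeout:
		for !t.Clean() {
			// task-specific cleanup failed; real code retries next tick
		}
	}
}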
41 changes: 25 additions & 16 deletions internal/datacoord/compaction_task_clustering.go
@@ -91,7 +91,6 @@ func (t *clusteringCompactionTask) Process() bool {
if currentState != lastState {
ts := time.Now().Unix()
lastStateDuration := ts - t.GetLastStateStartTime()
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
Observe(float64(lastStateDuration * 1000))
@@ -109,15 +108,22 @@ func (t *clusteringCompactionTask) Process() bool {
if err != nil {
log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
}
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
}
log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.State == datapb.CompactionTaskState_completed ||
t.State == datapb.CompactionTaskState_cleaned ||
t.State == datapb.CompactionTaskState_failed ||
t.State == datapb.CompactionTaskState_timeout
}
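The return expression now enumerates every terminal state of the clustering task. The same condition as a predicate, written here as a hypothetical helper that is not part of this patch:

// Sketch: terminal-state check equivalent to the return expression
// above and to the early exit in retryableProcess below.
func isTerminal(s datapb.CompactionTaskState) bool {
	switch s {
	case datapb.CompactionTaskState_completed,
		datapb.CompactionTaskState_cleaned,
		datapb.CompactionTaskState_failed,
		datapb.CompactionTaskState_timeout:
		return true
	default:
		return false
	}
}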

// retryableProcess drives the task's state transitions, returning an error when a step does not work as expected.
// The outer Process sets the state and retry count according to the error type (retryable or not).
func (t *clusteringCompactionTask) retryableProcess() error {
if t.State == datapb.CompactionTaskState_completed ||
t.State == datapb.CompactionTaskState_cleaned ||
t.State == datapb.CompactionTaskState_failed ||
t.State == datapb.CompactionTaskState_timeout {
return nil
}

@@ -144,14 +150,21 @@ func (t *clusteringCompactionTask) retryableProcess() error {
return t.processMetaSaved()
case datapb.CompactionTaskState_indexing:
return t.processIndexing()
}
return nil
}
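Per the comment above, the outer Process is what classifies errors from retryableProcess. A hedged sketch of that branch: merr.IsRetryableErr is a real helper in Milvus's merr package, but setRetryTimes and the exact call shape here are illustrative, not copied from this patch.

// Sketch: caller-side error classification implied by the doc comment.
if err := t.retryableProcess(); err != nil {
	if merr.IsRetryableErr(err) {
		// transient: bump the retry counter and try again on the next tick
		t.updateAndSaveTaskMeta(setRetryTimes(t.GetRetryTimes() + 1))
	} else {
		// permanent: mark failed so the cleaning loop takes over
		t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
	}
}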

func (t *clusteringCompactionTask) Clean() bool {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log.Info("clean task")
err := t.doClean()
if err != nil {
log.Warn("clean task fail", zap.Error(err))
return false
}
return true
}

func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
@@ -269,12 +282,37 @@ func (t *clusteringCompactionTask) processExecuting() error {
return t.processMetaSaved()
case datapb.CompactionTaskState_executing:
if t.checkTimeout() {
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
}
return nil
case datapb.CompactionTaskState_failed:
@@ -374,13 +382,13 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
}

func (t *clusteringCompactionTask) doClean() error {
log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
// L1 : L1 ->(processPipelining)-> L2 ->(doClean)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(doClean)-> L2
for _, segID := range t.InputSegments {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
@@ -407,6 +415,7 @@
err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID()))
}

return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
14 changes: 7 additions & 7 deletions internal/datacoord/compaction_task_clustering_test.go
@@ -136,7 +136,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChange
},
})

task.doClean()

seg12 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
@@ -204,7 +204,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
s.Equal(false, task.Process())
s.Equal(int32(3), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
s.Equal(true, task.Process())
s.Equal(int32(0), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
}
@@ -213,7 +213,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
s.Run("process pipelining fail, segment not found", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_pipelining
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})

@@ -439,7 +439,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
}, nil).Once()
time.Sleep(time.Second * 1)
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_timeout, task.GetState())
})
}

@@ -538,7 +538,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
s.Run("analyze task not found", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})

@@ -555,7 +555,7 @@
State: indexpb.JobState_JobStateFailed,
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})

@@ -573,7 +573,7 @@
CentroidsFile: "",
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})

4 changes: 4 additions & 0 deletions internal/datacoord/compaction_task_l0.go
@@ -422,3 +422,7 @@ func (t *l0CompactionTask) saveSegmentMeta() error {

return t.meta.UpdateSegmentsInfo(operators...)
}

// Clean is a no-op for l0 compaction tasks; they have no task-specific
// state (such as partition stats) to revert.
func (t *l0CompactionTask) Clean() bool {
return true
}
4 changes: 4 additions & 0 deletions internal/datacoord/compaction_task_mix.go
@@ -374,3 +374,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
log.Info("Compaction handler refreshed mix compaction plan", zap.Any("segID2DeltaLogs", segIDMap))
return plan, nil
}

// Clean is a no-op for mix compaction tasks; like l0, they have no
// task-specific state to revert.
func (t *mixCompactionTask) Clean() bool {
return true
}