Skip to content

Commit

Permalink
enhance: clean failed/timeout compaction task in compactionHandler
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Oct 23, 2024
1 parent 8902e22 commit a83b2b6
Show file tree
Hide file tree
Showing 14 changed files with 514 additions and 126 deletions.
39 changes: 39 additions & 0 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type compactionPlanHandler struct {
executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task

cleaningGuard lock.RWMutex
cleaningTasks map[int64]CompactionTask // planID -> task

meta CompactionMeta
allocator allocator.Allocator
chManager ChannelManager
Expand Down Expand Up @@ -189,6 +192,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
analyzeScheduler: analyzeScheduler,
handler: handler,
}
Expand Down Expand Up @@ -368,6 +372,7 @@ func (c *compactionPlanHandler) loopCheck() {
if err != nil {
log.Info("fail to update compaction", zap.Error(err))
}
c.cleanFailedTasks()
}
}
}
Expand Down Expand Up @@ -614,6 +619,11 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
}
}

// checkCompaction retrieves executing tasks and calls each task's Process() method
// to evaluate its state and progress through the state machine.
// Completed tasks are removed from executingTasks.
// Tasks that fail or timeout are moved from executingTasks to cleaningTasks,
// where task-specific clean logic is performed asynchronously.
func (c *compactionPlanHandler) checkCompaction() error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
Expand Down Expand Up @@ -649,9 +659,38 @@ func (c *compactionPlanHandler) checkCompaction() error {
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Done).Inc()
}
c.executingGuard.Unlock()

// insert task need to clean
c.cleaningGuard.Lock()
for _, t := range finishedTasks {
if t.GetState() == datapb.CompactionTaskState_failed || t.GetState() == datapb.CompactionTaskState_timeout {
c.cleaningTasks[t.GetPlanID()] = t
}
}
c.cleaningGuard.Unlock()

return nil
}

// cleanFailedTasks performs task define Clean logic
// while compactionPlanHandler.Clean is to do garbage collection for cleaned tasks
func (c *compactionPlanHandler) cleanFailedTasks() {
c.cleaningGuard.RLock()
cleanedTasks := make([]CompactionTask, 0)
for _, t := range c.cleaningTasks {
clean := t.Clean()
if clean {
cleanedTasks = append(cleanedTasks, t)
}
}
c.cleaningGuard.RUnlock()
c.cleaningGuard.Lock()
for _, t := range cleanedTasks {
delete(c.cleaningTasks, t.GetPlanID())
}
c.cleaningGuard.Unlock()
}

func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1
Expand Down
10 changes: 10 additions & 0 deletions internal/datacoord/compaction_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@ import (
)

type CompactionTask interface {
// Process performs the task's state machine
//
// Returns:
// - <bool>: whether the task state machine ends.
//
// Notes:
//
// `end` doesn't mean the task completed, its state may be completed or failed or timeout.
Process() bool
// Clean performs clean logic for a fail/timeout task
Clean() bool
BuildCompactionRequest() (*datapb.CompactionPlan, error)
GetSlotUsage() int64

Expand Down
74 changes: 53 additions & 21 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func (t *clusteringCompactionTask) Process() bool {
if currentState != lastState {
ts := time.Now().Unix()
lastStateDuration := ts - t.GetLastStateStartTime()
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
Observe(float64(lastStateDuration * 1000))
Expand All @@ -115,15 +114,22 @@ func (t *clusteringCompactionTask) Process() bool {
if err != nil {
log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
}
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
}
log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
return t.State == datapb.CompactionTaskState_completed ||
t.State == datapb.CompactionTaskState_cleaned ||
t.State == datapb.CompactionTaskState_failed ||
t.State == datapb.CompactionTaskState_timeout
}

// retryableProcess process task's state transfer, return error if not work as expected
// the outer Process will set state and retry times according to the error type(retryable or not-retryable)
func (t *clusteringCompactionTask) retryableProcess() error {
if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
if t.State == datapb.CompactionTaskState_completed ||
t.State == datapb.CompactionTaskState_cleaned ||
t.State == datapb.CompactionTaskState_failed ||
t.State == datapb.CompactionTaskState_timeout {
return nil
}

Expand Down Expand Up @@ -152,15 +158,25 @@ func (t *clusteringCompactionTask) retryableProcess() error {
return t.processIndexing()
case datapb.CompactionTaskState_statistic:
return t.processStats()

case datapb.CompactionTaskState_timeout:
return t.processFailedOrTimeout()
case datapb.CompactionTaskState_failed:
return t.processFailedOrTimeout()
case datapb.CompactionTaskState_timeout:
return t.processFailedOrTimeout()
}
return nil
}

func (t *clusteringCompactionTask) Clean() bool {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log.Info("clean task")
err := t.doClean()
if err != nil {
log.Warn("clean task fail", zap.Error(err))
return false
}
return true
}

func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
Expand Down Expand Up @@ -256,24 +272,25 @@ func (t *clusteringCompactionTask) processExecuting() error {
return segment.GetSegmentID()
})

// update the tmp segmentIDs first, revert the segments if CompleteCompactionMutation fails
err = t.updateAndSaveTaskMeta(setTmpSegments(resultSegmentIDs))
if err != nil {
return err
}

_, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
if err != nil {
return err
}
metricMutation.commit()
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setTmpSegments(resultSegmentIDs))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setCompactionCommited(true))

Check failure on line 286 in internal/datacoord/compaction_task_clustering.go

View workflow job for this annotation

GitHub Actions / Code Checker MacOS 12

undefined: setCompactionCommited
if err != nil {
return err
}
return t.processMetaSaved()
case datapb.CompactionTaskState_executing:
if t.checkTimeout() {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err == nil {
return t.processFailedOrTimeout()
} else {
return err
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
}
return nil
case datapb.CompactionTaskState_failed:
Expand Down Expand Up @@ -504,12 +521,19 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
}

// Backward compatibility
func (t *clusteringCompactionTask) processFailedOrTimeout() error {
log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))
return nil
}

func (t *clusteringCompactionTask) doClean() error {
log := log.With(zap.Int64("planID", t.GetPlanID()))
log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.String("state", t.GetState().String()))

if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("clusteringCompactionTask unable to drop compaction plan", zap.Error(err))
}
isInputDropped := false
for _, segID := range t.GetInputSegments() {
Expand All @@ -525,8 +549,8 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
// L1 : L1 ->(process)-> L2 ->(clean)-> L1
// L2 : L2 ->(process)-> L2 ->(clean)-> L2
for _, segID := range t.GetInputSegments() {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
Expand Down Expand Up @@ -557,16 +581,13 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
// tmpSegment is always invisible
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}

err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
}
}

t.resetSegmentCompacting()

// drop partition stats if uploaded
partitionStatsInfo := &datapb.PartitionStatsInfo{
CollectionID: t.GetCollectionID(),
Expand All @@ -578,9 +599,20 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID()))
}

return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
if err != nil {
log.Warn("clusteringCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err))
return err
}

// resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting only called once
// otherwise, it may unlock segments locked by other compaction tasks
t.resetSegmentCompacting()
log.Info("clusteringCompactionTask clean done")
return nil
}

func (t *clusteringCompactionTask) doAnalyze() error {
Expand Down
42 changes: 34 additions & 8 deletions internal/datacoord/compaction_task_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,41 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
task := s.generateBasicTask(false)

task.processPipelining()
err := task.processPipelining()
s.NoError(err)

seg11 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L1, seg11.Level)
seg21 := s.meta.GetSegment(102)
s.Equal(datapb.SegmentLevel_L2, seg21.Level)
s.Equal(int64(10000), seg21.PartitionStatsVersion)

task.ResultSegments = []int64{103, 104}
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 103,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
LastLevel: datapb.SegmentLevel_L1,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 104,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
LastLevel: datapb.SegmentLevel_L1,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})

err = task.doClean()
s.NoError(err)

s.Run("v2.4.x", func() {
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
Expand Down Expand Up @@ -336,7 +363,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
s.Equal(false, task.Process())
s.Equal(int32(3), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
s.Equal(false, task.Process())
s.Equal(true, task.Process())
s.Equal(int32(0), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
}
Expand All @@ -345,7 +372,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
s.Run("process pipelining fail, segment not found", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_pipelining
s.Equal(false, task.Process())
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})

Expand Down Expand Up @@ -572,11 +599,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
},
},
}, nil).Once()
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()

time.Sleep(time.Second * 1)
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_cleaned, task.GetState())
s.Equal(datapb.CompactionTaskState_timeout, task.GetState())
})
}

Expand Down Expand Up @@ -677,7 +703,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
s.Run("analyze task not found", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})

Expand All @@ -694,7 +720,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
State: indexpb.JobState_JobStateFailed,
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})

Expand All @@ -712,7 +738,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
CentroidsFile: "",
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})

Expand Down
Loading

0 comments on commit a83b2b6

Please sign in to comment.