Skip to content

Commit

Permalink
enhance: clean failed/timeout compaction task in compactionHandler
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Oct 24, 2024
1 parent 39a91eb commit c01eeb2
Show file tree
Hide file tree
Showing 13 changed files with 477 additions and 91 deletions.
39 changes: 39 additions & 0 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type compactionPlanHandler struct {
executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task

cleaningGuard lock.RWMutex
cleaningTasks map[int64]CompactionTask // planID -> task

meta CompactionMeta
allocator allocator.Allocator
chManager ChannelManager
Expand Down Expand Up @@ -189,6 +192,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
analyzeScheduler: analyzeScheduler,
handler: handler,
}
Expand Down Expand Up @@ -368,6 +372,7 @@ func (c *compactionPlanHandler) loopCheck() {
if err != nil {
log.Info("fail to update compaction", zap.Error(err))
}
c.cleanFailedTasks()
}
}
}
Expand Down Expand Up @@ -614,6 +619,11 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
}
}

// checkCompaction retrieves executing tasks and calls each task's Process() method
// to evaluate its state and progress through the state machine.
// Completed tasks are removed from executingTasks.
// Tasks that fail or timeout are moved from executingTasks to cleaningTasks,
// where task-specific clean logic is performed asynchronously.
func (c *compactionPlanHandler) checkCompaction() error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
Expand Down Expand Up @@ -649,9 +659,38 @@ func (c *compactionPlanHandler) checkCompaction() error {
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Done).Inc()
}
c.executingGuard.Unlock()

// insert task need to clean
c.cleaningGuard.Lock()
for _, t := range finishedTasks {
if t.GetTaskProto().GetState() == datapb.CompactionTaskState_failed || t.GetTaskProto().GetState() == datapb.CompactionTaskState_timeout {
c.cleaningTasks[t.GetTaskProto().GetPlanID()] = t
}
}
c.cleaningGuard.Unlock()

return nil
}

// cleanFailedTasks performs task define Clean logic
// while compactionPlanHandler.Clean is to do garbage collection for cleaned tasks
func (c *compactionPlanHandler) cleanFailedTasks() {
c.cleaningGuard.RLock()
cleanedTasks := make([]CompactionTask, 0)
for _, t := range c.cleaningTasks {
clean := t.Clean()
if clean {
cleanedTasks = append(cleanedTasks, t)
}
}
c.cleaningGuard.RUnlock()
c.cleaningGuard.Lock()
for _, t := range cleanedTasks {
delete(c.cleaningTasks, t.GetTaskProto().GetPlanID())
}
c.cleaningGuard.Unlock()
}

func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1
Expand Down
10 changes: 10 additions & 0 deletions internal/datacoord/compaction_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,17 @@ import (
)

type CompactionTask interface {
// Process performs the task's state machine
//
// Returns:
// - <bool>: whether the task state machine ends.
//
// Notes:
//
// `end` doesn't mean the task completed, its state may be completed or failed or timeout.
Process() bool
// Clean performs clean logic for a fail/timeout task
Clean() bool
BuildCompactionRequest() (*datapb.CompactionPlan, error)
GetSlotUsage() int64
GetLabel() string
Expand Down
63 changes: 45 additions & 18 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,22 @@ func (t *clusteringCompactionTask) Process() bool {
if err != nil {
log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
}
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
}
log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned
return t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout
}

// retryableProcess process task's state transfer, return error if not work as expected
// the outer Process will set state and retry times according to the error type(retryable or not-retryable)
func (t *clusteringCompactionTask) retryableProcess() error {
if t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned {
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
return nil
}

Expand Down Expand Up @@ -162,15 +169,25 @@ func (t *clusteringCompactionTask) retryableProcess() error {
return t.processIndexing()
case datapb.CompactionTaskState_statistic:
return t.processStats()

case datapb.CompactionTaskState_timeout:
return t.processFailedOrTimeout()
case datapb.CompactionTaskState_failed:
return t.processFailedOrTimeout()
case datapb.CompactionTaskState_timeout:
return t.processFailedOrTimeout()
}
return nil
}

func (t *clusteringCompactionTask) Clean() bool {
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()))
log.Info("clean task")
err := t.doClean()
if err != nil {
log.Warn("clean task fail", zap.Error(err))
return false
}
return true
}

func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
Expand Down Expand Up @@ -280,12 +297,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return t.processMetaSaved()
case datapb.CompactionTaskState_executing:
if t.checkTimeout() {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err == nil {
return t.processFailedOrTimeout()
} else {
return err
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
}
return nil
case datapb.CompactionTaskState_failed:
Expand Down Expand Up @@ -516,13 +528,19 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
}

// Backward compatibility
func (t *clusteringCompactionTask) processFailedOrTimeout() error {
log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("state", t.GetTaskProto().GetState().String()))
return nil
}

func (t *clusteringCompactionTask) doClean() error {
log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()))
log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.String("state", t.GetTaskProto().GetState().String()))

if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetTaskProto().GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
log.Warn("clusteringCompactionTask unable to drop compaction plan", zap.Error(err))
}
isInputDropped := false
for _, segID := range t.GetTaskProto().GetInputSegments() {
Expand All @@ -538,8 +556,8 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
// L1 : L1 ->(process)-> L2 ->(clean)-> L1
// L2 : L2 ->(process)-> L2 ->(clean)-> L2
for _, segID := range t.GetTaskProto().GetInputSegments() {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
Expand Down Expand Up @@ -577,8 +595,6 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
}
}

t.resetSegmentCompacting()

// drop partition stats if uploaded
partitionStatsInfo := &datapb.PartitionStatsInfo{
CollectionID: t.GetTaskProto().GetCollectionID(),
Expand All @@ -590,9 +606,20 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID()))
}

return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
if err != nil {
log.Warn("clusteringCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err))
return err
}

// resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting only called once
// otherwise, it may unlock segments locked by other compaction tasks
t.resetSegmentCompacting()
log.Info("clusteringCompactionTask clean done")
return nil
}

func (t *clusteringCompactionTask) doAnalyze() error {
Expand Down
48 changes: 39 additions & 9 deletions internal/datacoord/compaction_task_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,43 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
task := s.generateBasicTask(false)

task.processPipelining()
err := task.processPipelining()
s.NoError(err)

seg11 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L1, seg11.Level)
seg21 := s.meta.GetSegment(102)
s.Equal(datapb.SegmentLevel_L2, seg21.Level)
s.Equal(int64(10000), seg21.PartitionStatsVersion)

task.updateAndSaveTaskMeta(setResultSegments([]int64{103, 104}))
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 103,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
LastLevel: datapb.SegmentLevel_L1,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 104,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
LastLevel: datapb.SegmentLevel_L1,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})

s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)

err = task.doClean()
s.NoError(err)

s.Run("v2.4.x", func() {
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
Expand Down Expand Up @@ -164,6 +193,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
task.GetTaskProto().ResultSegments = []int64{103, 104}

task.processFailedOrTimeout()
task.Clean()

seg12 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
Expand Down Expand Up @@ -253,6 +283,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
task.GetTaskProto().ResultSegments = []int64{105, 106}

task.processFailedOrTimeout()
task.Clean()

seg12 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
Expand Down Expand Up @@ -336,7 +367,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
s.Equal(false, task.Process())
s.Equal(int32(3), task.GetTaskProto().RetryTimes)
s.Equal(datapb.CompactionTaskState_pipelining, task.GetTaskProto().GetState())
s.Equal(false, task.Process())
s.True(task.Process())
s.Equal(int32(0), task.GetTaskProto().RetryTimes)
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
}
Expand All @@ -345,7 +376,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
s.Run("process pipelining fail, segment not found", func() {
task := s.generateBasicTask(false)
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
s.Equal(false, task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})

Expand Down Expand Up @@ -570,11 +601,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
},
},
}, nil).Once()
s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()

time.Sleep(time.Second * 1)
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_cleaned, task.GetTaskProto().GetState())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_timeout, task.GetTaskProto().GetState())
})
}

Expand Down Expand Up @@ -675,7 +705,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
s.Run("analyze task not found", func() {
task := s.generateBasicTask(false)
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})

Expand All @@ -691,7 +721,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
State: indexpb.JobState_JobStateFailed,
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})

Expand All @@ -708,7 +738,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
CentroidsFile: "",
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState())
})

Expand Down
Loading

0 comments on commit c01eeb2

Please sign in to comment.