From a99ffed82408a046cf54f366f29c4c9e7abcafa4 Mon Sep 17 00:00:00 2001
From: wayblink
Date: Thu, 25 Jul 2024 11:59:13 +0800
Subject: [PATCH] enhance compaction clean

Signed-off-by: wayblink
---
 internal/datacoord/compaction.go              | 27 +++++-
 internal/datacoord/compaction_task.go         |  1 +
 .../datacoord/compaction_task_clustering.go   | 38 +++++---
 .../compaction_task_clustering_test.go        |  2 +-
 internal/datacoord/compaction_task_l0.go      |  4 +
 internal/datacoord/compaction_task_mix.go     |  4 +
 internal/datacoord/compaction_test.go         | 93 +++++++++++++++++++
 7 files changed, 152 insertions(+), 17 deletions(-)

diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index c816fb9a3ca27..c71bf5f72eabe 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -80,6 +80,9 @@ type compactionPlanHandler struct {
 	executingGuard lock.RWMutex
 	executingTasks map[int64]CompactionTask // planID -> task
 
+	cleaningGuard lock.RWMutex
+	cleaningTasks map[int64]CompactionTask // planID -> task
+
 	meta      CompactionMeta
 	allocator allocator.Allocator
 	chManager ChannelManager
@@ -182,13 +185,14 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
 ) *compactionPlanHandler {
 	return &compactionPlanHandler{
 		queueTasks:       make(map[int64]CompactionTask),
+		executingTasks:   make(map[int64]CompactionTask),
+		cleaningTasks:    make(map[int64]CompactionTask),
 		chManager:        cm,
 		meta:             meta,
 		sessions:         sessions,
 		allocator:        allocator,
 		stopCh:           make(chan struct{}),
 		cluster:          cluster,
-		executingTasks:   make(map[int64]CompactionTask),
 		taskNumber:       atomic.NewInt32(0),
 		analyzeScheduler: analyzeScheduler,
 		handler:          handler,
@@ -395,6 +399,18 @@ func (c *compactionPlanHandler) loopClean() {
 }
 
 func (c *compactionPlanHandler) Clean() {
+	c.cleaningGuard.Lock()
+	cleanedTasks := make([]CompactionTask, 0)
+	for _, t := range c.cleaningTasks {
+		clean := t.Clean()
+		if clean {
+			cleanedTasks = append(cleanedTasks, t)
+		}
+	}
+	for _, t := range cleanedTasks {
+		delete(c.cleaningTasks, t.GetPlanID())
+	}
+	c.cleaningGuard.Unlock()
 	c.cleanCompactionTaskMeta()
 	c.cleanPartitionStats()
 }
@@ -672,6 +688,15 @@ func (c *compactionPlanHandler) checkCompaction() error {
 	}
 	c.executingGuard.Unlock()
 	c.taskNumber.Sub(int32(len(finishedTasks)))
+
+	// enqueue failed/timed-out tasks so loopClean can clean them up
+	c.cleaningGuard.Lock()
+	for _, t := range finishedTasks {
+		if t.GetState() == datapb.CompactionTaskState_failed || t.GetState() == datapb.CompactionTaskState_timeout {
+			c.cleaningTasks[t.GetPlanID()] = t
+		}
+	}
+	c.cleaningGuard.Unlock()
 	return nil
 }
diff --git a/internal/datacoord/compaction_task.go b/internal/datacoord/compaction_task.go
index 6cfdcb9af8274..c01aef6bce376 100644
--- a/internal/datacoord/compaction_task.go
+++ b/internal/datacoord/compaction_task.go
@@ -26,6 +26,7 @@ import (
 type CompactionTask interface {
 	Process() bool
 	BuildCompactionRequest() (*datapb.CompactionPlan, error)
+	Clean() bool
 
 	GetTriggerID() UniqueID
 	GetPlanID() UniqueID
diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go
index 2a0dee9f89a39..4addfd3a6aa87 100644
--- a/internal/datacoord/compaction_task_clustering.go
+++ b/internal/datacoord/compaction_task_clustering.go
@@ -111,13 +111,19 @@ func (t *clusteringCompactionTask) Process() bool {
 		}
 	}
 	log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
-	return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
+	return t.State == datapb.CompactionTaskState_completed ||
+		t.State == datapb.CompactionTaskState_cleaned ||
+		t.State == datapb.CompactionTaskState_failed ||
+		t.State == datapb.CompactionTaskState_timeout
 }
 
 // retryableProcess process task's state transfer, return error if not work as expected
 // the outer Process will set state and retry times according to the error type(retryable or not-retryable)
 func (t *clusteringCompactionTask) retryableProcess() error {
-	if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
+	if t.State == datapb.CompactionTaskState_completed ||
+		t.State == datapb.CompactionTaskState_cleaned ||
+		t.State == datapb.CompactionTaskState_failed ||
+		t.State == datapb.CompactionTaskState_timeout {
 		return nil
 	}
 
@@ -144,14 +150,21 @@ func (t *clusteringCompactionTask) retryableProcess() error {
 		return t.processMetaSaved()
 	case datapb.CompactionTaskState_indexing:
 		return t.processIndexing()
-	case datapb.CompactionTaskState_timeout:
-		return t.processFailedOrTimeout()
-	case datapb.CompactionTaskState_failed:
-		return t.processFailedOrTimeout()
 	}
 	return nil
 }
 
+func (t *clusteringCompactionTask) Clean() bool {
+	log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
+	log.Info("clean task")
+	err := t.doClean()
+	if err != nil {
+		log.Warn("clean task fail", zap.Error(err))
+		return false
+	}
+	return true
+}
+
 func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
 	beginLogID, _, err := t.allocator.AllocN(1)
 	if err != nil {
@@ -269,12 +282,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
 		return t.processMetaSaved()
 	case datapb.CompactionTaskState_executing:
 		if t.checkTimeout() {
-			err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
-			if err == nil {
-				return t.processFailedOrTimeout()
-			} else {
-				return err
-			}
+			return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
 		}
 		return nil
 	case datapb.CompactionTaskState_failed:
@@ -374,13 +382,13 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
 	t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
 }
 
-func (t *clusteringCompactionTask) processFailedOrTimeout() error {
+func (t *clusteringCompactionTask) doClean() error {
 	log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))
 	// revert segments meta
 	var operators []UpdateOperator
 	// revert level of input segments
-	// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
-	// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
+	// L1 : L1 ->(processPipelining)-> L2 ->(doClean)-> L1
+	// L2 : L2 ->(processPipelining)-> L2 ->(doClean)-> L2
 	for _, segID := range t.InputSegments {
 		operators = append(operators, RevertSegmentLevelOperator(segID))
 	}
diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go
index 0da0b5f074b17..209ea73fd0dc5 100644
--- a/internal/datacoord/compaction_task_clustering_test.go
+++ b/internal/datacoord/compaction_task_clustering_test.go
@@ -136,7 +136,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
 		},
 	})
 
-	task.processFailedOrTimeout()
+	task.doClean()
 
 	seg12 := s.meta.GetSegment(101)
 	s.Equal(datapb.SegmentLevel_L1, seg12.Level)
diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go
index e45a502fc8e81..952cea7280db4 100644
--- a/internal/datacoord/compaction_task_l0.go
+++ b/internal/datacoord/compaction_task_l0.go
@@ -422,3 +422,7 @@ func (t *l0CompactionTask) saveSegmentMeta() error {
 
 	return t.meta.UpdateSegmentsInfo(operators...)
 }
+
+func (t *l0CompactionTask) Clean() bool {
+	return true
+}
diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go
index 3d73845dfbc76..f6cd14859ae66 100644
--- a/internal/datacoord/compaction_task_mix.go
+++ b/internal/datacoord/compaction_task_mix.go
@@ -374,3 +374,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
 	log.Info("Compaction handler refreshed mix compaction plan", zap.Any("segID2DeltaLogs", segIDMap))
 	return plan, nil
 }
+
+func (t *mixCompactionTask) Clean() bool {
+	return true
+}
diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index 84c0b61c9d443..283c0c693a1c8 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -886,6 +886,99 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
 	s.Equal(0, len(s.handler.getTasksByState(datapb.CompactionTaskState_completed)))
 }
 
+func (s *CompactionPlanHandlerSuite) TestCleanCompaction() {
+	s.SetupTest()
+
+	tests := []struct {
+		task CompactionTask
+	}{
+		{
+			&mixCompactionTask{
+				CompactionTask: &datapb.CompactionTask{
+					PlanID:        1,
+					TriggerID:     1,
+					Type:          datapb.CompactionType_MixCompaction,
+					State:         datapb.CompactionTaskState_failed,
+					NodeID:        1,
+					InputSegments: []UniqueID{1, 2},
+				},
+				sessions: s.mockSessMgr,
+				meta:     s.mockMeta,
+			},
+		},
+		{
+			&l0CompactionTask{
+				CompactionTask: &datapb.CompactionTask{
+					PlanID:        1,
+					TriggerID:     1,
+					Type:          datapb.CompactionType_Level0DeleteCompaction,
+					State:         datapb.CompactionTaskState_failed,
+					NodeID:        1,
+					InputSegments: []UniqueID{1, 2},
+				},
+				sessions: s.mockSessMgr,
+				meta:     s.mockMeta,
+			},
+		},
+	}
+	for _, test := range tests {
+		task := test.task
+		s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
+		s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+		s.mockMeta.EXPECT().GetCompactionTasks().Return(nil)
+		s.mockMeta.EXPECT().GetPartitionStatsMeta().Return(nil)
+
+		s.handler.executingTasks[1] = task
+		s.Equal(1, len(s.handler.executingTasks))
+		s.handler.checkCompaction()
+		s.Equal(0, len(s.handler.executingTasks))
+		s.Equal(1, len(s.handler.cleaningTasks))
+		s.handler.Clean()
+		s.Equal(0, len(s.handler.cleaningTasks))
+	}
+}
+
+func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompaction() {
+	s.SetupTest()
+
+	tests := []struct {
+		task CompactionTask
+	}{
+		{
+			&clusteringCompactionTask{
+				CompactionTask: &datapb.CompactionTask{
+					PlanID:        1,
+					TriggerID:     1,
+					Type:          datapb.CompactionType_ClusteringCompaction,
+					State:         datapb.CompactionTaskState_failed,
+					NodeID:        1,
+					InputSegments: []UniqueID{1, 2},
+				},
+				sessions: s.mockSessMgr,
+				meta:     s.mockMeta,
+			},
+		},
+	}
+	for _, test := range tests {
+		task := test.task
+		s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
+		// s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+		s.mockMeta.EXPECT().GetCompactionTasks().Return(nil)
+		s.mockMeta.EXPECT().GetPartitionStatsMeta().Return(nil)
+		s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything).Return(nil)
+		s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil)
+		s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
+
+		s.handler.executingTasks[1] = task
+		s.Equal(1, len(s.handler.executingTasks))
+		s.handler.checkCompaction()
+		s.Equal(0, len(s.handler.executingTasks))
+		s.Equal(1, len(s.handler.cleaningTasks))
+		s.handler.Clean()
+		s.Equal(0, len(s.handler.cleaningTasks))
+	}
+}
+
 func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
 	l := &datapb.FieldBinlog{
 		FieldID: fieldID,