From a83b2b6e3c26aa5e8757eb6ad726eda48ea96a02 Mon Sep 17 00:00:00 2001
From: wayblink
Date: Wed, 23 Oct 2024 17:32:29 +0800
Subject: [PATCH] enhance: clean failed/timeout compaction task in
 compactionHandler

Signed-off-by: wayblink
---
 internal/datacoord/compaction.go              |  39 ++++
 internal/datacoord/compaction_task.go         |  10 +
 .../datacoord/compaction_task_clustering.go   |  74 +++++--
 .../compaction_task_clustering_test.go        |  42 +++-
 internal/datacoord/compaction_task_l0.go      |  26 ++-
 internal/datacoord/compaction_task_l0_test.go |  40 +---
 internal/datacoord/compaction_task_mix.go     |  76 ++++---
 .../datacoord/compaction_task_mix_test.go     |  24 ++-
 internal/datacoord/compaction_task_test.go    |  25 ---
 internal/datacoord/compaction_test.go         | 188 ++++++++++++++++++
 internal/datacoord/partition_stats_meta.go    |  28 +++
 .../datacoord/partition_stats_meta_test.go    |  59 ++++++
 pkg/util/merr/errors.go                       |   1 +
 pkg/util/merr/utils.go                        |   8 +
 14 files changed, 514 insertions(+), 126 deletions(-)
 delete mode 100644 internal/datacoord/compaction_task_test.go

diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index f3ed92ae6632f..6777ecda066d2 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -81,6 +81,9 @@ type compactionPlanHandler struct {
     executingGuard lock.RWMutex
     executingTasks map[int64]CompactionTask // planID -> task

+    cleaningGuard lock.RWMutex
+    cleaningTasks map[int64]CompactionTask // planID -> task
+
     meta      CompactionMeta
     allocator allocator.Allocator
     chManager ChannelManager
@@ -189,6 +192,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
         stopCh:           make(chan struct{}),
         cluster:          cluster,
         executingTasks:   make(map[int64]CompactionTask),
+        cleaningTasks:    make(map[int64]CompactionTask),
         analyzeScheduler: analyzeScheduler,
         handler:          handler,
     }
@@ -368,6 +372,7 @@ func (c *compactionPlanHandler) loopCheck() {
             if err != nil {
                 log.Info("fail to update compaction", zap.Error(err))
             }
+            c.cleanFailedTasks()
         }
     }
}
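The loop change above is the heart of the patch: every check tick now ends with a cleaning pass, so failed and timed-out tasks are retried until their cleanup succeeds. A minimal, self-contained sketch of that check-then-clean loop follows; the `handler` type, its callbacks, and the ticker interval are illustrative stand-ins for `compactionPlanHandler`, not the real implementation:

package sketch

import (
    "log"
    "time"
)

// handler is a stand-in for compactionPlanHandler; check mirrors
// checkCompaction and clean mirrors cleanFailedTasks.
type handler struct {
    stopCh chan struct{}
    check  func() error
    clean  func()
}

func (h *handler) loopCheck() {
    ticker := time.NewTicker(3 * time.Second) // interval is illustrative
    defer ticker.Stop()
    for {
        select {
        case <-h.stopCh:
            return
        case <-ticker.C:
            if err := h.check(); err != nil {
                log.Println("fail to update compaction:", err)
            }
            // the cleaning pass runs on every tick, even when the check
            // errored, so failed/timeout tasks are always retried
            h.clean()
        }
    }
}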
@@ -614,6 +619,11 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
     }
}

+// checkCompaction retrieves the executing tasks and calls each task's Process()
+// method to evaluate its state and drive its state machine forward.
+// Completed tasks are removed from executingTasks.
+// Tasks that fail or time out are moved from executingTasks to cleaningTasks,
+// where task-specific clean logic is performed asynchronously.
func (c *compactionPlanHandler) checkCompaction() error {
     // Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
     // for DC might add new task while GetCompactionState.
@@ -649,9 +659,38 @@ func (c *compactionPlanHandler) checkCompaction() error {
         metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Done).Inc()
     }
     c.executingGuard.Unlock()
+
+    // enqueue the tasks that need cleaning
+    c.cleaningGuard.Lock()
+    for _, t := range finishedTasks {
+        if t.GetState() == datapb.CompactionTaskState_failed || t.GetState() == datapb.CompactionTaskState_timeout {
+            c.cleaningTasks[t.GetPlanID()] = t
+        }
+    }
+    c.cleaningGuard.Unlock()
+
     return nil
}

+// cleanFailedTasks performs the Clean logic defined by each task,
+// while compactionPlanHandler.Clean does garbage collection for already-cleaned tasks.
+func (c *compactionPlanHandler) cleanFailedTasks() {
+    c.cleaningGuard.RLock()
+    cleanedTasks := make([]CompactionTask, 0)
+    for _, t := range c.cleaningTasks {
+        clean := t.Clean()
+        if clean {
+            cleanedTasks = append(cleanedTasks, t)
+        }
+    }
+    c.cleaningGuard.RUnlock()
+    c.cleaningGuard.Lock()
+    for _, t := range cleanedTasks {
+        delete(c.cleaningTasks, t.GetPlanID())
+    }
+    c.cleaningGuard.Unlock()
+}
+
func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
     nodeID = NullNodeID
     var maxSlots int64 = -1
diff --git a/internal/datacoord/compaction_task.go b/internal/datacoord/compaction_task.go
index 2edeeaf7004bb..035f12851750e 100644
--- a/internal/datacoord/compaction_task.go
+++ b/internal/datacoord/compaction_task.go
@@ -24,7 +24,17 @@ import (
 )

 type CompactionTask interface {
+    // Process drives the task's state machine.
+    //
+    // Returns:
+    //   - end: whether the task's state machine has ended.
+    //
+    // Note: `end` does not mean the task succeeded; the final state
+    // may be completed, failed, or timeout.
     Process() bool
+    // Clean performs the clean logic for a failed/timeout task
+    Clean() bool
     BuildCompactionRequest() (*datapb.CompactionPlan, error)
     GetSlotUsage() int64
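Together, `Process` and `Clean` split a task's lifecycle into two phases: the handler keeps calling `Process` until the state machine ends (successfully or not), then keeps calling `Clean` until cleanup succeeds. A minimal sketch of a type satisfying this contract; `toyTask` and its states are illustrative, not one of the real task types:

package main

import "fmt"

type state int

const (
    executing state = iota
    completed
    failed
    timeout
    cleaned
)

type toyTask struct{ st state }

// Process reports whether the state machine has ended. "Ended" does not
// imply success: failed and timeout are terminal states too.
func (t *toyTask) Process() bool {
    return t.st == completed || t.st == cleaned || t.st == failed || t.st == timeout
}

// Clean releases the task's resources; returning false keeps the task in
// the handler's cleaningTasks map so cleaning is retried on the next tick.
func (t *toyTask) Clean() bool {
    if t.st == failed || t.st == timeout {
        t.st = cleaned
    }
    return t.st == cleaned
}

func main() {
    t := &toyTask{st: failed}
    fmt.Println(t.Process(), t.Clean()) // true true
}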
diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go
index 081d46ae8f53a..9d73873be8fbf 100644
--- a/internal/datacoord/compaction_task_clustering.go
+++ b/internal/datacoord/compaction_task_clustering.go
@@ -97,7 +97,6 @@ func (t *clusteringCompactionTask) Process() bool {
     if currentState != lastState {
         ts := time.Now().Unix()
         lastStateDuration := ts - t.GetLastStateStartTime()
-        log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
         metrics.DataCoordCompactionLatency.
             WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
             Observe(float64(lastStateDuration * 1000))
@@ -115,15 +114,22 @@ func (t *clusteringCompactionTask) Process() bool {
         if err != nil {
             log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
         }
+        log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
     }
     log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
-    return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
+    return t.State == datapb.CompactionTaskState_completed ||
+        t.State == datapb.CompactionTaskState_cleaned ||
+        t.State == datapb.CompactionTaskState_failed ||
+        t.State == datapb.CompactionTaskState_timeout
}

// retryableProcess process task's state transfer, return error if not work as expected
// the outer Process will set state and retry times according to the error type(retryable or not-retryable)
func (t *clusteringCompactionTask) retryableProcess() error {
-    if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
+    if t.State == datapb.CompactionTaskState_completed ||
+        t.State == datapb.CompactionTaskState_cleaned ||
+        t.State == datapb.CompactionTaskState_failed ||
+        t.State == datapb.CompactionTaskState_timeout {
         return nil
     }

@@ -152,15 +158,25 @@ func (t *clusteringCompactionTask) retryableProcess() error {
         return t.processIndexing()
     case datapb.CompactionTaskState_statistic:
         return t.processStats()
-
-    case datapb.CompactionTaskState_timeout:
-        return t.processFailedOrTimeout()
     case datapb.CompactionTaskState_failed:
         return t.processFailedOrTimeout()
+    case datapb.CompactionTaskState_timeout:
+        return t.processFailedOrTimeout()
     }
     return nil
}

+func (t *clusteringCompactionTask) Clean() bool {
+    log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
+    log.Info("clean task")
+    err := t.doClean()
+    if err != nil {
+        log.Warn("clean task fail", zap.Error(err))
+        return false
+    }
+    return true
+}
+
func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
     beginLogID, _, err := t.allocator.AllocN(1)
     if err != nil {
@@ -256,24 +272,25 @@ func (t *clusteringCompactionTask) processExecuting() error {
             return segment.GetSegmentID()
         })
+
+        // update the tmp segmentIDs first, so the segments can be reverted if CompleteCompactionMutation fails
+        err = t.updateAndSaveTaskMeta(setTmpSegments(resultSegmentIDs))
+        if err != nil {
+            return err
+        }
+
         _, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
         if err != nil {
             return err
         }
         metricMutation.commit()
-        err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setTmpSegments(resultSegmentIDs))
+        err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setCompactionCommited(true))
         if err != nil {
             return err
         }
         return t.processMetaSaved()
     case datapb.CompactionTaskState_executing:
         if t.checkTimeout() {
-            err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
-            if err == nil {
-                return t.processFailedOrTimeout()
-            } else {
-                return err
-            }
+            return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
         }
         return nil
     case datapb.CompactionTaskState_failed:
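The reordering in `processExecuting` above is a crash-safety measure: the result segment IDs are persisted as tmp segments before `CompleteCompactionMutation` commits anything, so a failed or interrupted commit leaves behind a recorded list that `doClean` can revert. A sketch of that ordering, with hypothetical function-valued parameters standing in for the real meta APIs:

package sketch

// commitCompactionResult shows the commit ordering introduced above:
// tmp segment IDs are persisted before the mutation is committed, so a
// failed commit leaves enough state behind for doClean to revert.
func commitCompactionResult(resultIDs []int64,
    saveTmpSegments func([]int64) error,
    completeMutation func() error,
    markMetaSaved func() error,
) error {
    // 1. record the result IDs as tmp segments; Clean reverts these on failure
    if err := saveTmpSegments(resultIDs); err != nil {
        return err
    }
    // 2. commit the compaction mutation; an error here sends the task to
    //    failed, and the handler's cleaning pass drops the tmp segments
    if err := completeMutation(); err != nil {
        return err
    }
    // 3. only after a successful commit, persist state meta_saved
    return markMetaSaved()
}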
@@ -504,12 +521,19 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
     t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
}

+// Backward compatibility
func (t *clusteringCompactionTask) processFailedOrTimeout() error {
-    log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))
+    return nil
+}
+
+func (t *clusteringCompactionTask) doClean() error {
+    log := log.With(zap.Int64("planID", t.GetPlanID()))
+    log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.String("state", t.GetState().String()))
+
     if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
         PlanID: t.GetPlanID(),
     }); err != nil {
-        log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
+        log.Warn("clusteringCompactionTask unable to drop compaction plan", zap.Error(err))
     }
     isInputDropped := false
     for _, segID := range t.GetInputSegments() {
@@ -525,8 +549,8 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
         // revert segments meta
         var operators []UpdateOperator
         // revert level of input segments
-        // L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
-        // L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
+        // L1 : L1 ->(process)-> L2 ->(clean)-> L1
+        // L2 : L2 ->(process)-> L2 ->(clean)-> L2
         for _, segID := range t.GetInputSegments() {
             operators = append(operators, RevertSegmentLevelOperator(segID))
         }
@@ -557,7 +581,6 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
             // tmpSegment is always invisible
             operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
         }
-
         err := t.meta.UpdateSegmentsInfo(operators...)
         if err != nil {
             log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
         }
     }

-    t.resetSegmentCompacting()
-
     // drop partition stats if uploaded
     partitionStatsInfo := &datapb.PartitionStatsInfo{
         CollectionID: t.GetCollectionID(),
@@ -578,9 +599,20 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
     err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
     if err != nil {
         log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
+        return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID()))
     }

-    return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
+    err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
+    if err != nil {
+        log.Warn("clusteringCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err))
+        return err
+    }
+
+    // resetSegmentCompacting must be the last step of Clean, to make sure it is
+    // called only once; otherwise it may unlock segments locked by other compaction tasks
+    t.resetSegmentCompacting()
+    log.Info("clusteringCompactionTask clean done")
+    return nil
}

func (t *clusteringCompactionTask) doAnalyze() error {
diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go
index a318bc8f71cda..4a85dcaeddb41 100644
--- a/internal/datacoord/compaction_task_clustering_test.go
+++ b/internal/datacoord/compaction_task_clustering_test.go
@@ -109,7 +109,8 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
     s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)

     task := s.generateBasicTask(false)
-    task.processPipelining()
+    err := task.processPipelining()
+    s.NoError(err)
     seg11 := s.meta.GetSegment(101)
     s.Equal(datapb.SegmentLevel_L1, seg11.Level)
@@ -117,6 +118,32 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
     s.Equal(datapb.SegmentLevel_L2, seg21.Level)
     s.Equal(int64(10000), seg21.PartitionStatsVersion)

+    task.ResultSegments = []int64{103, 104}
+    // fake some compaction result segments
+    s.meta.AddSegment(context.TODO(), &SegmentInfo{
+        SegmentInfo: &datapb.SegmentInfo{
+            ID:                    103,
+            State:                 commonpb.SegmentState_Flushed,
+            Level:                 datapb.SegmentLevel_L2,
+            LastLevel:             datapb.SegmentLevel_L1,
+            CreatedByCompaction:   true,
+            PartitionStatsVersion: 10001,
+        },
+    })
+    s.meta.AddSegment(context.TODO(), &SegmentInfo{
+        SegmentInfo: &datapb.SegmentInfo{
+            ID:                    104,
+            State:                 commonpb.SegmentState_Flushed,
+            Level:                 datapb.SegmentLevel_L2,
+            LastLevel:             datapb.SegmentLevel_L1,
+            CreatedByCompaction:   true,
+            PartitionStatsVersion: 10001,
+        },
+    })
+
+    err = task.doClean()
+    s.NoError(err)
+
     s.Run("v2.4.x", func() {
         // fake some compaction result segment
         s.meta.AddSegment(context.TODO(), &SegmentInfo{
@@ -336,7 +363,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
     s.Equal(false, task.Process())
     s.Equal(int32(3), task.RetryTimes)
     s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
-    s.Equal(false, task.Process())
+    s.Equal(true, task.Process())
     s.Equal(int32(0), task.RetryTimes)
     s.Equal(datapb.CompactionTaskState_failed, task.GetState())
}
@@ -345,7 +372,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
     s.Run("process pipelining fail, segment not found", func() {
         task := s.generateBasicTask(false)
         task.State = datapb.CompactionTaskState_pipelining
-        s.Equal(false, task.Process())
+        s.Equal(true, task.Process())
         s.Equal(datapb.CompactionTaskState_failed, task.GetState())
     })

@@ -572,11 +599,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
                 },
             },
         }, nil).Once()
-        s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
         time.Sleep(time.Second * 1)
         s.Equal(true, task.Process())
-        s.Equal(datapb.CompactionTaskState_cleaned, task.GetState())
+        s.Equal(datapb.CompactionTaskState_timeout, task.GetState())
     })
}

@@ -677,7 +703,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
     s.Run("analyze task not found", func() {
         task := s.generateBasicTask(false)
         task.State = datapb.CompactionTaskState_analyzing
-        s.False(task.Process())
+        s.True(task.Process())
         s.Equal(datapb.CompactionTaskState_failed, task.GetState())
     })

@@ -694,7 +720,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
             State:   indexpb.JobState_JobStateFailed,
         }
         s.meta.analyzeMeta.AddAnalyzeTask(t)
-        s.False(task.Process())
+        s.True(task.Process())
         s.Equal(datapb.CompactionTaskState_failed, task.GetState())
     })

@@ -712,7 +738,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
             CentroidsFile: "",
         }
         s.meta.analyzeMeta.AddAnalyzeTask(t)
-        s.False(task.Process())
+        s.True(task.Process())
         s.Equal(datapb.CompactionTaskState_failed, task.GetState())
     })

diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go
index 0754a1480ffb1..bb831d563a623 100644
--- a/internal/datacoord/compaction_task_l0.go
+++ b/internal/datacoord/compaction_task_l0.go
@@ -181,16 +181,15 @@ func (t *l0CompactionTask) processCompleted() bool {
}

func (t *l0CompactionTask) processTimeout() bool {
-    t.resetSegmentCompacting()
-    err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
-    if err != nil {
- log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false - } return true } func (t *l0CompactionTask) processFailed() bool { + return true +} + +func (t *l0CompactionTask) doClean() error { + log := log.With(zap.Int64("planID", t.GetPlanID())) if t.hasAssignedWorker() { err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), @@ -200,15 +199,17 @@ func (t *l0CompactionTask) processFailed() bool { } } - t.resetSegmentCompacting() err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) if err != nil { log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false + return err } - log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) - return true + // resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting only called once + // otherwise, it may unlock segments locked by other compaction tasks + t.resetSegmentCompacting() + log.Info("mixCompactionTask clean done") + return nil } func (t *l0CompactionTask) GetResult() *datapb.CompactionPlanResult { @@ -430,6 +431,11 @@ func (t *l0CompactionTask) saveSegmentMeta() error { return t.meta.UpdateSegmentsInfo(operators...) } +func (t *l0CompactionTask) Clean() bool { + err := t.doClean() + return err == nil +} + func (t *l0CompactionTask) GetSlotUsage() int64 { return t.slotUsage } diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index 316eb3f34b56f..060ec1fdbf293 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -260,14 +260,11 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { Deltalogs: deltaLogs, }} }).Twice() - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return() - - s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1) got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.State) + s.Equal(datapb.CompactionTaskState_failed, t.State) }) s.Run("test pipelining saveTaskMeta failed", func() { t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) @@ -427,15 +424,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { t.TimeoutInSeconds = 10 s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false). 
-            RunAndReturn(func(inputs []int64, compacting bool) {
-                s.ElementsMatch(inputs, t.GetInputSegments())
-                s.False(compacting)
-            }).Once()

         got = t.Process()
         s.True(got)
-        s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
+        s.Equal(datapb.CompactionTaskState_timeout, t.GetState())
     })

     s.Run("test executing with result executing timeout and updataAndSaveTaskMeta failed", func() {
@@ -525,14 +517,11 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
             PlanID: t.GetPlanID(),
             State:  datapb.CompactionTaskState_failed,
         }, nil).Once()
-        s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil)
-
-        s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2)
-        s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return().Once()
+        s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)

         got := t.Process()
         s.True(got)
-        s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
+        s.Equal(datapb.CompactionTaskState_failed, t.GetState())
     })
     s.Run("test executing with result failed save compaction meta failed", func() {
         t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
@@ -555,11 +544,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
         t := s.generateTestL0Task(datapb.CompactionTaskState_timeout)
         t.NodeID = 100
         s.Require().True(t.GetNodeID() > 0)
-        s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)
-        s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
-            s.Require().False(isCompacting)
-            s.ElementsMatch(segIDs, t.GetInputSegments())
-        }).Once()

         got := t.Process()
         s.True(got)
@@ -629,30 +613,20 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
         t := s.generateTestL0Task(datapb.CompactionTaskState_failed)
         t.NodeID = 100
         s.Require().True(t.GetNodeID() > 0)
-        s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once()
-        s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
-            s.ElementsMatch(segIDs, t.GetInputSegments())
-        }).Once()
-        s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)

         got := t.Process()
         s.True(got)
-        s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
+        s.Equal(datapb.CompactionTaskState_failed, t.GetState())
     })

     s.Run("test process failed failed", func() {
         t := s.generateTestL0Task(datapb.CompactionTaskState_failed)
         t.NodeID = 100
         s.Require().True(t.GetNodeID() > 0)
-        s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once()
-        s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
-            s.ElementsMatch(segIDs, t.GetInputSegments())
-        }).Once()
-        s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)

         got := t.Process()
         s.True(got)
-        s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
+        s.Equal(datapb.CompactionTaskState_failed, t.GetState())
     })

     s.Run("test unknown task", func() {
diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go
index 983f702368fec..10b8f12604f32 100644
--- a/internal/datacoord/compaction_task_mix.go
+++ b/internal/datacoord/compaction_task_mix.go
@@ -60,20 +60,23 @@ func (t *mixCompactionTask) processPipelining() bool {
             log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
             return false
         }
-        return t.processFailed()
+        return true
     }

     err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan())
     if err != nil {
-        log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err))
-        t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
+        log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
+        err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
+        if err != nil {
+            log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
+        }
         return false
     }

     log.Warn("mixCompactionTask notify compaction tasks to DataNode")
     err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
     if err != nil {
-        log.Warn("mixCompactionTask update task state failed", zap.Error(err))
+        log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
         return false
     }
     return false
@@ -110,7 +113,7 @@ func (t *mixCompactionTask) processExecuting() bool {
                 log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
                 return false
             }
-            return t.processTimeout()
+            return true
         }
     case datapb.CompactionTaskState_completed:
         t.result = result
@@ -121,8 +124,19 @@ func (t *mixCompactionTask) processExecuting() bool {
                 log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
                 return false
             }
-            return t.processFailed()
+            return true
         }
+
+        // update the tmp segmentIDs first, so the segments can be reverted if CompleteCompactionMutation fails
+        resultSegmentIDs := lo.Map(result.Segments, func(segment *datapb.CompactionSegment, _ int) int64 {
+            return segment.GetSegmentID()
+        })
+        err = t.updateAndSaveTaskMeta(setTmpSegments(resultSegmentIDs))
+        if err != nil {
+            log.Warn("mixCompactionTask failed to setTmpSegments", zap.Error(err))
+            return false
+        }
+
         if err := t.saveSegmentMeta(); err != nil {
             log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
             if errors.Is(err, merr.ErrIllegalCompactionPlan) {
@@ -131,11 +145,11 @@ func (t *mixCompactionTask) processExecuting() bool {
                     log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
                     return false
                 }
-                return t.processFailed()
+                return true
             }
             return false
         }
-        err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(t.newSegmentIDs))
+        err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setCompactionCommited(true))
         if err != nil {
             log.Warn("mixCompaction failed to setState meta saved", zap.Error(err))
             return false
@@ -147,7 +161,7 @@ func (t *mixCompactionTask) processExecuting() bool {
         if err != nil {
             log.Warn("fail to updateAndSaveTaskMeta")
         }
-        return false
+        return true
     }
     return false
}
@@ -185,14 +199,14 @@ func (t *mixCompactionTask) Process() bool {
         processResult = t.processPipelining()
     case datapb.CompactionTaskState_executing:
         processResult = t.processExecuting()
-    case datapb.CompactionTaskState_timeout:
-        processResult = t.processTimeout()
     case datapb.CompactionTaskState_meta_saved:
         processResult = t.processMetaSaved()
     case datapb.CompactionTaskState_completed:
         processResult = t.processCompleted()
     case datapb.CompactionTaskState_failed:
         processResult = t.processFailed()
+    case datapb.CompactionTaskState_timeout:
+        processResult = t.processTimeout()
     }
     currentState := t.GetState().String()
     if currentState != lastState {
@@ -236,16 +250,6 @@ func (t *mixCompactionTask) resetSegmentCompacting() {
     t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
}

-func (t *mixCompactionTask) processTimeout() bool {
-    t.resetSegmentCompacting()
-    err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
-    if err != nil {
-        log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
-        return false
-    }
-    return true
-}
-
func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
     taskClone := proto.Clone(t).(*datapb.CompactionTask)
     for _, opt := range opts {
@@ -254,22 +258,40 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa
     return taskClone
}

+// Backward compatibility
func (t *mixCompactionTask) processFailed() bool {
-    log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
+    return true
+}
+
+// Backward compatibility
+func (t *mixCompactionTask) processTimeout() bool {
+    return true
+}
+
+func (t *mixCompactionTask) Clean() bool {
+    err := t.doClean()
+    return err == nil
+}
+
+func (t *mixCompactionTask) doClean() error {
+    log := log.With(zap.Int64("planID", t.GetPlanID()))
     if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
         PlanID: t.GetPlanID(),
     }); err != nil {
         log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Error(err))
+        return err
     }

-    log.Info("mixCompactionTask processFailed done")
-    t.resetSegmentCompacting()
     err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
     if err != nil {
-        log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
-        return false
+        log.Warn("mixCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err))
+        return err
     }
-    return true
+
+    // resetSegmentCompacting must be the last step of Clean, to make sure it is
+    // called only once; otherwise it may unlock segments locked by other compaction tasks
+    t.resetSegmentCompacting()
+    log.Info("mixCompactionTask clean done")
+    return nil
}

func (t *mixCompactionTask) checkTimeout() bool {
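All three task types now share the same Clean ordering: a best-effort drop of the plan on the datanode, persisting the `cleaned` state, and only then releasing the segment-compacting flags. Any early return keeps the flags held, and the handler retries the whole sequence on its next pass, which is why `resetSegmentCompacting` can never run twice. A sketch of the shared ordering, with hypothetical callbacks in place of the real session and meta calls:

package sketch

// doClean shows the clean ordering used by the task types above. The
// segment-compacting flags are released exactly once, and only after the
// cleaned state is durable; every earlier failure path returns first.
func doClean(dropPlan func() error, saveCleanedState func() error, resetSegmentCompacting func()) error {
    // 1. best-effort stop of the plan on the datanode
    if err := dropPlan(); err != nil {
        return err
    }
    // 2. persist state "cleaned" so a retried Clean becomes a no-op
    if err := saveCleanedState(); err != nil {
        return err
    }
    // 3. last step: release the segment locks, at most once
    resetSegmentCompacting()
    return nil
}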
diff --git a/internal/datacoord/compaction_task_mix_test.go b/internal/datacoord/compaction_task_mix_test.go
index 4b750b1500bae..cdcc01b4eaed4 100644
--- a/internal/datacoord/compaction_task_mix_test.go
+++ b/internal/datacoord/compaction_task_mix_test.go
@@ -1,17 +1,36 @@
 package datacoord

 import (
+    "testing"
     "time"

     "github.com/samber/lo"
     "github.com/stretchr/testify/mock"
+    "github.com/stretchr/testify/suite"

     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus/internal/datacoord/allocator"
+    "github.com/milvus-io/milvus/internal/datacoord/session"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/pkg/util/merr"
 )

+func TestCompactionTaskSuite(t *testing.T) {
+    suite.Run(t, new(CompactionTaskSuite))
+}
+
+type CompactionTaskSuite struct {
+    suite.Suite
+
+    mockMeta    *MockCompactionMeta
+    mockSessMgr *session.MockDataNodeManager
+}
+
+func (s *CompactionTaskSuite) SetupTest() {
+    s.mockMeta = NewMockCompactionMeta(s.T())
+    s.mockSessMgr = session.NewMockDataNodeManager(s.T())
+}
+
 func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalMix() {
     channel := "Ch-1"
     binLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
@@ -95,7 +114,6 @@ func (s *CompactionTaskSuite) TestCompactionTimeout() {
         }}
     }).Times(2)
     s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
-    s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything)
     alloc := allocator.NewMockAllocator(s.T())
     alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil)
     task := &mixCompactionTask{
@@ -125,5 +143,7 @@ func (s *CompactionTaskSuite) TestCompactionTimeout() {
     }, nil)
     end := task.processExecuting()
     s.Equal(true, end)
-    s.Equal(datapb.CompactionTaskState_cleaned, task.State)
+    s.Equal(datapb.CompactionTaskState_timeout, task.State)
+    end = task.processTimeout()
+    s.Equal(true, end)
}
diff --git a/internal/datacoord/compaction_task_test.go b/internal/datacoord/compaction_task_test.go
deleted file mode 100644
index 41f33cdee13c6..0000000000000
--- a/internal/datacoord/compaction_task_test.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package datacoord
-
-import (
-    "testing"
-
-    "github.com/stretchr/testify/suite"
-
-    "github.com/milvus-io/milvus/internal/datacoord/session"
-)
-
-func TestCompactionTaskSuite(t *testing.T) {
-    suite.Run(t, new(CompactionTaskSuite))
-}
-
-type CompactionTaskSuite struct {
-    suite.Suite
-
-    mockMeta    *MockCompactionMeta
-    mockSessMgr *session.MockDataNodeManager
-}
-
-func (s *CompactionTaskSuite) SetupTest() {
-    s.mockMeta = NewMockCompactionMeta(s.T())
-    s.mockSessMgr = session.NewMockDataNodeManager(s.T())
-}
diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index a199708b05605..bc377d00cd65c 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -26,6 +26,7 @@ import (
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/suite"

+    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/datacoord/allocator"
     "github.com/milvus-io/milvus/internal/datacoord/session"
     "github.com/milvus-io/milvus/internal/metastore/kv/binlog"
@@ -49,6 +50,7 @@ type CompactionPlanHandlerSuite struct {
     mockSessMgr *session.MockDataNodeManager
     handler     *compactionPlanHandler
     cluster     Cluster
+    mockHandler *NMockHandler
}

func (s *CompactionPlanHandlerSuite) SetupTest() {
@@ -58,6 +60,8 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
     s.mockSessMgr = session.NewMockDataNodeManager(s.T())
     s.cluster = NewMockCluster(s.T())
     s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)
+    s.mockHandler = NewNMockHandler(s.T())
+    s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()
}

func (s *CompactionPlanHandlerSuite) TestScheduleEmpty() {
@@ -933,6 +937,190 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
     s.NoError(err)
}

+func (s *CompactionPlanHandlerSuite) TestCleanCompaction() {
+    s.SetupTest()
+
+    tests := []struct {
+        task CompactionTask
+    }{
+        {
+            &mixCompactionTask{
+                CompactionTask: &datapb.CompactionTask{
+                    PlanID:        1,
+                    TriggerID:     1,
+                    Type:          datapb.CompactionType_MixCompaction,
+                    State:         datapb.CompactionTaskState_failed,
+                    NodeID:        1,
+                    InputSegments: []UniqueID{1, 2},
+                },
+                sessions: s.mockSessMgr,
+                meta:     s.mockMeta,
+            },
+        },
+        {
+            &l0CompactionTask{
+                CompactionTask: &datapb.CompactionTask{
+                    PlanID:        1,
+                    TriggerID:     1,
+                    Type:          datapb.CompactionType_Level0DeleteCompaction,
+                    State:         datapb.CompactionTaskState_failed,
+                    NodeID:        1,
+                    InputSegments: []UniqueID{1, 2},
+                },
+                sessions: s.mockSessMgr,
+                meta:     s.mockMeta,
+            },
+        },
+    }
+    for _, test := range tests {
+        task := test.task
+        s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
+        s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+        s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
+        s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+
+        s.handler.executingTasks[1] = task
+        s.Equal(1, len(s.handler.executingTasks))
+        err := s.handler.checkCompaction()
+        s.NoError(err)
+        s.Equal(0, len(s.handler.executingTasks))
+        s.Equal(1, len(s.handler.cleaningTasks))
+        s.handler.cleanFailedTasks()
+        s.Equal(0, len(s.handler.cleaningTasks))
+    }
+}
+
+func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompaction() {
+    s.SetupTest()
+
+    task := &clusteringCompactionTask{
+        CompactionTask: &datapb.CompactionTask{
+            PlanID:        1,
+            TriggerID:     1,
+            CollectionID:  1001,
+            Type:          datapb.CompactionType_ClusteringCompaction,
+            State:         datapb.CompactionTaskState_failed,
+            NodeID:        1,
+            InputSegments: []UniqueID{1, 2},
+        },
+        sessions: s.mockSessMgr,
+        meta:     s.mockMeta,
+    }
+    s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
+    s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything).Return(nil)
+    s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil)
+    s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
+    s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+
+    s.handler.executingTasks[1] = task
+    s.Equal(1, len(s.handler.executingTasks))
+    s.handler.checkCompaction()
+    s.Equal(0, len(s.handler.executingTasks))
+    s.Equal(1, len(s.handler.cleaningTasks))
+    s.handler.cleanFailedTasks()
+    s.Equal(0, len(s.handler.cleaningTasks))
+}
+
+func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompactionCommitFail() {
+    s.SetupTest()
+
+    task := &clusteringCompactionTask{
+        CompactionTask: &datapb.CompactionTask{
+            PlanID:        1,
+            TriggerID:     1,
+            CollectionID:  1001,
+            Channel:       "ch-1",
+            Type:          datapb.CompactionType_ClusteringCompaction,
+            State:         datapb.CompactionTaskState_executing,
+            NodeID:        1,
+            InputSegments: []UniqueID{1, 2},
+            ClusteringKeyField: &schemapb.FieldSchema{
+                FieldID:         100,
+                Name:            Int64Field,
+                IsPrimaryKey:    true,
+                DataType:        schemapb.DataType_Int64,
+                AutoID:          true,
+                IsClusteringKey: true,
+            },
+        },
+        sessions: s.mockSessMgr,
+        meta:     s.mockMeta,
+        handler:  s.mockHandler,
+    }
+
+    s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
+    s.mockSessMgr.EXPECT().GetCompactionPlanResult(UniqueID(1), int64(1)).Return(
+        &datapb.CompactionPlanResult{
+            PlanID: 1,
+            State:  datapb.CompactionTaskState_completed,
+            Segments: []*datapb.CompactionSegment{
+                {
+                    PlanID:    1,
+                    SegmentID: 101,
+                },
+            },
+        }, nil).Once()
+    s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(nil, nil, errors.New("mock error"))
+    s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+    s.handler.executingTasks[1] = task
+    s.Equal(1, len(s.handler.executingTasks))
+    s.handler.checkCompaction()
+    s.Equal(0, len(task.GetResultSegments()))
+
+    s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+    s.Equal(0, len(s.handler.executingTasks))
+    s.Equal(1, len(s.handler.cleaningTasks))
+
+    s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
+    s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+    s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil)
+    s.handler.cleanFailedTasks()
+    s.Equal(0, len(s.handler.cleaningTasks))
+}
+
+// test that compactionHandler keeps cleaning a failed task until it becomes cleaned
+func (s *CompactionPlanHandlerSuite) TestKeepClean() {
+    s.SetupTest()
+
+    tests := []struct {
+        task CompactionTask
+    }{
+        {
+            &clusteringCompactionTask{
+                CompactionTask: &datapb.CompactionTask{
+                    PlanID:        1,
+                    TriggerID:     1,
+                    Type:          datapb.CompactionType_ClusteringCompaction,
+                    State:         datapb.CompactionTaskState_failed,
+                    NodeID:        1,
+                    InputSegments: []UniqueID{1, 2},
+                },
+                sessions: s.mockSessMgr,
+                meta:     s.mockMeta,
+            },
+        },
+    }
+    for _, test := range tests {
+        task := test.task
+        s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return()
+        s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything).Return(nil)
+        s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(errors.New("mock error")).Once()
+        s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
+        s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
+
+        s.handler.executingTasks[1] = task
+        s.Equal(1, len(s.handler.executingTasks))
+        s.handler.checkCompaction()
+        s.Equal(0, len(s.handler.executingTasks))
+        s.Equal(1, len(s.handler.cleaningTasks))
+        s.handler.cleanFailedTasks()
+        s.Equal(1, len(s.handler.cleaningTasks))
+        s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil).Once()
+        s.handler.cleanFailedTasks()
+        s.Equal(0, len(s.handler.cleaningTasks))
+    }
+}
+
 func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
     l := &datapb.FieldBinlog{
         FieldID: fieldID,
diff --git a/internal/datacoord/partition_stats_meta.go b/internal/datacoord/partition_stats_meta.go
index 27009992cecd8..c6c0f059c98ba 100644
--- a/internal/datacoord/partition_stats_meta.go
+++ b/internal/datacoord/partition_stats_meta.go
@@ -151,13 +151,38 @@ func (psm *partitionStatsMeta) DropPartitionStatsInfo(info *datapb.PartitionStat
     if len(psm.partitionStatsInfos[info.GetVChannel()]) == 0 {
         delete(psm.partitionStatsInfos, info.GetVChannel())
     }
+    // if the partitionStats being dropped is the current version, update the current version
+    currentVersion := psm.innerGetCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel())
+    if currentVersion == info.GetVersion() && currentVersion != emptyPartitionStatsVersion {
+        err := psm.catalog.DropCurrentPartitionStatsVersion(psm.ctx, info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel())
+        if err != nil {
+            return err
+        }
+        infos := psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos
+        delete(infos, currentVersion)
+        if len(infos) > 0 {
+            var maxVersion int64 = 0
+            for version := range infos {
+                if version > maxVersion {
+                    maxVersion = version
+                }
+            }
+            err := psm.innerSaveCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel(), maxVersion)
+            if err != nil {
+                return err
+            }
+        }
+    }
     return nil
}

func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error {
     psm.Lock()
     defer psm.Unlock()
+    return psm.innerSaveCurrentPartitionStatsVersion(collectionID, partitionID, vChannel, currentPartitionStatsVersion)
+}

+func (psm *partitionStatsMeta) innerSaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error {
     log.Info("update current partition stats version", zap.Int64("collectionID", collectionID),
         zap.Int64("partitionID", partitionID), zap.String("vChannel", vChannel),
         zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion))
@@ -182,7 +207,10 @@ func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, pa
func (psm *partitionStatsMeta) GetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 {
     psm.RLock()
     defer psm.RUnlock()
+    return psm.innerGetCurrentPartitionStatsVersion(collectionID, partitionID, vChannel)
+}

+func (psm *partitionStatsMeta) innerGetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 {
     if _, ok := psm.partitionStatsInfos[vChannel]; !ok {
         return emptyPartitionStatsVersion
     }
diff --git a/internal/datacoord/partition_stats_meta_test.go b/internal/datacoord/partition_stats_meta_test.go
index 2a124a6471c65..696373caaee5b 100644
--- a/internal/datacoord/partition_stats_meta_test.go
+++ b/internal/datacoord/partition_stats_meta_test.go
@@ -87,3 +87,62 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() {
     currentVersion4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1")
     s.Equal(int64(100), currentVersion4)
}
+
+func (s *PartitionStatsMetaSuite) TestDropPartitionStats() {
+    ctx := context.Background()
+    partitionStatsMeta, err := newPartitionStatsMeta(ctx, s.catalog)
+    s.NoError(err)
+    collectionID := int64(1)
+    partitionID := int64(2)
+    channel := "ch-1"
+    s.catalog.EXPECT().DropPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil)
+    s.catalog.EXPECT().SaveCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+    s.catalog.EXPECT().DropCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+    partitionStats := []*datapb.PartitionStatsInfo{
+        {
+            CollectionID: collectionID,
+            PartitionID:  partitionID,
+            VChannel:     channel,
+            SegmentIDs:   []int64{100000},
+            Version:      100,
+        },
+        {
+            CollectionID: collectionID,
+            PartitionID:  partitionID,
+            VChannel:     channel,
+            SegmentIDs:   []int64{100000},
+            Version:      101,
+        },
+        {
+            CollectionID: collectionID,
+            PartitionID:  partitionID,
+            VChannel:     channel,
+            SegmentIDs:   []int64{100000},
+            Version:      102,
+        },
+    }
+    for _, partitionStats := range partitionStats {
+        partitionStatsMeta.SavePartitionStatsInfo(partitionStats)
+    }
+    partitionStatsMeta.SaveCurrentPartitionStatsVersion(collectionID, partitionID, channel, 102)
+    version := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
+    s.Equal(int64(102), version)
+
+    err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[2])
+    s.NoError(err)
+    s.Equal(2, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos))
+    version2 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
+    s.Equal(int64(101), version2)
+
+    err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[1])
+    s.NoError(err)
+    s.Equal(1, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos))
+    version3 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
+    s.Equal(int64(100), version3)
+
+    err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[0])
+    s.NoError(err)
+    s.Nil(partitionStatsMeta.partitionStatsInfos[channel][partitionID])
+    version4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
+    s.Equal(emptyPartitionStatsVersion, version4)
+}
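`DropPartitionStatsInfo` now repairs the current-version pointer: if the dropped stats file was the current one, the highest remaining version is promoted, and only when nothing remains does the channel fall back to the empty sentinel. A sketch of just the promotion step, using a plain set in place of the real `infos` map keyed by version:

package sketch

// nextCurrentVersion mirrors the fallback loop added to
// DropPartitionStatsInfo: drop a version from the remaining set and, if the
// set is non-empty, promote the highest remaining version.
func nextCurrentVersion(infos map[int64]struct{}, dropped int64) (int64, bool) {
    delete(infos, dropped)
    if len(infos) == 0 {
        return 0, false // nothing left: caller keeps the empty sentinel
    }
    maxVersion := int64(0)
    for v := range infos {
        if v > maxVersion {
            maxVersion = v
        }
    }
    return maxVersion, true
}

With versions {100, 101, 102} and 102 current, dropping 102 promotes 101, then 100, and finally reports no remaining version, exactly the sequence TestDropPartitionStats asserts.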
diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go
index 30bd26ec49f8f..b43041bc3b1c9 100644
--- a/pkg/util/merr/errors.go
+++ b/pkg/util/merr/errors.go
@@ -210,6 +210,7 @@ var (
     ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true)
     ErrCompactionResult            = newMilvusError("illegal compaction results", 2314, false)
     ErrDuplicatedCompactionTask    = newMilvusError("duplicated compaction task", 2315, false)
+    ErrCleanPartitionStatsFail     = newMilvusError("fail to clean partition stats", 2316, true)

     ErrDataNodeSlotExhausted = newMilvusError("datanode slot exhausted", 2401, false)

diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go
index 2ad30564e570b..de68e12836409 100644
--- a/pkg/util/merr/utils.go
+++ b/pkg/util/merr/utils.go
@@ -1177,6 +1177,14 @@ func WrapErrClusteringCompactionMetaError(operation string, err error) error {
     return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation))
}

+func WrapErrCleanPartitionStatsFail(msg ...string) error {
+    err := error(ErrCleanPartitionStatsFail)
+    if len(msg) > 0 {
+        err = errors.Wrap(err, strings.Join(msg, "->"))
+    }
+    return err
+}
+
func WrapErrAnalyzeTaskNotFound(id int64) error {
     return wrapFields(ErrAnalyzeTaskNotFound, value("analyzeId", id))
}
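An illustrative fragment of a call site for the new wrapper, mirroring how `doClean` uses it above (the surrounding variables are assumed in scope, inside datacoord with `merr` imported). The message encodes a collection-partition-channel-plan key, and because the sentinel is registered as retryable, a wrapped failure keeps the task in `cleaningTasks` for the next pass:

// illustrative fragment; collectionID, partitionID, channel, planID assumed
key := fmt.Sprintf("%d-%d-%s-%d", collectionID, partitionID, channel, planID)
err := merr.WrapErrCleanPartitionStatsFail(key)
if errors.Is(err, merr.ErrCleanPartitionStatsFail) {
    // retryable sentinel: Clean() returns false and the handler retries the
    // task on the next cleanFailedTasks pass
}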