From c01eeb218a77b79f934a1bbb0ee303f234cb9395 Mon Sep 17 00:00:00 2001 From: wayblink Date: Thu, 24 Oct 2024 10:27:04 +0800 Subject: [PATCH] enhance: clean failed/timeout compaction task in compactionHandler Signed-off-by: wayblink --- internal/datacoord/compaction.go | 39 ++++ internal/datacoord/compaction_task.go | 10 + .../datacoord/compaction_task_clustering.go | 63 +++++-- .../compaction_task_clustering_test.go | 48 ++++- internal/datacoord/compaction_task_l0.go | 26 ++- internal/datacoord/compaction_task_l0_test.go | 32 +--- internal/datacoord/compaction_task_mix.go | 72 ++++--- .../datacoord/compaction_task_mix_test.go | 5 +- internal/datacoord/compaction_test.go | 177 ++++++++++++++++++ internal/datacoord/partition_stats_meta.go | 28 +++ .../datacoord/partition_stats_meta_test.go | 59 ++++++ pkg/util/merr/errors.go | 1 + pkg/util/merr/utils.go | 8 + 13 files changed, 477 insertions(+), 91 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 05193eb31d0c1..f231a63784dbd 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -81,6 +81,9 @@ type compactionPlanHandler struct { executingGuard lock.RWMutex executingTasks map[int64]CompactionTask // planID -> task + cleaningGuard lock.RWMutex + cleaningTasks map[int64]CompactionTask // planID -> task + meta CompactionMeta allocator allocator.Allocator chManager ChannelManager @@ -189,6 +192,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, stopCh: make(chan struct{}), cluster: cluster, executingTasks: make(map[int64]CompactionTask), + cleaningTasks: make(map[int64]CompactionTask), analyzeScheduler: analyzeScheduler, handler: handler, } @@ -368,6 +372,7 @@ func (c *compactionPlanHandler) loopCheck() { if err != nil { log.Info("fail to update compaction", zap.Error(err)) } + c.cleanFailedTasks() } } } @@ -614,6 +619,11 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { } } +// checkCompaction retrieves executing tasks and calls each task's Process() method +// to evaluate its state and progress through the state machine. +// Completed tasks are removed from executingTasks. +// Tasks that fail or timeout are moved from executingTasks to cleaningTasks, +// where task-specific clean logic is performed asynchronously. func (c *compactionPlanHandler) checkCompaction() error { // Get executing executingTasks before GetCompactionState from DataNode to prevent false failure, // for DC might add new task while GetCompactionState. @@ -649,9 +659,38 @@ func (c *compactionPlanHandler) checkCompaction() error { metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Done).Inc() } c.executingGuard.Unlock() + + // insert task need to clean + c.cleaningGuard.Lock() + for _, t := range finishedTasks { + if t.GetTaskProto().GetState() == datapb.CompactionTaskState_failed || t.GetTaskProto().GetState() == datapb.CompactionTaskState_timeout { + c.cleaningTasks[t.GetTaskProto().GetPlanID()] = t + } + } + c.cleaningGuard.Unlock() + return nil } +// cleanFailedTasks performs task define Clean logic +// while compactionPlanHandler.Clean is to do garbage collection for cleaned tasks +func (c *compactionPlanHandler) cleanFailedTasks() { + c.cleaningGuard.RLock() + cleanedTasks := make([]CompactionTask, 0) + for _, t := range c.cleaningTasks { + clean := t.Clean() + if clean { + cleanedTasks = append(cleanedTasks, t) + } + } + c.cleaningGuard.RUnlock() + c.cleaningGuard.Lock() + for _, t := range cleanedTasks { + delete(c.cleaningTasks, t.GetTaskProto().GetPlanID()) + } + c.cleaningGuard.Unlock() +} + func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) { nodeID = NullNodeID var maxSlots int64 = -1 diff --git a/internal/datacoord/compaction_task.go b/internal/datacoord/compaction_task.go index f9cd27e972da6..4f0bd375a180b 100644 --- a/internal/datacoord/compaction_task.go +++ b/internal/datacoord/compaction_task.go @@ -23,7 +23,17 @@ import ( ) type CompactionTask interface { + // Process performs the task's state machine + // + // Returns: + // - : whether the task state machine ends. + // + // Notes: + // + // `end` doesn't mean the task completed, its state may be completed or failed or timeout. Process() bool + // Clean performs clean logic for a fail/timeout task + Clean() bool BuildCompactionRequest() (*datapb.CompactionPlan, error) GetSlotUsage() int64 GetLabel() string diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index d6a5b88a25f6a..f5706bd5150f6 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -125,15 +125,22 @@ func (t *clusteringCompactionTask) Process() bool { if err != nil { log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err)) } + log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration)) } log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState)) - return t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned + return t.GetTaskProto().State == datapb.CompactionTaskState_completed || + t.GetTaskProto().State == datapb.CompactionTaskState_cleaned || + t.GetTaskProto().State == datapb.CompactionTaskState_failed || + t.GetTaskProto().State == datapb.CompactionTaskState_timeout } // retryableProcess process task's state transfer, return error if not work as expected // the outer Process will set state and retry times according to the error type(retryable or not-retryable) func (t *clusteringCompactionTask) retryableProcess() error { - if t.GetTaskProto().State == datapb.CompactionTaskState_completed || t.GetTaskProto().State == datapb.CompactionTaskState_cleaned { + if t.GetTaskProto().State == datapb.CompactionTaskState_completed || + t.GetTaskProto().State == datapb.CompactionTaskState_cleaned || + t.GetTaskProto().State == datapb.CompactionTaskState_failed || + t.GetTaskProto().State == datapb.CompactionTaskState_timeout { return nil } @@ -162,15 +169,25 @@ func (t *clusteringCompactionTask) retryableProcess() error { return t.processIndexing() case datapb.CompactionTaskState_statistic: return t.processStats() - - case datapb.CompactionTaskState_timeout: - return t.processFailedOrTimeout() case datapb.CompactionTaskState_failed: return t.processFailedOrTimeout() + case datapb.CompactionTaskState_timeout: + return t.processFailedOrTimeout() } return nil } +func (t *clusteringCompactionTask) Clean() bool { + log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String())) + log.Info("clean task") + err := t.doClean() + if err != nil { + log.Warn("clean task fail", zap.Error(err)) + return false + } + return true +} + func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { beginLogID, _, err := t.allocator.AllocN(1) if err != nil { @@ -280,12 +297,7 @@ func (t *clusteringCompactionTask) processExecuting() error { return t.processMetaSaved() case datapb.CompactionTaskState_executing: if t.checkTimeout() { - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) - if err == nil { - return t.processFailedOrTimeout() - } else { - return err - } + return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) } return nil case datapb.CompactionTaskState_failed: @@ -516,13 +528,19 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() { t.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false) } +// Backward compatibility func (t *clusteringCompactionTask) processFailedOrTimeout() error { - log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("state", t.GetTaskProto().GetState().String())) + return nil +} + +func (t *clusteringCompactionTask) doClean() error { + log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID())) + log.Info("clean task", zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.String("state", t.GetTaskProto().GetState().String())) if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetTaskProto().GetPlanID(), }); err != nil { - log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) + log.Warn("clusteringCompactionTask unable to drop compaction plan", zap.Error(err)) } isInputDropped := false for _, segID := range t.GetTaskProto().GetInputSegments() { @@ -538,8 +556,8 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error { // revert segments meta var operators []UpdateOperator // revert level of input segments - // L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1 - // L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2 + // L1 : L1 ->(process)-> L2 ->(clean)-> L1 + // L2 : L2 ->(process)-> L2 ->(clean)-> L2 for _, segID := range t.GetTaskProto().GetInputSegments() { operators = append(operators, RevertSegmentLevelOperator(segID)) } @@ -577,8 +595,6 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error { } } - t.resetSegmentCompacting() - // drop partition stats if uploaded partitionStatsInfo := &datapb.PartitionStatsInfo{ CollectionID: t.GetTaskProto().GetCollectionID(), @@ -590,9 +606,20 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error { err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo) if err != nil { log.Warn("gcPartitionStatsInfo fail", zap.Error(err)) + return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID())) } - return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) + if err != nil { + log.Warn("clusteringCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err)) + return err + } + + // resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting only called once + // otherwise, it may unlock segments locked by other compaction tasks + t.resetSegmentCompacting() + log.Info("clusteringCompactionTask clean done") + return nil } func (t *clusteringCompactionTask) doAnalyze() error { diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index ca6e78cbd05d5..7f1787131b8c1 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -109,7 +109,8 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang s.mockSessionMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil) task := s.generateBasicTask(false) - task.processPipelining() + err := task.processPipelining() + s.NoError(err) seg11 := s.meta.GetSegment(101) s.Equal(datapb.SegmentLevel_L1, seg11.Level) @@ -117,6 +118,34 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang s.Equal(datapb.SegmentLevel_L2, seg21.Level) s.Equal(int64(10000), seg21.PartitionStatsVersion) + task.updateAndSaveTaskMeta(setResultSegments([]int64{103, 104})) + // fake some compaction result segment + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 103, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L2, + LastLevel: datapb.SegmentLevel_L1, + CreatedByCompaction: true, + PartitionStatsVersion: 10001, + }, + }) + s.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 104, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L2, + LastLevel: datapb.SegmentLevel_L1, + CreatedByCompaction: true, + PartitionStatsVersion: 10001, + }, + }) + + s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + + err = task.doClean() + s.NoError(err) + s.Run("v2.4.x", func() { // fake some compaction result segment s.meta.AddSegment(context.TODO(), &SegmentInfo{ @@ -164,6 +193,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang task.GetTaskProto().ResultSegments = []int64{103, 104} task.processFailedOrTimeout() + task.Clean() seg12 := s.meta.GetSegment(101) s.Equal(datapb.SegmentLevel_L1, seg12.Level) @@ -253,6 +283,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang task.GetTaskProto().ResultSegments = []int64{105, 106} task.processFailedOrTimeout() + task.Clean() seg12 := s.meta.GetSegment(101) s.Equal(datapb.SegmentLevel_L1, seg12.Level) @@ -336,7 +367,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() { s.Equal(false, task.Process()) s.Equal(int32(3), task.GetTaskProto().RetryTimes) s.Equal(datapb.CompactionTaskState_pipelining, task.GetTaskProto().GetState()) - s.Equal(false, task.Process()) + s.True(task.Process()) s.Equal(int32(0), task.GetTaskProto().RetryTimes) s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState()) } @@ -345,7 +376,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() { s.Run("process pipelining fail, segment not found", func() { task := s.generateBasicTask(false) task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining)) - s.Equal(false, task.Process()) + s.True(task.Process()) s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState()) }) @@ -570,11 +601,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() { }, }, }, nil).Once() - s.mockSessionMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once() time.Sleep(time.Second * 1) - s.Equal(true, task.Process()) - s.Equal(datapb.CompactionTaskState_cleaned, task.GetTaskProto().GetState()) + s.True(task.Process()) + s.Equal(datapb.CompactionTaskState_timeout, task.GetTaskProto().GetState()) }) } @@ -675,7 +705,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() { s.Run("analyze task not found", func() { task := s.generateBasicTask(false) task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing)) - s.False(task.Process()) + s.True(task.Process()) s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState()) }) @@ -691,7 +721,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() { State: indexpb.JobState_JobStateFailed, } s.meta.analyzeMeta.AddAnalyzeTask(t) - s.False(task.Process()) + s.True(task.Process()) s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState()) }) @@ -708,7 +738,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() { CentroidsFile: "", } s.meta.analyzeMeta.AddAnalyzeTask(t) - s.False(task.Process()) + s.True(task.Process()) s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState()) }) diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index c0769ff02c62b..4555f3d6cb31a 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -191,16 +191,15 @@ func (t *l0CompactionTask) processCompleted() bool { } func (t *l0CompactionTask) processTimeout() bool { - t.resetSegmentCompacting() - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) - if err != nil { - log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false - } return true } func (t *l0CompactionTask) processFailed() bool { + return true +} + +func (t *l0CompactionTask) doClean() error { + log := log.With(zap.Int64("planID", t.GetTaskProto().GetPlanID())) if t.hasAssignedWorker() { err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetTaskProto().GetPlanID(), @@ -210,15 +209,17 @@ func (t *l0CompactionTask) processFailed() bool { } } - t.resetSegmentCompacting() err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) if err != nil { log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false + return err } - log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTaskProto().GetTriggerID()), zap.Int64("planID", t.GetTaskProto().GetPlanID())) - return true + // resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting only called once + // otherwise, it may unlock segments locked by other compaction tasks + t.resetSegmentCompacting() + log.Info("mixCompactionTask clean done") + return nil } func (t *l0CompactionTask) GetResult() *datapb.CompactionPlanResult { @@ -417,6 +418,11 @@ func (t *l0CompactionTask) saveSegmentMeta() error { return t.meta.UpdateSegmentsInfo(operators...) } +func (t *l0CompactionTask) Clean() bool { + err := t.doClean() + return err == nil +} + func (t *l0CompactionTask) GetSlotUsage() int64 { return t.slotUsage } diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index 89f478771125f..9f812d772ef4a 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -249,13 +249,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { Deltalogs: deltaLogs, }} }).Twice() - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return() - - s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().State) + s.Equal(datapb.CompactionTaskState_failed, t.GetTaskProto().State) }) s.Run("test pipelining saveTaskMeta failed", func() { s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() @@ -418,15 +415,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { t.updateAndSaveTaskMeta(setStartTime(time.Now().Add(-time.Hour).Unix()), setTimeoutInSeconds(10)) s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false). - RunAndReturn(func(inputs []int64, compacting bool) { - s.ElementsMatch(inputs, t.GetTaskProto().GetInputSegments()) - s.False(compacting) - }).Once() got = t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState()) + s.Equal(datapb.CompactionTaskState_timeout, t.GetTaskProto().GetState()) }) s.Run("test executing with result executing timeout and updataAndSaveTaskMeta failed", func() { @@ -520,12 +512,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { PlanID: t.GetTaskProto().GetPlanID(), State: datapb.CompactionTaskState_failed, }, nil).Once() - s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return().Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState()) + s.Equal(datapb.CompactionTaskState_failed, t.GetTaskProto().GetState()) }) s.Run("test executing with result failed save compaction meta failed", func() { s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() @@ -550,10 +540,6 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { t := s.generateTestL0Task(datapb.CompactionTaskState_timeout) t.updateAndSaveTaskMeta(setNodeID(100)) s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { - s.Require().False(isCompacting) - s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments()) - }).Once() got := t.Process() s.True(got) @@ -627,14 +613,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { t := s.generateTestL0Task(datapb.CompactionTaskState_failed) t.updateAndSaveTaskMeta(setNodeID(100)) s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(nil).Once() - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { - s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments()) - }).Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState()) + s.Equal(datapb.CompactionTaskState_failed, t.GetTaskProto().GetState()) }) s.Run("test process failed failed", func() { @@ -642,14 +624,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { t := s.generateTestL0Task(datapb.CompactionTaskState_failed) t.updateAndSaveTaskMeta(setNodeID(100)) s.Require().True(t.GetTaskProto().GetNodeID() > 0) - s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetTaskProto().GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once() - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { - s.ElementsMatch(segIDs, t.GetTaskProto().GetInputSegments()) - }).Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_cleaned, t.GetTaskProto().GetState()) + s.Equal(datapb.CompactionTaskState_failed, t.GetTaskProto().GetState()) }) s.Run("test unknown task", func() { diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index c3deaacd89def..e1867b9781e4e 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -70,20 +70,23 @@ func (t *mixCompactionTask) processPipelining() bool { log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) return false } - return t.processFailed() + return true } err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan()) if err != nil { - log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err)) - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + if err != nil { + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + } return false } log.Warn("mixCompactionTask notify compaction tasks to DataNode") err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) if err != nil { - log.Warn("mixCompactionTask update task state failed", zap.Error(err)) + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) return false } return false @@ -120,7 +123,7 @@ func (t *mixCompactionTask) processExecuting() bool { log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) return false } - return t.processTimeout() + return true } case datapb.CompactionTaskState_completed: t.result = result @@ -131,8 +134,19 @@ func (t *mixCompactionTask) processExecuting() bool { log.Warn("mixCompactionTask failed to setState failed", zap.Error(err)) return false } - return t.processFailed() + return true + } + + // update the tmp segmentIDs first, revert the segments if CompleteCompactionMutation fails + resultSegmentIDs := lo.Map(result.Segments, func(segment *datapb.CompactionSegment, _ int) int64 { + return segment.GetSegmentID() + }) + err = t.updateAndSaveTaskMeta(setTmpSegments(resultSegmentIDs)) + if err != nil { + log.Warn("mixCompactionTask failed to setTmpSegments failed", zap.Error(err)) + return false } + if err := t.saveSegmentMeta(); err != nil { log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err)) if errors.Is(err, merr.ErrIllegalCompactionPlan) { @@ -141,7 +155,7 @@ func (t *mixCompactionTask) processExecuting() bool { log.Warn("mixCompactionTask failed to setState failed", zap.Error(err)) return false } - return t.processFailed() + return true } return false } @@ -157,7 +171,7 @@ func (t *mixCompactionTask) processExecuting() bool { if err != nil { log.Warn("fail to updateAndSaveTaskMeta") } - return false + return true } return false } @@ -195,14 +209,14 @@ func (t *mixCompactionTask) Process() bool { processResult = t.processPipelining() case datapb.CompactionTaskState_executing: processResult = t.processExecuting() - case datapb.CompactionTaskState_timeout: - processResult = t.processTimeout() case datapb.CompactionTaskState_meta_saved: processResult = t.processMetaSaved() case datapb.CompactionTaskState_completed: processResult = t.processCompleted() case datapb.CompactionTaskState_failed: processResult = t.processFailed() + case datapb.CompactionTaskState_timeout: + processResult = t.processTimeout() } currentState := t.GetTaskProto().GetState().String() if currentState != lastState { @@ -250,16 +264,6 @@ func (t *mixCompactionTask) resetSegmentCompacting() { t.meta.SetSegmentsCompacting(t.taskProto.Load().(*datapb.CompactionTask).GetInputSegments(), false) } -func (t *mixCompactionTask) processTimeout() bool { - t.resetSegmentCompacting() - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) - if err != nil { - log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false - } - return true -} - func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask { taskClone := proto.Clone(t.GetTaskProto()).(*datapb.CompactionTask) for _, opt := range opts { @@ -268,22 +272,40 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa return taskClone } +// Backward compatibility func (t *mixCompactionTask) processFailed() bool { + return true +} + +// Backward compatibility +func (t *mixCompactionTask) processTimeout() bool { + return true +} + +func (t *mixCompactionTask) Clean() bool { + err := t.doClean() + return err == nil +} + +func (t *mixCompactionTask) doClean() error { log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID())) if err := t.sessions.DropCompactionPlan(t.GetTaskProto().GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetTaskProto().GetPlanID(), }); err != nil { log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Error(err)) + return err } - log.Info("mixCompactionTask processFailed done") - t.resetSegmentCompacting() err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) if err != nil { - log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - return false + log.Warn("mixCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err)) + return err } - return true + // resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting only called once + // otherwise, it may unlock segments locked by other compaction tasks + t.resetSegmentCompacting() + log.Info("mixCompactionTask clean done") + return nil } func (t *mixCompactionTask) checkTimeout() bool { diff --git a/internal/datacoord/compaction_task_mix_test.go b/internal/datacoord/compaction_task_mix_test.go index 00230dd658849..a2860122e7385 100644 --- a/internal/datacoord/compaction_task_mix_test.go +++ b/internal/datacoord/compaction_task_mix_test.go @@ -107,7 +107,6 @@ func (s *MixCompactionTaskSuite) TestCompactionTimeout() { }} }).Times(2) s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything) alloc := allocator.NewMockAllocator(s.T()) alloc.EXPECT().AllocN(mock.Anything).Return(100, 200, nil) task := newMixCompactionTask(&datapb.CompactionTask{ @@ -132,5 +131,7 @@ func (s *MixCompactionTaskSuite) TestCompactionTimeout() { }, nil) end := task.processExecuting() s.Equal(true, end) - s.Equal(datapb.CompactionTaskState_cleaned, task.GetTaskProto().State) + s.Equal(datapb.CompactionTaskState_timeout, task.GetTaskProto().State) + end = task.processTimeout() + s.Equal(true, end) } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 009cf855178ef..a15ac35231b57 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" @@ -49,6 +50,7 @@ type CompactionPlanHandlerSuite struct { mockSessMgr *session.MockDataNodeManager handler *compactionPlanHandler cluster Cluster + mockHandler *NMockHandler } func (s *CompactionPlanHandlerSuite) SetupTest() { @@ -58,6 +60,8 @@ func (s *CompactionPlanHandlerSuite) SetupTest() { s.mockSessMgr = session.NewMockDataNodeManager(s.T()) s.cluster = NewMockCluster(s.T()) s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil) + s.mockHandler = NewNMockHandler(s.T()) + s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe() } func (s *CompactionPlanHandlerSuite) TestScheduleEmpty() { @@ -865,6 +869,179 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() { s.NoError(err) } +func (s *CompactionPlanHandlerSuite) TestCleanCompaction() { + s.SetupTest() + + tests := []struct { + task CompactionTask + }{ + { + newMixCompactionTask( + &datapb.CompactionTask{ + PlanID: 1, + TriggerID: 1, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_failed, + NodeID: 1, + InputSegments: []UniqueID{1, 2}, + }, + nil, s.mockMeta, s.mockSessMgr), + }, + { + newL0CompactionTask(&datapb.CompactionTask{ + PlanID: 1, + TriggerID: 1, + Type: datapb.CompactionType_Level0DeleteCompaction, + State: datapb.CompactionTaskState_failed, + NodeID: 1, + InputSegments: []UniqueID{1, 2}, + }, + nil, s.mockMeta, s.mockSessMgr), + }, + } + for _, test := range tests { + task := test.task + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once() + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + + s.handler.executingTasks[1] = task + s.Equal(1, len(s.handler.executingTasks)) + err := s.handler.checkCompaction() + s.NoError(err) + s.Equal(0, len(s.handler.executingTasks)) + s.Equal(1, len(s.handler.cleaningTasks)) + s.handler.cleanFailedTasks() + s.Equal(0, len(s.handler.cleaningTasks)) + } +} + +func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompaction() { + s.SetupTest() + + task := newClusteringCompactionTask( + &datapb.CompactionTask{ + PlanID: 1, + TriggerID: 1, + CollectionID: 1001, + Type: datapb.CompactionType_ClusteringCompaction, + State: datapb.CompactionTaskState_failed, + NodeID: 1, + InputSegments: []UniqueID{1, 2}, + }, + nil, s.mockMeta, s.mockSessMgr, s.mockHandler, nil) + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil) + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once() + s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + + s.handler.executingTasks[1] = task + s.Equal(1, len(s.handler.executingTasks)) + s.handler.checkCompaction() + s.Equal(0, len(s.handler.executingTasks)) + s.Equal(1, len(s.handler.cleaningTasks)) + s.handler.cleanFailedTasks() + s.Equal(0, len(s.handler.cleaningTasks)) +} + +func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompactionCommitFail() { + s.SetupTest() + + task := newClusteringCompactionTask(&datapb.CompactionTask{ + PlanID: 1, + TriggerID: 1, + CollectionID: 1001, + Channel: "ch-1", + Type: datapb.CompactionType_ClusteringCompaction, + State: datapb.CompactionTaskState_executing, + NodeID: 1, + InputSegments: []UniqueID{1, 2}, + ClusteringKeyField: &schemapb.FieldSchema{ + FieldID: 100, + Name: Int64Field, + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + AutoID: true, + IsClusteringKey: true, + }, + }, + nil, s.mockMeta, s.mockSessMgr, s.mockHandler, nil) + + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + s.mockSessMgr.EXPECT().GetCompactionPlanResult(UniqueID(1), int64(1)).Return( + &datapb.CompactionPlanResult{ + PlanID: 1, + State: datapb.CompactionTaskState_completed, + Segments: []*datapb.CompactionSegment{ + { + PlanID: 1, + SegmentID: 101, + }, + }, + }, nil).Once() + s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(nil, nil, errors.New("mock error")) + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + s.handler.executingTasks[1] = task + s.Equal(1, len(s.handler.executingTasks)) + s.handler.checkCompaction() + s.Equal(0, len(task.GetTaskProto().GetResultSegments())) + + s.Equal(datapb.CompactionTaskState_failed, task.GetTaskProto().GetState()) + s.Equal(0, len(s.handler.executingTasks)) + s.Equal(1, len(s.handler.cleaningTasks)) + + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once() + s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil) + s.handler.cleanFailedTasks() + s.Equal(0, len(s.handler.cleaningTasks)) +} + +// test compactionHandler should keep clean the failed task until it become cleaned +func (s *CompactionPlanHandlerSuite) TestKeepClean() { + s.SetupTest() + + tests := []struct { + task CompactionTask + }{ + { + newClusteringCompactionTask(&datapb.CompactionTask{ + PlanID: 1, + TriggerID: 1, + Type: datapb.CompactionType_ClusteringCompaction, + State: datapb.CompactionTaskState_failed, + NodeID: 1, + InputSegments: []UniqueID{1, 2}, + }, + nil, s.mockMeta, s.mockSessMgr, s.mockHandler, nil), + }, + } + for _, test := range tests { + task := test.task + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil) + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return() + s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything).Return(nil) + s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(errors.New("mock error")).Once() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil) + + s.handler.executingTasks[1] = task + s.Equal(1, len(s.handler.executingTasks)) + s.handler.checkCompaction() + s.Equal(0, len(s.handler.executingTasks)) + s.Equal(1, len(s.handler.cleaningTasks)) + s.handler.cleanFailedTasks() + s.Equal(1, len(s.handler.cleaningTasks)) + s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil).Once() + s.handler.cleanFailedTasks() + s.Equal(0, len(s.handler.cleaningTasks)) + } +} + func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog { l := &datapb.FieldBinlog{ FieldID: fieldID, diff --git a/internal/datacoord/partition_stats_meta.go b/internal/datacoord/partition_stats_meta.go index 27009992cecd8..c6c0f059c98ba 100644 --- a/internal/datacoord/partition_stats_meta.go +++ b/internal/datacoord/partition_stats_meta.go @@ -151,13 +151,38 @@ func (psm *partitionStatsMeta) DropPartitionStatsInfo(info *datapb.PartitionStat if len(psm.partitionStatsInfos[info.GetVChannel()]) == 0 { delete(psm.partitionStatsInfos, info.GetVChannel()) } + // if the dropping partitionStats is the current version, should update currentPartitionStats + currentVersion := psm.innerGetCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel()) + if currentVersion == info.GetVersion() && currentVersion != emptyPartitionStatsVersion { + err := psm.catalog.DropCurrentPartitionStatsVersion(psm.ctx, info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel()) + if err != nil { + return err + } + infos := psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos + delete(infos, currentVersion) + if len(infos) > 0 { + var maxVersion int64 = 0 + for version := range infos { + if version > maxVersion { + maxVersion = version + } + } + err := psm.innerSaveCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel(), maxVersion) + if err != nil { + return err + } + } + } return nil } func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error { psm.Lock() defer psm.Unlock() + return psm.innerSaveCurrentPartitionStatsVersion(collectionID, partitionID, vChannel, currentPartitionStatsVersion) +} +func (psm *partitionStatsMeta) innerSaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error { log.Info("update current partition stats version", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.String("vChannel", vChannel), zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion)) @@ -182,7 +207,10 @@ func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, pa func (psm *partitionStatsMeta) GetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 { psm.RLock() defer psm.RUnlock() + return psm.innerGetCurrentPartitionStatsVersion(collectionID, partitionID, vChannel) +} +func (psm *partitionStatsMeta) innerGetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 { if _, ok := psm.partitionStatsInfos[vChannel]; !ok { return emptyPartitionStatsVersion } diff --git a/internal/datacoord/partition_stats_meta_test.go b/internal/datacoord/partition_stats_meta_test.go index 2a124a6471c65..696373caaee5b 100644 --- a/internal/datacoord/partition_stats_meta_test.go +++ b/internal/datacoord/partition_stats_meta_test.go @@ -87,3 +87,62 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() { currentVersion4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1") s.Equal(int64(100), currentVersion4) } + +func (s *PartitionStatsMetaSuite) TestDropPartitionStats() { + ctx := context.Background() + partitionStatsMeta, err := newPartitionStatsMeta(ctx, s.catalog) + s.NoError(err) + collectionID := int64(1) + partitionID := int64(2) + channel := "ch-1" + s.catalog.EXPECT().DropPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil) + s.catalog.EXPECT().SaveCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + s.catalog.EXPECT().DropCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + partitionStats := []*datapb.PartitionStatsInfo{ + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + SegmentIDs: []int64{100000}, + Version: 100, + }, + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + SegmentIDs: []int64{100000}, + Version: 101, + }, + { + CollectionID: collectionID, + PartitionID: partitionID, + VChannel: channel, + SegmentIDs: []int64{100000}, + Version: 102, + }, + } + for _, partitionStats := range partitionStats { + partitionStatsMeta.SavePartitionStatsInfo(partitionStats) + } + partitionStatsMeta.SaveCurrentPartitionStatsVersion(collectionID, partitionID, channel, 102) + version := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + s.Equal(int64(102), version) + + err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[2]) + s.NoError(err) + s.Equal(2, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos)) + version2 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + s.Equal(int64(101), version2) + + err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[1]) + s.Equal(1, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos)) + version3 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + s.Equal(int64(100), version3) + + err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[0]) + s.NoError(err) + s.Nil(partitionStatsMeta.partitionStatsInfos[channel][partitionID]) + version4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel) + s.Equal(emptyPartitionStatsVersion, version4) +} diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 30bd26ec49f8f..b43041bc3b1c9 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -210,6 +210,7 @@ var ( ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true) ErrCompactionResult = newMilvusError("illegal compaction results", 2314, false) ErrDuplicatedCompactionTask = newMilvusError("duplicated compaction task", 2315, false) + ErrCleanPartitionStatsFail = newMilvusError("fail to clean partition Stats", 2316, true) ErrDataNodeSlotExhausted = newMilvusError("datanode slot exhausted", 2401, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 2ad30564e570b..de68e12836409 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -1177,6 +1177,14 @@ func WrapErrClusteringCompactionMetaError(operation string, err error) error { return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation)) } +func WrapErrCleanPartitionStatsFail(msg ...string) error { + err := error(ErrCleanPartitionStatsFail) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} + func WrapErrAnalyzeTaskNotFound(id int64) error { return wrapFields(ErrAnalyzeTaskNotFound, value("analyzeId", id)) }