From 4e405a11aa35dae4cf595a139a6d031ae0055ae5 Mon Sep 17 00:00:00 2001
From: yangxuan
Date: Mon, 6 May 2024 17:56:14 +0800
Subject: [PATCH] enhance: Add EnableLevelZeroCompaction config

Enabled by default and refreshable at runtime. When disabled:

1. Skip triggering L0 compactions
2. Check and reject L0 compaction plans in enqueuePlan
3. Check and clear pipelining L0 compaction tasks in schedule
4. Discard L0 compaction plans executing in the DataNode

Signed-off-by: yangxuan
---
 internal/datacoord/compaction.go              | 46 +++++++++++--
 internal/datacoord/compaction_test.go         | 64 +++++++++++++++++--
 internal/datacoord/compaction_trigger_v2.go   |  3 +-
 internal/datacoord/compaction_view_manager.go | 11 +++-
 internal/datanode/services.go                 |  3 +
 pkg/util/paramtable/component_param.go        | 11 +++-
 6 files changed, 123 insertions(+), 15 deletions(-)

diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index 6e5e50a95019c..dbbe9391ef5ec 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -34,6 +34,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/tracer"
 	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -165,6 +166,19 @@ func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
 }
 
 func (c *compactionPlanHandler) schedule() {
+	if !paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool() {
+		// remove pipelining L0 tasks
+		pipeliningTasks := c.getTasksByState(pipelining)
+		for _, task := range pipeliningTasks {
+			if task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
+				c.updateTask(task.plan.PlanID, setState(failed), endSpan())
+				c.setSegmentsCompacting(task.plan, false)
+				c.scheduler.Finish(task.dataNodeID, task.plan)
+				log.Warn("Discard LevelZero compaction plan because LevelZero compaction is disabled", zap.Int64("planID", task.plan.GetPlanID()))
+			}
+		}
+	}
+
 	// schedule queuing tasks
 	tasks := c.scheduler.Schedule()
 	if len(tasks) > 0 {
@@ -277,6 +291,12 @@ func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskO
 }
 
 func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
+	if !paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool() &&
+		plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
+		log.Error("failed to enqueuePlan, level zero compaction disabled", zap.Int64("planID", plan.GetPlanID()))
+		return errors.New("levelzero compaction is not enabled")
+	}
+
 	nodeID, err := c.chManager.FindWatcher(plan.GetChannel())
 	if err != nil {
 		log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err))
@@ -549,6 +569,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 
 		if nodePlan, ok := planStates[planID]; ok {
 			planResult := nodePlan.B
+
 			switch planResult.GetState() {
 			case commonpb.CompactionState_Completed:
 				log.Info("start to complete compaction")
@@ -575,6 +596,20 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 			}
 
 			case commonpb.CompactionState_Executing:
+				// Discard L0 plans executing in DataNode when level zero compaction is disabled
+				if !paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool() &&
+					task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
+					log.Warn("compaction failed because level zero compaction is disabled")
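+					// A SyncSegmentsRequest without CompactedFrom tells the DataNode
+					// to stop and clear the running plan; only after a successful
+					// sync is the plan marked failed and its segments released here.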
+					if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil {
+						log.Warn("compaction failed to sync segments with node", zap.Error(err))
+						continue
+					}
+					c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
+					c.setSegmentsCompacting(task.plan, false)
+					c.scheduler.Finish(task.dataNodeID, task.plan)
+					continue
+				}
+
 				if c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
 					log.Warn("compaction timeout",
 						zap.Int32("timeout in seconds", task.plan.GetTimeoutInSeconds()),
@@ -624,11 +659,12 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 		return planState.B.GetState() == commonpb.CompactionState_Completed
 	})
 
-	unkonwnPlansInWorker, _ := lo.Difference(lo.Keys(completedPlans), cachedPlans)
-	for _, planID := range unkonwnPlansInWorker {
-		if nodeUnkonwnPlan, ok := completedPlans[planID]; ok {
-			nodeID, plan := nodeUnkonwnPlan.A, nodeUnkonwnPlan.B
-			log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID), zap.String("channel", plan.GetChannel()))
+	unknownPlansInWorker, _ := lo.Difference(lo.Keys(completedPlans), cachedPlans)
+
+	for _, planID := range unknownPlansInWorker {
+		if nodeUnknownPlan, ok := completedPlans[planID]; ok {
+			nodeID, plan := nodeUnknownPlan.A, nodeUnknownPlan.B
+			log := log.With(zap.Int64("planID", planID), zap.String("type", plan.GetType().String()), zap.Int64("nodeID", nodeID), zap.String("channel", plan.GetChannel()))
 
 			// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
 			// without changing the meta
diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index 285e88de7810a..727b06e02d034 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -55,6 +56,40 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
 	s.mockSessMgr = NewMockSessionManager(s.T())
 }
 
+func (s *CompactionPlanHandlerSuite) TestSchedule() {
+	tests := []struct {
+		enableConfig string
+
+		expectedTasksCount int
+		description        string
+	}{
+		{"true", 3, "enable"},
+		{"false", 2, "disabled"},
+	}
+
+	s.mockSch.EXPECT().Finish(mock.Anything, mock.Anything).Maybe()
+	s.mockSch.EXPECT().Schedule().Return(nil).Maybe()
+	enableLZCKey := paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.Key
+	defer paramtable.Get().Reset(enableLZCKey)
+	for _, test := range tests {
+		s.Run(test.description, func() {
+			handler := newCompactionPlanHandler(nil, nil, nil, nil)
+			handler.scheduler = s.mockSch
+			handler.plans = map[int64]*compactionTask{
+				1: {state: pipelining, plan: &datapb.CompactionPlan{PlanID: 1, Type: datapb.CompactionType_MixCompaction}},
+				2: {state: pipelining, plan: &datapb.CompactionPlan{PlanID: 2, Type: datapb.CompactionType_MixCompaction}},
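+				// the only L0 plan in the queue; schedule() is expected to
+				// discard it when EnableLevelZeroCompaction is off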
+				3: {state: pipelining, plan: &datapb.CompactionPlan{PlanID: 3, Type: datapb.CompactionType_Level0DeleteCompaction}},
+			}
+
+			paramtable.Get().Save(enableLZCKey, test.enableConfig)
+			handler.schedule()
+
+			tasks := handler.getTasksByState(pipelining)
+			s.Equal(test.expectedTasksCount, len(tasks))
+		})
+	}
+}
+
 func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
 	s.mockSch.EXPECT().Finish(mock.Anything, mock.Anything).Return().Once()
 
 	handler := newCompactionPlanHandler(nil, nil, nil, nil)
@@ -440,13 +475,17 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
 		}).Twice()
 	s.mockSch.EXPECT().Submit(mock.Anything).Return().Once()
 
+	enableLZCKey := paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.Key
+	defer paramtable.Get().Reset(enableLZCKey)
 	tests := []struct {
-		description string
-		channel     string
-		hasError    bool
+		description         string
+		channel             string
+		enableL0CompactionV string
+		hasError            bool
 	}{
-		{"channel with error", "ch-1", true},
-		{"channel with no error", "ch-2", false},
+		{"channel with error", "ch-1", "true", true},
+		{"channel with no error", "ch-2", "true", false},
+		{"disable l0 compaction", "ch-2", "false", true},
 	}
 
 	handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
@@ -456,9 +495,11 @@
 		sig := &compactionSignal{id: int64(idx)}
 		plan := &datapb.CompactionPlan{
 			PlanID: int64(idx),
+			Type:   datapb.CompactionType_Level0DeleteCompaction,
 		}
 		s.Run(test.description, func() {
 			plan.Channel = test.channel
+			paramtable.Get().Save(enableLZCKey, test.enableL0CompactionV)
 
 			err := handler.execCompactionPlan(sig, plan)
 			if test.hasError {
@@ -688,6 +729,7 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
 		3: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 3, State: commonpb.CompactionState_Executing}},
 		5: {A: 222, B: &datapb.CompactionPlanResult{PlanID: 5, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 5}}}},
 		6: {A: 111, B: &datapb.CompactionPlanResult{Channel: "ch-2", PlanID: 5, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 6}}}},
+		7: {A: 111, B: &datapb.CompactionPlanResult{Channel: "ch-2", PlanID: 7, State: commonpb.CompactionState_Executing, Segments: []*datapb.CompactionSegment{{PlanID: 7}}}},
 	}, nil)
 
 	inPlans := map[int64]*compactionTask{
@@ -721,6 +763,12 @@
 			state:       executing,
 			dataNodeID:  111,
 		},
+		7: {
+			triggerInfo: &compactionSignal{},
+			plan:        &datapb.CompactionPlan{PlanID: 7, Channel: "ch-1", Type: datapb.CompactionType_Level0DeleteCompaction},
+			state:       executing,
+			dataNodeID:  111,
+		},
 	}
 
 	s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error {
@@ -734,6 +782,9 @@
 	s.mockCm.EXPECT().Match(int64(111), "ch-1").Return(true)
 	s.mockCm.EXPECT().Match(int64(111), "ch-2").Return(false).Once()
 
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.Key, "false")
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.Key)
+
 	handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
 	handler.plans = inPlans
 
@@ -757,6 +808,9 @@
 
 	task = handler.plans[6]
 	s.Equal(failed, task.state)
+
+	task = handler.plans[7]
+	s.Equal(failed, task.state)
 }
 
 func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go
index 1ba9c1d9ef4aa..c2e0e42cf9dce 100644
--- a/internal/datacoord/compaction_trigger_v2.go
+++ b/internal/datacoord/compaction_trigger_v2.go
@@ -103,11 +103,12 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(taskID int64, outView
 	// TODO, remove handler, use scheduler
 	// m.scheduler.Submit(plan)
-	m.handler.execCompactionPlan(signal, plan)
+	err := m.handler.execCompactionPlan(signal, plan)
 	log.Info("Finish to submit a LevelZeroCompaction plan",
 		zap.Int64("taskID", taskID),
 		zap.Int64("planID", plan.GetPlanID()),
 		zap.String("type", plan.GetType().String()),
+		zap.Error(err),
 	)
 }
diff --git a/internal/datacoord/compaction_view_manager.go b/internal/datacoord/compaction_view_manager.go
index 373044f77e230..b0c06a909cc07 100644
--- a/internal/datacoord/compaction_view_manager.go
+++ b/internal/datacoord/compaction_view_manager.go
@@ -12,6 +12,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/logutil"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -88,7 +89,9 @@ func (m *CompactionViewManager) checkLoop() {
 			log.Info("Compaction View checkLoop quit")
 			return
 		case <-checkTicker.C:
-			refreshViewsAndTrigger(context.Background())
+			if paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool() {
+				refreshViewsAndTrigger(context.Background())
+			}
 		case <-idleTicker.C:
 			// idelTicker will be reset everytime when Check's able to
@@ -96,8 +99,10 @@
 			// if no views are freshed, try to get cached views and trigger a
 			// TriggerTypeViewIDLE event
-			if !refreshViewsAndTrigger(context.Background()) {
-				m.triggerEventForIDLEView()
+			if paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool() {
+				if !refreshViewsAndTrigger(context.Background()) {
+					m.triggerEventForIDLEView()
+				}
 			}
 		}
 	}
diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index 1b66f157f9770..0dd050fc7480f 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -302,6 +302,9 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
 
 	if len(req.GetCompactedFrom()) <= 0 {
 		log.Info("SyncSegments with empty compactedFrom, clearing the plan")
+		// remove from executing
+		node.compactionExecutor.stopTask(req.GetPlanID())
+		// remove from completing
 		node.compactionExecutor.injectDone(req.GetPlanID())
 		return merr.Success(), nil
 	}
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index a19e23a463dbf..65585a4b60fc4 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2663,6 +2663,7 @@ type dataCoordConfig struct {
 
 	// LevelZero Segment
 	EnableLevelZeroSegment                   ParamItem `refreshable:"false"`
+	EnableLevelZeroCompaction                ParamItem `refreshable:"true"`
 	LevelZeroCompactionTriggerMinSize        ParamItem `refreshable:"true"`
 	LevelZeroCompactionTriggerMaxSize        ParamItem `refreshable:"true"`
 	LevelZeroCompactionTriggerDeltalogMinNum ParamItem `refreshable:"true"`
@@ -3004,11 +3005,19 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	p.EnableLevelZeroSegment = ParamItem{
 		Key:          "dataCoord.segment.enableLevelZero",
 		Version:      "2.4.0",
-		Doc:          "Whether to enable LevelZeroCompaction",
+		Doc:          "Whether to enable LevelZero Segment",
 		DefaultValue: "true",
 	}
 	p.EnableLevelZeroSegment.Init(base.mgr)
 
+	p.EnableLevelZeroCompaction = ParamItem{
+		Key:          "dataCoord.compaction.enableLevelZeroCompaction",
+		Version:      "2.4.0",
+		Doc:          "Whether to enable LevelZeroCompaction",
+		DefaultValue: "true",
+	}
+	p.EnableLevelZeroCompaction.Init(base.mgr)
+
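+	// Example override in milvus.yaml (nesting derived from the Key above):
+	//   dataCoord:
+	//     compaction:
+	//       enableLevelZeroCompaction: false
+	// NOTE: disabling only pauses L0 compaction scheduling and discards
+	// in-flight plans; L0 delta data keeps accumulating until re-enabled.
+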
 	p.LevelZeroCompactionTriggerMinSize = ParamItem{
 		Key:          "dataCoord.compaction.levelzero.forceTrigger.minSize",
 		Version:      "2.4.0",