enhance: Add EnableLevelZeroCompaction config
The config is enabled by default and refreshable at runtime. When it is disabled:

1. Skip triggering L0 compaction
2. Check and skip submitting L0 compaction plans in enqueuePlan
3. Check and clear pipelining L0 compaction tasks in schedule
4. Discard L0 compaction plans already executing in DataNode (DN)

Signed-off-by: yangxuan <[email protected]>
XuanYang-cn committed May 8, 2024
1 parent 17a79f4 commit 4e405a1
Showing 6 changed files with 123 additions and 15 deletions.
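
The new switch is read dynamically, so it can be flipped without restarting DataCoord. Below is a minimal sketch of toggling it the way the tests added in this commit do, via paramtable Save/Reset; the wrapping function names and the `example` package are illustrative only, not part of this change.

```go
// Sketch only: flipping the new flag at runtime, mirroring the Save/Reset
// pattern used by the tests in this commit. Assumes an initialized Milvus
// paramtable (e.g. inside a DataCoord test).
package example

import "github.com/milvus-io/milvus/pkg/util/paramtable"

// disableLevelZeroCompaction turns the feature off; the next GetAsBool()
// check in DataCoord observes the new value.
func disableLevelZeroCompaction() {
	key := paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.Key // "dataCoord.compaction.enableLevelZeroCompaction"
	paramtable.Get().Save(key, "false")
}

// restoreLevelZeroCompaction reverts the key to its default ("true").
func restoreLevelZeroCompaction() {
	paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.Key)
}
```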
46 changes: 41 additions & 5 deletions internal/datacoord/compaction.go
@@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -165,6 +166,19 @@ func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
}

func (c *compactionPlanHandler) schedule() {
if !paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool() {
// remove pipelining L0 tasks
pipeliningTasks := c.getTasksByState(pipelining)
for _, task := range pipeliningTasks {
if task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
c.updateTask(task.plan.PlanID, setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
log.Warn("Discard LevelZeore compaction plans for LevelZero compaction disabled", zap.Int64("planID", task.plan.GetPlanID()))
}
}
}

// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
@@ -277,6 +291,12 @@ func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskO
}

func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
if !paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool() &&
plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
log.Error("failed to enqueuePlan, level zero compaction disabled", zap.Int64("planID", plan.GetPlanID()))
return errors.New("levelzero compaction is not enabled")
}

nodeID, err := c.chManager.FindWatcher(plan.GetChannel())
if err != nil {
log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err))
@@ -549,6 +569,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {

if nodePlan, ok := planStates[planID]; ok {
planResult := nodePlan.B

switch planResult.GetState() {
case commonpb.CompactionState_Completed:
log.Info("start to complete compaction")
@@ -575,6 +596,20 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
}

case commonpb.CompactionState_Executing:
// Discard L0 plans executing in DataNode when level zero compaction is disabled
if !paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool() &&
task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
log.Warn("compaction failed for level zero compaction disabled")
if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
continue
}
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
continue
}

if c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
log.Warn("compaction timeout",
zap.Int32("timeout in seconds", task.plan.GetTimeoutInSeconds()),
@@ -624,11 +659,12 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
return planState.B.GetState() == commonpb.CompactionState_Completed
})

unkonwnPlansInWorker, _ := lo.Difference(lo.Keys(completedPlans), cachedPlans)
for _, planID := range unkonwnPlansInWorker {
if nodeUnkonwnPlan, ok := completedPlans[planID]; ok {
nodeID, plan := nodeUnkonwnPlan.A, nodeUnkonwnPlan.B
log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID), zap.String("channel", plan.GetChannel()))
unknownPlansInWorker, _ := lo.Difference(lo.Keys(completedPlans), cachedPlans)

for _, planID := range unknownPlansInWorker {
if nodeUnknownPlan, ok := completedPlans[planID]; ok {
nodeID, plan := nodeUnknownPlan.A, nodeUnknownPlan.B
log := log.With(zap.Int64("planID", planID), zap.String("type", plan.GetType().String()), zap.Int64("nodeID", nodeID), zap.String("channel", plan.GetChannel()))

// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
// without changing the meta
64 changes: 59 additions & 5 deletions internal/datacoord/compaction_test.go
@@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -55,6 +56,40 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
s.mockSessMgr = NewMockSessionManager(s.T())
}

func (s *CompactionPlanHandlerSuite) TestSchedule() {
tests := []struct {
enableConfig string

expectedTasksCount int
description string
}{
{"true", 3, "enable"},
{"false", 2, "disabled"},
}

s.mockSch.EXPECT().Finish(mock.Anything, mock.Anything).Maybe()
s.mockSch.EXPECT().Schedule().Return(nil).Maybe()
enableLZCKey := paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.Key
defer paramtable.Get().Reset(enableLZCKey)
for _, test := range tests {
s.Run(test.description, func() {
handler := newCompactionPlanHandler(nil, nil, nil, nil)
handler.scheduler = s.mockSch
handler.plans = map[int64]*compactionTask{
1: {state: pipelining, plan: &datapb.CompactionPlan{PlanID: 1, Type: datapb.CompactionType_MixCompaction}},
2: {state: pipelining, plan: &datapb.CompactionPlan{PlanID: 2, Type: datapb.CompactionType_MixCompaction}},
3: {state: pipelining, plan: &datapb.CompactionPlan{PlanID: 3, Type: datapb.CompactionType_Level0DeleteCompaction}},
}

paramtable.Get().Save(enableLZCKey, test.enableConfig)
handler.schedule()

tasks := handler.getTasksByState(pipelining)
s.Equal(test.expectedTasksCount, len(tasks))
})
}
}

func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
s.mockSch.EXPECT().Finish(mock.Anything, mock.Anything).Return().Once()
handler := newCompactionPlanHandler(nil, nil, nil, nil)
@@ -440,13 +475,17 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
}).Twice()
s.mockSch.EXPECT().Submit(mock.Anything).Return().Once()

enableLZCKey := paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.Key
defer paramtable.Get().Reset(enableLZCKey)
tests := []struct {
description string
channel string
hasError bool
description string
channel string
enableL0CompactionV string
hasError bool
}{
{"channel with error", "ch-1", true},
{"channel with no error", "ch-2", false},
{"channel with error", "ch-1", "true", true},
{"channel with no error", "ch-2", "true", false},
{"disable l0 compaction", "ch-2", "false", true},
}

handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
@@ -456,9 +495,11 @@
sig := &compactionSignal{id: int64(idx)}
plan := &datapb.CompactionPlan{
PlanID: int64(idx),
Type: datapb.CompactionType_Level0DeleteCompaction,
}
s.Run(test.description, func() {
plan.Channel = test.channel
paramtable.Get().Save(enableLZCKey, test.enableL0CompactionV)

err := handler.execCompactionPlan(sig, plan)
if test.hasError {
@@ -688,6 +729,7 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
3: {A: 111, B: &datapb.CompactionPlanResult{PlanID: 3, State: commonpb.CompactionState_Executing}},
5: {A: 222, B: &datapb.CompactionPlanResult{PlanID: 5, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 5}}}},
6: {A: 111, B: &datapb.CompactionPlanResult{Channel: "ch-2", PlanID: 5, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 6}}}},
7: {A: 111, B: &datapb.CompactionPlanResult{Channel: "ch-2", PlanID: 7, State: commonpb.CompactionState_Executing, Segments: []*datapb.CompactionSegment{{PlanID: 7}}}},
}, nil)

inPlans := map[int64]*compactionTask{
@@ -721,6 +763,12 @@
state: executing,
dataNodeID: 111,
},
7: {
triggerInfo: &compactionSignal{},
plan: &datapb.CompactionPlan{PlanID: 7, Channel: "ch-1", Type: datapb.CompactionType_Level0DeleteCompaction},
state: executing,
dataNodeID: 111,
},
}

s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error {
@@ -734,6 +782,9 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
s.mockCm.EXPECT().Match(int64(111), "ch-1").Return(true)
s.mockCm.EXPECT().Match(int64(111), "ch-2").Return(false).Once()

paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.Key, "false")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.Key)

handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
handler.plans = inPlans

@@ -757,6 +808,9 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {

task = handler.plans[6]
s.Equal(failed, task.state)

task = handler.plans[7]
s.Equal(failed, task.state)
}

func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
3 changes: 2 additions & 1 deletion internal/datacoord/compaction_trigger_v2.go
@@ -103,11 +103,12 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(taskID int64, outView

// TODO, remove handler, use scheduler
// m.scheduler.Submit(plan)
m.handler.execCompactionPlan(signal, plan)
err := m.handler.execCompactionPlan(signal, plan)
log.Info("Finish to submit a LevelZeroCompaction plan",
zap.Int64("taskID", taskID),
zap.Int64("planID", plan.GetPlanID()),
zap.String("type", plan.GetType().String()),
zap.Error(err),
)
}

11 changes: 8 additions & 3 deletions internal/datacoord/compaction_view_manager.go
@@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

@@ -88,16 +89,20 @@ func (m *CompactionViewManager) checkLoop() {
log.Info("Compaction View checkLoop quit")
return
case <-checkTicker.C:
refreshViewsAndTrigger(context.Background())
if paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool() {
refreshViewsAndTrigger(context.Background())
}

case <-idleTicker.C:
// idleTicker will be reset every time Check is able to
// generate compaction events

// if no views are refreshed, try to get cached views and trigger a
// TriggerTypeViewIDLE event
if !refreshViewsAndTrigger(context.Background()) {
m.triggerEventForIDLEView()
if paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool() {
if !refreshViewsAndTrigger(context.Background()) {
m.triggerEventForIDLEView()
}
}
}
}
3 changes: 3 additions & 0 deletions internal/datanode/services.go
@@ -302,6 +302,9 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments

if len(req.GetCompactedFrom()) <= 0 {
log.Info("SyncSegments with empty compactedFrom, clearing the plan")
// remove from executing
node.compactionExecutor.stopTask(req.GetPlanID())
// remove from completing
node.compactionExecutor.injectDone(req.GetPlanID())
return merr.Success(), nil
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/util/paramtable/component_param.go
@@ -2663,6 +2663,7 @@ type dataCoordConfig struct {

// LevelZero Segment
EnableLevelZeroSegment ParamItem `refreshable:"false"`
EnableLevelZeroCompaction ParamItem `refreshable:"true"`
LevelZeroCompactionTriggerMinSize ParamItem `refreshable:"true"`
LevelZeroCompactionTriggerMaxSize ParamItem `refreshable:"true"`
LevelZeroCompactionTriggerDeltalogMinNum ParamItem `refreshable:"true"`
@@ -3004,11 +3005,19 @@ During compaction, the size of segment # of rows is able to exceed segment max #
p.EnableLevelZeroSegment = ParamItem{
Key: "dataCoord.segment.enableLevelZero",
Version: "2.4.0",
Doc: "Whether to enable LevelZeroCompaction",
Doc: "Whether to enable LevelZero Segment",
DefaultValue: "true",
}
p.EnableLevelZeroSegment.Init(base.mgr)

p.EnableLevelZeroCompaction = ParamItem{
Key: "dataCoord.compaction.enableLevelZeroCompaction",
Version: "2.4.0",
Doc: "Whether to enable LevelZeroCompaction",
DefaultValue: "true",
}
p.EnableLevelZeroCompaction.Init(base.mgr)

p.LevelZeroCompactionTriggerMinSize = ParamItem{
Key: "dataCoord.compaction.levelzero.forceTrigger.minSize",
Version: "2.4.0",
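
Because the new item is declared refreshable:"true", the DataCoord code paths in this commit read it through GetAsBool() on every check instead of caching the value, which is what makes the runtime toggle work. A minimal sketch of that consuming pattern; the helper name and package are illustrative only.

```go
// Sketch only: the read pattern used by the DataCoord checks in this commit.
// GetAsBool() re-evaluates the param on each call, so updates to
// "dataCoord.compaction.enableLevelZeroCompaction" take effect without restart.
package example

import "github.com/milvus-io/milvus/pkg/util/paramtable"

func levelZeroCompactionEnabled() bool {
	return paramtable.Get().DataCoordCfg.EnableLevelZeroCompaction.GetAsBool()
}
```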
