enhance compaction clean
Signed-off-by: wayblink <[email protected]>
wayblink committed Aug 23, 2024
1 parent 9dc1311 commit a99ffed
Showing 7 changed files with 152 additions and 17 deletions.
27 changes: 26 additions & 1 deletion internal/datacoord/compaction.go
@@ -80,6 +80,9 @@ type compactionPlanHandler struct {
executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task

cleaningGuard lock.RWMutex
cleaningTasks map[int64]CompactionTask // planID -> task

meta CompactionMeta
allocator allocator.Allocator
chManager ChannelManager
@@ -182,13 +185,14 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
) *compactionPlanHandler {
return &compactionPlanHandler{
queueTasks: make(map[int64]CompactionTask),
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
chManager: cm,
meta: meta,
sessions: sessions,
allocator: allocator,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
taskNumber: atomic.NewInt32(0),
analyzeScheduler: analyzeScheduler,
handler: handler,
@@ -395,6 +399,18 @@ func (c *compactionPlanHandler) loopClean() {
}

func (c *compactionPlanHandler) Clean() {
c.cleaningGuard.RLock()
cleanedTasks := make([]CompactionTask, 0)
for _, t := range c.cleaningTasks {
clean := t.Clean()
if clean {
cleanedTasks = append(cleanedTasks, t)
}
}
c.cleaningGuard.RUnlock()
c.cleaningGuard.Lock()
for _, t := range cleanedTasks {
delete(c.cleaningTasks, t.GetPlanID())
}
c.cleaningGuard.Unlock()
c.cleanCompactionTaskMeta()
c.cleanPartitionStats()
}
@@ -672,6 +688,15 @@ func (c *compactionPlanHandler) checkCompaction() error {
}
c.executingGuard.Unlock()
c.taskNumber.Sub(int32(len(finishedTasks)))

// enqueue tasks that need to be cleaned
c.cleaningGuard.Lock()
for _, t := range finishedTasks {
if t.GetState() == datapb.CompactionTaskState_failed || t.GetState() == datapb.CompactionTaskState_timeout {
c.cleaningTasks[t.GetPlanID()] = t
}
}
c.cleaningGuard.Unlock()
return nil
}

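Taken together, the compaction.go changes split cleanup into two phases: checkCompaction parks failed and timed-out tasks in the new cleaningTasks map, and the periodic Clean pass retries each task's Clean() until it reports success. The standalone sketch below illustrates that hand-off; the cleaner, task, park, and sweep names are simplified stand-ins for illustration, not the actual Milvus types.

package sketch

import "sync"

// task mirrors the minimal contract the handler relies on: Clean reports
// whether all external state created by the task has been reverted.
type task interface {
	Clean() bool
	PlanID() int64
}

type cleaner struct {
	guard    sync.RWMutex
	cleaning map[int64]task
}

// park is the checkCompaction side: terminal (failed/timeout) tasks are
// queued for cleanup instead of being cleaned inline.
func (c *cleaner) park(t task) {
	c.guard.Lock()
	c.cleaning[t.PlanID()] = t
	c.guard.Unlock()
}

// sweep is the loopClean side: tasks whose Clean() fails stay in the map and
// are retried on the next tick; successful ones are removed under the write lock.
func (c *cleaner) sweep() {
	c.guard.RLock()
	done := make([]int64, 0, len(c.cleaning))
	for id, t := range c.cleaning {
		if t.Clean() {
			done = append(done, id)
		}
	}
	c.guard.RUnlock()

	c.guard.Lock()
	for _, id := range done {
		delete(c.cleaning, id)
	}
	c.guard.Unlock()
}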
1 change: 1 addition & 0 deletions internal/datacoord/compaction_task.go
@@ -26,6 +26,7 @@ import (
type CompactionTask interface {
Process() bool
BuildCompactionRequest() (*datapb.CompactionPlan, error)
Clean() bool

GetTriggerID() UniqueID
GetPlanID() UniqueID
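The interface change above adds Clean() bool to CompactionTask; the handler retries implementations that return false, so Clean is expected to be idempotent and to return true only once its side effects are gone. A hypothetical no-op implementation, mirroring the trivial mix/L0 variants later in this diff, could be as small as:

// Clean has nothing to revert for this hypothetical task type, so it always
// reports success; the handler will then drop the task from cleaningTasks.
func (t *exampleCompactionTask) Clean() bool {
	return true
}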
38 changes: 23 additions & 15 deletions internal/datacoord/compaction_task_clustering.go
@@ -111,13 +111,19 @@ func (t *clusteringCompactionTask) Process() bool {
}
}
log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
return t.State == datapb.CompactionTaskState_completed ||
t.State == datapb.CompactionTaskState_cleaned ||
t.State == datapb.CompactionTaskState_failed ||
t.State == datapb.CompactionTaskState_timeout
}

// retryableProcess process task's state transfer, return error if not work as expected
// the outer Process will set state and retry times according to the error type(retryable or not-retryable)
func (t *clusteringCompactionTask) retryableProcess() error {
if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
if t.State == datapb.CompactionTaskState_completed ||
t.State == datapb.CompactionTaskState_cleaned ||
t.State == datapb.CompactionTaskState_failed ||
t.State == datapb.CompactionTaskState_timeout {
return nil
}

@@ -144,14 +150,21 @@ func (t *clusteringCompactionTask) retryableProcess() error {
return t.processMetaSaved()
case datapb.CompactionTaskState_indexing:
return t.processIndexing()
case datapb.CompactionTaskState_timeout:
return t.processFailedOrTimeout()
case datapb.CompactionTaskState_failed:
return t.processFailedOrTimeout()
}
return nil
}

func (t *clusteringCompactionTask) Clean() bool {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log.Info("clean task")
err := t.doClean()
if err != nil {
log.Warn("clean task fail", zap.Error(err))
return false
}
return true
}

func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
@@ -269,12 +282,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return t.processMetaSaved()
case datapb.CompactionTaskState_executing:
if t.checkTimeout() {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err == nil {
return t.processFailedOrTimeout()
} else {
return err
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
}
return nil
case datapb.CompactionTaskState_failed:
@@ -374,13 +382,13 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
}

func (t *clusteringCompactionTask) processFailedOrTimeout() error {
func (t *clusteringCompactionTask) doClean() error {
log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
// L1 : L1 ->(processPipelining)-> L2 ->(doClean)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(doClean)-> L2
for _, segID := range t.InputSegments {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
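Both Process and retryableProcess above now repeat the same four-way terminal-state check. A small helper along the lines of the sketch below (isTerminalState is hypothetical, not part of this commit) would keep the two call sites consistent:

// isTerminalState reports whether a clustering compaction task has reached a
// state from which no further transitions are expected.
func isTerminalState(state datapb.CompactionTaskState) bool {
	switch state {
	case datapb.CompactionTaskState_completed,
		datapb.CompactionTaskState_cleaned,
		datapb.CompactionTaskState_failed,
		datapb.CompactionTaskState_timeout:
		return true
	default:
		return false
	}
}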
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_task_clustering_test.go
@@ -136,7 +136,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
},
})

task.processFailedOrTimeout()
task.doClean()

seg12 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
4 changes: 4 additions & 0 deletions internal/datacoord/compaction_task_l0.go
@@ -422,3 +422,7 @@ func (t *l0CompactionTask) saveSegmentMeta() error {

return t.meta.UpdateSegmentsInfo(operators...)
}

func (t *l0CompactionTask) Clean() bool {
return true
}
4 changes: 4 additions & 0 deletions internal/datacoord/compaction_task_mix.go
@@ -374,3 +374,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
log.Info("Compaction handler refreshed mix compaction plan", zap.Any("segID2DeltaLogs", segIDMap))
return plan, nil
}

func (t *mixCompactionTask) Clean() bool {
return true
}
93 changes: 93 additions & 0 deletions internal/datacoord/compaction_test.go
@@ -886,6 +886,99 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
s.Equal(0, len(s.handler.getTasksByState(datapb.CompactionTaskState_completed)))
}

func (s *CompactionPlanHandlerSuite) TestCleanCompaction() {
s.SetupTest()

tests := []struct {
task CompactionTask
}{
{
&mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
sessions: s.mockSessMgr,
meta: s.mockMeta,
},
},
{
&l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
sessions: s.mockSessMgr,
meta: s.mockMeta,
},
},
}
for _, test := range tests {
task := test.task
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().GetCompactionTasks().Return(nil)
s.mockMeta.EXPECT().GetPartitionStatsMeta().Return(nil)

s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.handler.Clean()
s.Equal(0, len(s.handler.cleaningTasks))
}
}

func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompaction() {
s.SetupTest()

tests := []struct {
task CompactionTask
}{
{
&clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
Type: datapb.CompactionType_ClusteringCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
sessions: s.mockSessMgr,
meta: s.mockMeta,
},
},
}
for _, test := range tests {
task := test.task
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
//s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().GetCompactionTasks().Return(nil)
s.mockMeta.EXPECT().GetPartitionStatsMeta().Return(nil)
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)

s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.handler.Clean()
s.Equal(0, len(s.handler.cleaningTasks))
}
}

func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
l := &datapb.FieldBinlog{
FieldID: fieldID,
