Skip to content

Commit

Permalink
Refine compaction
Browse files Browse the repository at this point in the history
Signed-off-by: zhenshan.cao <[email protected]>
  • Loading branch information
czs007 committed Jun 19, 2024
1 parent 1216a4b commit 25c0e87
Show file tree
Hide file tree
Showing 10 changed files with 267 additions and 305 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ run:
- scripts
- internal/core
- cmake_build
- mmap
- data
- ci
skip-files:
- partial_search_test.go

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ require (
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/pkg/errors v0.9.1
github.com/valyala/fastjson v1.6.4
github.com/zeebo/xxh3 v1.0.2
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -208,7 +210,6 @@ require (
github.com/twmb/murmur3 v1.1.3 // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/valyala/fastjson v1.6.4 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
Expand Down Expand Up @@ -238,7 +239,6 @@ require (
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
Expand Down
90 changes: 53 additions & 37 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,37 @@ func (c *compactionPlanHandler) loadMeta() {
triggers := c.meta.(*meta).compactionTaskMeta.GetCompactionTasks()
for _, tasks := range triggers {
for _, task := range tasks {
if task.State != datapb.CompactionTaskState_completed && task.State != datapb.CompactionTaskState_cleaned {
c.enqueueCompaction(task)
state := task.GetState()
if state == datapb.CompactionTaskState_completed ||
state == datapb.CompactionTaskState_cleaned ||
state == datapb.CompactionTaskState_unknown {
log.Info("compactionPlanHandler loadMeta abandon compactionTask",
zap.Int64("planID", task.GetPlanID()),
zap.String("State", task.GetState().String()))
continue
} else {
t, err := c.createCompactTask(task)
if err != nil {
log.Warn("compactionPlanHandler loadMeta create compactionTask failed",
zap.Int64("planID", task.GetPlanID()),
zap.String("State", task.GetState().String()))
continue
}
if t.NeedReAssignNodeID() {
c.submitTask(t)
log.Info("compactionPlanHandler loadMeta submitTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.String("state", t.GetState().String()))
} else {
c.restoreTask(t)
log.Info("compactionPlanHandler loadMeta restoreTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.String("state", t.GetState().String()))
}
}
}
}
Expand Down Expand Up @@ -466,6 +495,8 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
}

func (c *compactionPlanHandler) submitTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.mu.Lock()
c.queueTasks[t.GetPlanID()] = t
c.mu.Unlock()
Expand All @@ -474,6 +505,8 @@ func (c *compactionPlanHandler) submitTask(t CompactionTask) {

// restoreTask is used to restore a task from etcd
func (c *compactionPlanHandler) restoreTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.executingMu.Lock()
c.executingTasks[t.GetPlanID()] = t
c.executingMu.Unlock()
Expand Down Expand Up @@ -504,38 +537,23 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
if c.isFull() {
return errCompactionBusy
}
// TODO change to set this on scheduling task
exist, succeed := c.checkAndSetSegmentsCompacting(task)
if !exist {
return merr.WrapErrIllegalCompactionPlan("segment not exist")
}
if !succeed {
return merr.WrapErrCompactionPlanConflict("segment is compacting")
}

// TODO change to set this on scheduling task
t := c.createCompactTask(task)
if t == nil {
return merr.WrapErrIllegalCompactionPlan("illegal compaction type")
t, err := c.createCompactTask(task)
if err != nil {
return err
}
if task.StartTime != 0 {
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err := t.SaveTaskMeta()
if err != nil {
return err
}
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err = t.SaveTaskMeta()
if err != nil {
c.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
return err
}

_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", task.GetType()))
t.SetSpan(span)

c.submitTask(t)
log.Info("Compaction plan submitted")
return nil
}

// set segments compacting; one segment can only participate in one compactionTask
func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) CompactionTask {
func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (CompactionTask, error) {
var task CompactionTask
switch t.GetType() {
case datapb.CompactionType_MixCompaction:
Expand All @@ -558,19 +576,17 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) Comp
handler: c.handler,
analyzeScheduler: c.analyzeScheduler,
}
default:
return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
}
return task
}

// set segments compacting; one segment can only participate in one compactionTask
func (c *compactionPlanHandler) setSegmentsCompacting(task CompactionTask, compacting bool) {
for _, segmentID := range task.GetInputSegments() {
c.meta.SetSegmentCompacting(segmentID, compacting)
exist, succeed := c.meta.CheckAndSetSegmentsCompacting(t.GetInputSegments())
if !exist {
return nil, merr.WrapErrIllegalCompactionPlan("segment not exist")
}
}

func (c *compactionPlanHandler) checkAndSetSegmentsCompacting(task *datapb.CompactionTask) (bool, bool) {
return c.meta.CheckAndSetSegmentsCompacting(task.GetInputSegments())
if !succeed {
return nil, merr.WrapErrCompactionPlanConflict("segment is compacting")
}
return task, nil
}

func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
Expand Down
12 changes: 7 additions & 5 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("GetCompactionPlanResult fail", zap.Error(err))
// todo reassign node ID
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return nil
}
return err
Expand All @@ -232,7 +232,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return segment.GetSegmentID()
})

_, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetPlan(), t.result)
_, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
if err != nil {
return err
}
Expand Down Expand Up @@ -336,9 +336,11 @@ func (t *clusteringCompactionTask) processAnalyzing() error {
}

func (t *clusteringCompactionTask) resetSegmentCompacting() {
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() {
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false)
var segmentIDs []UniqueID
for _, binLogs := range t.GetPlan().GetSegmentBinlogs() {
segmentIDs = append(segmentIDs, binLogs.GetSegmentID())
}
t.meta.SetSegmentsCompacting(segmentIDs, false)
}

func (t *clusteringCompactionTask) processFailedOrTimeout() error {
Expand Down Expand Up @@ -414,7 +416,7 @@ func (t *clusteringCompactionTask) doCompact() error {
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return err
}
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
Expand Down
12 changes: 7 additions & 5 deletions internal/datacoord/compaction_task_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (t *l0CompactionTask) processPipelining() bool {
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return false
}
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
Expand All @@ -85,7 +85,7 @@ func (t *l0CompactionTask) processExecuting() bool {
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
}
return false
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func (t *l0CompactionTask) SetStartTime(startTime int64) {
}

func (t *l0CompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == NullNodeID
}

func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) {
Expand Down Expand Up @@ -307,9 +307,11 @@ func (t *l0CompactionTask) processCompleted() bool {
}

func (t *l0CompactionTask) resetSegmentCompacting() {
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() {
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false)
var segmentIDs []UniqueID
for _, binLogs := range t.GetPlan().GetSegmentBinlogs() {
segmentIDs = append(segmentIDs, binLogs.GetSegmentID())
}
t.meta.SetSegmentsCompacting(segmentIDs, false)
}

func (t *l0CompactionTask) processTimeout() bool {
Expand Down
36 changes: 23 additions & 13 deletions internal/datacoord/compaction_task_mix.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ func (t *mixCompactionTask) processPipelining() bool {
}
var err error
t.plan, err = t.BuildCompactionRequest()
// Segment not found
if err != nil {
err2 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
return err2 == nil
}
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return false
}
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
Expand All @@ -59,7 +60,7 @@ func (t *mixCompactionTask) processExecuting() bool {
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
}
return false
}
Expand All @@ -82,13 +83,20 @@ func (t *mixCompactionTask) processExecuting() bool {
}
return t.processFailed()
}
saveSuccess := t.saveSegmentMeta()
if !saveSuccess {
err2 := t.saveSegmentMeta()
if err2 != nil {
if errors.Is(err2, merr.ErrIllegalCompactionPlan) {
err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err3 != nil {
log.Warn("fail to updateAndSaveTaskMeta")
}
return true
}
return false
}
segments := []UniqueID{t.newSegment.GetID()}
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments))
if err == nil {
err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments))
if err3 == nil {
return t.processMetaSaved()
}
return false
Expand All @@ -110,18 +118,18 @@ func (t *mixCompactionTask) SaveTaskMeta() error {
return t.saveTaskMeta(t.CompactionTask)
}

func (t *mixCompactionTask) saveSegmentMeta() bool {
func (t *mixCompactionTask) saveSegmentMeta() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
// Also prepare metric updates.
newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetPlan(), t.result)
newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
if err != nil {
return false
return err
}
// Apply metrics after successful meta update.
t.newSegment = newSegments[0]
metricMutation.commit()
log.Info("mixCompactionTask success to save segment meta")
return true
return nil
}

func (t *mixCompactionTask) Process() bool {
Expand Down Expand Up @@ -161,7 +169,7 @@ func (t *mixCompactionTask) GetLabel() string {
}

func (t *mixCompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == NullNodeID
}

func (t *mixCompactionTask) processCompleted() bool {
Expand All @@ -178,9 +186,11 @@ func (t *mixCompactionTask) processCompleted() bool {
}

func (t *mixCompactionTask) resetSegmentCompacting() {
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() {
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false)
var segmentIDs []UniqueID
for _, binLogs := range t.GetPlan().GetSegmentBinlogs() {
segmentIDs = append(segmentIDs, binLogs.GetSegmentID())
}
t.meta.SetSegmentsCompacting(segmentIDs, false)
}

func (t *mixCompactionTask) processTimeout() bool {
Expand Down
9 changes: 5 additions & 4 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.SetupTest()
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)

task := &datapb.CompactionTask{
Expand Down Expand Up @@ -658,11 +659,11 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
// s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).RunAndReturn(
func(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
if plan.GetPlanID() == 2 {
func(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
if t.GetPlanID() == 2 {
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
return []*SegmentInfo{segment}, &segMetricMutation{}, nil
} else if plan.GetPlanID() == 6 {
} else if t.GetPlanID() == 6 {
return nil, nil, errors.Errorf("intended error")
}
return nil, nil, errors.Errorf("unexpected error")
Expand Down Expand Up @@ -706,7 +707,7 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {

// s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentCompacting(mock.Anything, mock.Anything).Return().Twice()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Twice()
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{segment},
Expand Down
Loading

0 comments on commit 25c0e87

Please sign in to comment.