Skip to content

Commit

Permalink
fix: avoid rw datarace on CompactionTask
Browse files Browse the repository at this point in the history
Signed-off-by: zhenshan.cao <[email protected]>
  • Loading branch information
czs007 committed Jul 16, 2024
1 parent 804dd54 commit 169a855
Show file tree
Hide file tree
Showing 10 changed files with 449 additions and 398 deletions.
24 changes: 13 additions & 11 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
Expand Down Expand Up @@ -551,7 +552,6 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
if err != nil {
return err
}
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err = t.SaveTaskMeta()
if err != nil {
c.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
Expand All @@ -563,24 +563,26 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
}

// set segments compacting, one segment can only participate one compactionTask
func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (CompactionTask, error) {
func (c *compactionPlanHandler) createCompactTask(pb *datapb.CompactionTask) (CompactionTask, error) {
var task CompactionTask
switch t.GetType() {
pbCloned := proto.Clone(pb).(*datapb.CompactionTask)
pbCloned.StartTime = time.Now().Unix()
switch pb.GetType() {
case datapb.CompactionType_MixCompaction:
task = &mixCompactionTask{
CompactionTask: t,
meta: c.meta,
sessions: c.sessions,
pb: pbCloned,
meta: c.meta,
sessions: c.sessions,
}
case datapb.CompactionType_Level0DeleteCompaction:
task = &l0CompactionTask{
CompactionTask: t,
meta: c.meta,
sessions: c.sessions,
pb: pbCloned,
meta: c.meta,
sessions: c.sessions,
}
case datapb.CompactionType_ClusteringCompaction:
task = &clusteringCompactionTask{
CompactionTask: t,
pb: pbCloned,
meta: c.meta,
sessions: c.sessions,
handler: c.handler,
Expand All @@ -589,7 +591,7 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
default:
return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
}
exist, succeed := c.meta.CheckAndSetSegmentsCompacting(t.GetInputSegments())
exist, succeed := c.meta.CheckAndSetSegmentsCompacting(pbCloned.GetInputSegments())
if !exist {
return nil, merr.WrapErrIllegalCompactionPlan("segment not exist")
}
Expand Down
11 changes: 8 additions & 3 deletions internal/datacoord/compaction_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,12 @@ type CompactionTask interface {

GetNodeID() UniqueID
GetSpan() trace.Span
ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask
CloneTaskPB(opts ...compactionTaskOpt) *datapb.CompactionTask
GetTaskPB() *datapb.CompactionTask
SetNodeID(UniqueID) error
SetTask(*datapb.CompactionTask)
SetSpan(trace.Span)
SetResult(*datapb.CompactionPlanResult)
EndSpan()
CleanLogPath()
NeedReAssignNodeID() bool
SaveTaskMeta() error
}
Expand Down Expand Up @@ -89,6 +88,12 @@ func setResultSegments(segments []int64) compactionTaskOpt {
}
}

func setAnalyzeVersion(version int64) compactionTaskOpt {
return func(task *datapb.CompactionTask) {
task.AnalyzeVersion = version
}
}

func setState(state datapb.CompactionTaskState) compactionTaskOpt {
return func(task *datapb.CompactionTask) {
task.State = state
Expand Down
Loading

0 comments on commit 169a855

Please sign in to comment.