Skip to content

Commit

Permalink
support to replicate import msg
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Jan 16, 2025
1 parent 3a6408b commit 1fb0acb
Show file tree
Hide file tree
Showing 58 changed files with 3,535 additions and 1,778 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ replace (
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/greatroar/blobloom => github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93
github.com/ianlancetaylor/cgosymbolizer => github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119
github.com/milvus-io/milvus-proto/go-api/v2 => github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250109072021-b66909fa356b
github.com/milvus-io/milvus/pkg => ./pkg
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8 h1:boN3QhAWQU9O8EYQWxN7AEYav39PuD29QzZwTiI8Ca0=
github.com/SimFG/expr v0.0.0-20241226082220-a9a764953bf8/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250109072021-b66909fa356b h1:O4zGq8JOuCY9k9srUd4voUsdJItMy/gtVFkAosfyQgk=
github.com/SimFG/milvus-proto/go-api/v2 v2.0.0-20250109072021-b66909fa356b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA=
github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=
Expand Down Expand Up @@ -630,8 +632,6 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f h1:So6RKU5wqP/8EaKogicJP8gZ2SrzzS/JprusBaE3RKc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
4 changes: 3 additions & 1 deletion internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ packages:
github.com/milvus-io/milvus/internal/streamingnode/server/flusher:
interfaces:
Flusher:
FlushMsgHandler:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
OpenerBuilder:
Expand Down Expand Up @@ -82,6 +81,9 @@ packages:
github.com/milvus-io/milvus/internal/util/searchutil/optimizers:
interfaces:
QueryHook:
# github.com/milvus-io/milvus/internal/flushcommon/util:
# interfaces:
# MsgHandler:
google.golang.org/grpc/resolver:
interfaces:
ClientConn:
Expand Down
44 changes: 42 additions & 2 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type compactionPlanContext interface {
getCompactionTasksNumBySignalID(signalID int64) int
getCompactionInfo(ctx context.Context, signalID int64) *compactionInfo
removeTasksByChannel(channel string)
getCompactionTasksNum(filters ...compactionTaskFilter) int
}

var (
Expand Down Expand Up @@ -145,7 +146,7 @@ func (sna *SlotBasedNodeAssigner) pickAnyNode(task CompactionTask) (nodeID int64
}

type compactionPlanHandler struct {
queueTasks CompactionQueue
queueTasks *CompactionQueue

executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task
Expand Down Expand Up @@ -255,7 +256,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
// TODO[GOOSE]: Higher capacity makes tasks waiting longer, which need to be get rid of.
capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt()
return &compactionPlanHandler{
queueTasks: *NewCompactionQueue(capacity, getPrioritizer()),
queueTasks: NewCompactionQueue(capacity, getPrioritizer()),
meta: meta,
sessions: sessions,
allocator: allocator,
Expand Down Expand Up @@ -782,6 +783,45 @@ func (c *compactionPlanHandler) checkDelay(t CompactionTask) {
}
}

func (c *compactionPlanHandler) getCompactionTasksNum(filters ...compactionTaskFilter) int {
cnt := 0
isMatch := func(task CompactionTask) bool {
for _, f := range filters {
if !f(task) {
return false
}
}
return true
}
c.queueTasks.ForEach(func(task CompactionTask) {
if isMatch(task) {
cnt += 1
}
})
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if isMatch(t) {
cnt += 1
}
}
c.executingGuard.RUnlock()
return cnt
}

type compactionTaskFilter func(task CompactionTask) bool

func CollectionIDCompactionTaskFilter(collectionID int64) compactionTaskFilter {
return func(task CompactionTask) bool {
return task.GetTaskProto().GetCollectionID() == collectionID
}
}

func L0CompactionCompactionTaskFilter() compactionTaskFilter {
return func(task CompactionTask) bool {
return task.GetTaskProto().GetType() == datapb.CompactionType_Level0DeleteCompaction
}
}

var (
ioPool *conc.Pool[any]
ioPoolInitOnce sync.Once
Expand Down
45 changes: 43 additions & 2 deletions internal/datacoord/compaction_policy_l0.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package datacoord

import (
"sync"

"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand All @@ -14,21 +16,54 @@ type l0CompactionPolicy struct {
view *FullViews

emptyLoopCount *atomic.Int64

// key: collectionID, value: reference count
skipCompactionCollections map[int64]int
skipLocker sync.RWMutex
}

func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
return &l0CompactionPolicy{
meta: meta,
// donot share views with other compaction policy
view: &FullViews{collections: make(map[int64][]*SegmentView)},
emptyLoopCount: atomic.NewInt64(0),
view: &FullViews{collections: make(map[int64][]*SegmentView)},
emptyLoopCount: atomic.NewInt64(0),
skipCompactionCollections: make(map[int64]int),
}
}

func (policy *l0CompactionPolicy) Enable() bool {
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}

func (policy *l0CompactionPolicy) AddSkipCollection(collectionID UniqueID) {
policy.skipLocker.Lock()
defer policy.skipLocker.Unlock()

if _, ok := policy.skipCompactionCollections[collectionID]; !ok {
policy.skipCompactionCollections[collectionID] = 1
} else {
policy.skipCompactionCollections[collectionID]++
}
}

func (policy *l0CompactionPolicy) RemoveSkipCollection(collectionID UniqueID) {
policy.skipLocker.Lock()
defer policy.skipLocker.Unlock()
refCount := policy.skipCompactionCollections[collectionID]
if refCount > 1 {
policy.skipCompactionCollections[collectionID]--
} else {
delete(policy.skipCompactionCollections, collectionID)
}
}

func (policy *l0CompactionPolicy) isSkipCollection(collectionID UniqueID) bool {
policy.skipLocker.RLock()
defer policy.skipLocker.RUnlock()
return policy.skipCompactionCollections[collectionID] > 0
}

func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
// support config hot refresh
events := policy.generateEventForLevelZeroViewChange()
Expand Down Expand Up @@ -69,6 +104,9 @@ func (policy *l0CompactionPolicy) generateEventForLevelZeroViewChange() (events
func (policy *l0CompactionPolicy) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView {
var allRefreshedL0Veiws []CompactionView
for collID, segments := range latestCollSegs {
if policy.isSkipCollection(collID) {
continue
}
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
return info.GetLevel() == datapb.SegmentLevel_L0
})
Expand Down Expand Up @@ -136,6 +174,9 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID,
func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView {
events := make(map[CompactionTriggerType][]CompactionView, 0)
for collID := range policy.view.collections {
if policy.isSkipCollection(collID) {
continue
}
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.Level == datapb.SegmentLevel_L0
})
Expand Down
70 changes: 70 additions & 0 deletions internal/datacoord/compaction_policy_l0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,48 @@ func (s *L0CompactionPolicySuite) TestTrigger() {
for _, view := range cView.GetSegmentsView() {
s.Equal(datapb.SegmentLevel_L0, view.Level)
}

// test for skip collection
s.l0_policy.AddSkipCollection(1)
s.l0_policy.AddSkipCollection(1)
// Test for idle trigger
for i := 0; i < 2; i++ {
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Equal(0, len(events))
}
s.EqualValues(2, s.l0_policy.emptyLoopCount.Load())
// Test for skip collection
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Equal(0, len(events))

// Test for skip collection with ref count
s.l0_policy.RemoveSkipCollection(1)
// Test for idle trigger
for i := 0; i < 2; i++ {
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Equal(0, len(events))
}
s.EqualValues(2, s.l0_policy.emptyLoopCount.Load())
// Test for skip collection
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Equal(0, len(events))

s.l0_policy.RemoveSkipCollection(1)
// Test for idle trigger
for i := 0; i < 2; i++ {
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Equal(0, len(events))
}
s.EqualValues(2, s.l0_policy.emptyLoopCount.Load())
events, err = s.l0_policy.Trigger()
s.NoError(err)
s.Equal(1, len(events))

log.Info("cView", zap.String("string", cView.String()))

segArgs := []struct {
Expand Down Expand Up @@ -144,6 +186,34 @@ func (s *L0CompactionPolicySuite) TestGenerateEventForLevelZeroViewChange() {
}
}

func (s *L0CompactionPolicySuite) TestGenerateEventForLevelZeroViewChangeWithSkip() {
s.Require().Empty(s.l0_policy.view.collections)

s.l0_policy.AddSkipCollection(1)
events := s.l0_policy.generateEventForLevelZeroViewChange()
s.Empty(events)

s.l0_policy.RemoveSkipCollection(1)
events = s.l0_policy.generateEventForLevelZeroViewChange()
s.NotEmpty(events)
s.NotEmpty(s.l0_policy.view.collections)

gotViews, ok := events[TriggerTypeLevelZeroViewChange]
s.True(ok)
s.NotNil(gotViews)
s.Equal(1, len(gotViews))

storedViews, ok := s.l0_policy.view.collections[s.testLabel.CollectionID]
s.True(ok)
s.NotNil(storedViews)
s.Equal(4, len(storedViews))

for _, view := range storedViews {
s.Equal(s.testLabel, view.label)
s.Equal(datapb.SegmentLevel_L0, view.Level)
}
}

func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo {
segArgs := []struct {
ID UniqueID
Expand Down
58 changes: 58 additions & 0 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/magiconair/properties/assert"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -1207,3 +1208,60 @@ func TestCheckDelay(t *testing.T) {
}, nil, nil, nil, nil, nil)
handler.checkDelay(t3)
}

func TestGetCompactionTasksNum(t *testing.T) {
queueTasks := NewCompactionQueue(10, DefaultPrioritizer)
queueTasks.Enqueue(
newMixCompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 1,
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil),
)
queueTasks.Enqueue(
newL0CompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 1,
Type: datapb.CompactionType_Level0DeleteCompaction,
}, nil, nil, nil),
)
queueTasks.Enqueue(
newClusteringCompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 10,
Type: datapb.CompactionType_ClusteringCompaction,
}, nil, nil, nil, nil, nil),
)
executingTasks := make(map[int64]CompactionTask, 0)
executingTasks[1] = newMixCompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 1,
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil)
executingTasks[2] = newL0CompactionTask(&datapb.CompactionTask{
StartTime: time.Now().Add(-100 * time.Minute).Unix(),
CollectionID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
}, nil, nil, nil)

handler := &compactionPlanHandler{
queueTasks: queueTasks,
executingTasks: executingTasks,
}
t.Run("no filter", func(t *testing.T) {
i := handler.getCompactionTasksNum()
assert.Equal(t, 5, i)
})
t.Run("collection id filter", func(t *testing.T) {
i := handler.getCompactionTasksNum(CollectionIDCompactionTaskFilter(1))
assert.Equal(t, 3, i)
})
t.Run("l0 compaction filter", func(t *testing.T) {
i := handler.getCompactionTasksNum(L0CompactionCompactionTaskFilter())
assert.Equal(t, 2, i)
})
t.Run("collection id and l0 compaction filter", func(t *testing.T) {
i := handler.getCompactionTasksNum(CollectionIDCompactionTaskFilter(1), L0CompactionCompactionTaskFilter())
assert.Equal(t, 1, i)
})
}
5 changes: 5 additions & 0 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type spyCompactionHandler struct {
meta *meta
}

// getCompactionTasksNum implements compactionPlanContext.
func (h *spyCompactionHandler) getCompactionTasksNum(filters ...compactionTaskFilter) int {
return 0
}

func (h *spyCompactionHandler) getCompactionTasksNumBySignalID(signalID int64) int {
return 0
}
Expand Down
Loading

0 comments on commit 1fb0acb

Please sign in to comment.