Skip to content

Commit

Permalink
fix: Record active collections for l0Policy
Browse files Browse the repository at this point in the history
By recording the active collection lists, The l0 compaction trigger
of view change and idle won't influence each other.

Also this pr replace the L0View cache with real L0 segments' change.
Save some memory and make L0 trigger more accurate.

See also: milvus-io#39187

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Jan 13, 2025
1 parent 5c5948c commit cdbe1a0
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 212 deletions.
187 changes: 97 additions & 90 deletions internal/datacoord/compaction_policy_l0.go
Original file line number Diff line number Diff line change
@@ -1,121 +1,83 @@
package datacoord

import (
"sync"
"time"

"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type l0CompactionPolicy struct {
meta *meta
view *FullViews

emptyLoopCount *atomic.Int64
activeCollections *activeCollections
}

func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
return &l0CompactionPolicy{
meta: meta,
// donot share views with other compaction policy
view: &FullViews{collections: make(map[int64][]*SegmentView)},
emptyLoopCount: atomic.NewInt64(0),
meta: meta,
activeCollections: newActiveCollections(),
}
}

func (policy *l0CompactionPolicy) Enable() bool {
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}

func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
// support config hot refresh
events := policy.generateEventForLevelZeroViewChange()
if len(events) != 0 {
// each time when triggers a compaction, the idleTicker would reset
policy.emptyLoopCount.Store(0)
return events, nil
}
policy.emptyLoopCount.Inc()

if policy.emptyLoopCount.Load() >= 3 {
policy.emptyLoopCount.Store(0)
return policy.generateEventForLevelZeroViewIDLE(), nil
}

return make(map[CompactionTriggerType][]CompactionView), nil
// Notify policy to record the active updated(when adding a new L0 segment) collections.
func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) {
policy.activeCollections.Record(collectionID)
}

func (policy *l0CompactionPolicy) generateEventForLevelZeroViewChange() (events map[CompactionTriggerType][]CompactionView) {
func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][]CompactionView, err error) {
events = make(map[CompactionTriggerType][]CompactionView)
latestCollSegs := policy.meta.GetCompactableSegmentGroupByCollection()
latestCollIDs := lo.Keys(latestCollSegs)
viewCollIDs := lo.Keys(policy.view.collections)

_, diffRemove := lo.Difference(latestCollIDs, viewCollIDs)
for _, collID := range diffRemove {
delete(policy.view.collections, collID)
}
// 1. Get active collections
activeColls := policy.activeCollections.GetActiveCollections()

refreshedL0Views := policy.RefreshLevelZeroViews(latestCollSegs)
if len(refreshedL0Views) > 0 {
events = make(map[CompactionTriggerType][]CompactionView)
events[TriggerTypeLevelZeroViewChange] = refreshedL0Views
}
// 2. Idle collections = all collections - active collections
missCached, idleColls := lo.Difference(activeColls, lo.Keys(latestCollSegs))
policy.activeCollections.ClearMissCached(missCached...)

return events
}

func (policy *l0CompactionPolicy) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView {
var allRefreshedL0Veiws []CompactionView
idleCollsSet := typeutil.NewUniqueSet(idleColls...)
activeL0Views, idleL0Views := []CompactionView{}, []CompactionView{}
for collID, segments := range latestCollSegs {
policy.activeCollections.Read(collID)

levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
return info.GetLevel() == datapb.SegmentLevel_L0
})
latestL0Segments := GetViewsByInfo(levelZeroSegments...)
needRefresh, collRefreshedViews := policy.getChangedLevelZeroViews(collID, latestL0Segments)
if needRefresh {
log.Info("Refresh compaction level zero views",
zap.Int64("collectionID", collID),
zap.Strings("views", lo.Map(collRefreshedViews, func(view CompactionView, _ int) string {
return view.String()
})))
policy.view.collections[collID] = latestL0Segments
if len(levelZeroSegments) == 0 {
continue
}

if len(collRefreshedViews) > 0 {
allRefreshedL0Veiws = append(allRefreshedL0Veiws, collRefreshedViews...)
labelViews := policy.groupL0ViewsByPartChan(collID, GetViewsByInfo(levelZeroSegments...))
if idleCollsSet.Contain(collID) {
idleL0Views = append(idleL0Views, labelViews...)
} else {
activeL0Views = append(activeL0Views, labelViews...)
}
}

return allRefreshedL0Veiws
}

func (policy *l0CompactionPolicy) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) (needRefresh bool, refreshed []CompactionView) {
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.Level == datapb.SegmentLevel_L0
})

if len(LevelZeroViews) == 0 && len(cachedViews) != 0 {
needRefresh = true
return
}
if len(activeL0Views) > 0 {
events[TriggerTypeLevelZeroViewChange] = activeL0Views
}

latestViews := policy.groupL0ViewsByPartChan(collID, LevelZeroViews)
for _, latestView := range latestViews {
views := lo.Filter(cachedViews, func(v *SegmentView, _ int) bool {
return v.label.Equal(latestView.GetGroupLabel())
})

if !latestView.Equal(views) {
refreshed = append(refreshed, latestView)
needRefresh = true
}
if len(idleL0Views) > 0 {
events[TriggerTypeLevelZeroViewIDLE] = idleL0Views
}
return
}

func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) map[string]*LevelZeroSegmentsView {
func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) []CompactionView {
partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key
for _, view := range levelZeroSegments {
key := view.label.Key()
Expand All @@ -130,26 +92,71 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID,
}
}

return partChanView
return lo.Map(lo.Values(partChanView), func(view *LevelZeroSegmentsView, _ int) CompactionView {
return view
})
}

func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView {
events := make(map[CompactionTriggerType][]CompactionView, 0)
for collID := range policy.view.collections {
cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool {
return v.Level == datapb.SegmentLevel_L0
})
if len(cachedViews) > 0 {
log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event")
grouped := policy.groupL0ViewsByPartChan(collID, cachedViews)
events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped),
func(l0View *LevelZeroSegmentsView, _ int) CompactionView {
return l0View
})
log.Info("Generate TriggerTypeLevelZeroViewIDLE compaction event", zap.Int64("collectionID", collID))
break
type activeCollection struct {
ID int64
lastRefresh time.Time
readCount *atomic.Int64
}

func newActiveCollection(ID int64) *activeCollection {
return &activeCollection{
ID: ID,
lastRefresh: time.Now(),
readCount: atomic.NewInt64(0),
}
}

type activeCollections struct {
collections map[int64]*activeCollection
collGuard sync.RWMutex
}

func newActiveCollections() *activeCollections {
return &activeCollections{
collections: make(map[int64]*activeCollection),
}
}

func (ac *activeCollections) ClearMissCached(collectionIDs ...int64) {
ac.collGuard.Lock()
defer ac.collGuard.Unlock()
lo.ForEach(collectionIDs, func(collID int64, _ int) {
delete(ac.collections, collID)
})
}

func (ac *activeCollections) Record(collectionID int64) {
ac.collGuard.Lock()
defer ac.collGuard.Unlock()
if _, ok := ac.collections[collectionID]; !ok {
ac.collections[collectionID] = newActiveCollection(collectionID)
} else {
ac.collections[collectionID].lastRefresh = time.Now()
ac.collections[collectionID].readCount.Store(0)
}
}

func (ac *activeCollections) Read(collectionID int64) {
ac.collGuard.Lock()
defer ac.collGuard.Unlock()
if _, ok := ac.collections[collectionID]; ok {
ac.collections[collectionID].readCount.Inc()
if ac.collections[collectionID].readCount.Load() >= 3 &&
time.Since(ac.collections[collectionID].lastRefresh) > 3*paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second) {
log.Info("Active(of deletions) collections become idle", zap.Int64("collectionID", collectionID))
delete(ac.collections, collectionID)
}
}
}

func (ac *activeCollections) GetActiveCollections() []int64 {
ac.collGuard.RLock()
defer ac.collGuard.RUnlock()

return events
return lo.Keys(ac.collections)
}
Loading

0 comments on commit cdbe1a0

Please sign in to comment.