Skip to content

Commit

Permalink
enhance: [10kcp] Skip creating partition rate limiters when not enable (
Browse files Browse the repository at this point in the history
#38062)

issue: #37630

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Nov 28, 2024
1 parent 635d161 commit 0930430
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 19 deletions.
22 changes: 12 additions & 10 deletions internal/datacoord/dataview/view_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -110,9 +111,9 @@ func (m *dataViewManager) Close() {
})
}

func (m *dataViewManager) update(view *DataView) {
func (m *dataViewManager) update(view *DataView, reason string) {
m.currentViews.Insert(view.CollectionID, view)
log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version))
log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version), zap.String("reason", reason))
}

func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
Expand All @@ -127,7 +128,7 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
currentView, ok := m.currentViews.Get(collectionID)
if !ok {
// update due to data view is empty
m.update(newView)
m.update(newView, "init data view")
return
}
// no-op if the incoming version is less than the current version.
Expand All @@ -141,7 +142,7 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
current, ok := currentView.Channels[channel]
if !ok {
// update due to channel info is empty
m.update(newView)
m.update(newView, "init channel info")
return
}
if !funcutil.SliceSetEqual(new.GetLevelZeroSegmentIds(), current.GetLevelZeroSegmentIds()) ||
Expand All @@ -150,24 +151,25 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
!funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) {
// update due to segments list changed
m.update(newView)
m.update(newView, "channel segments list changed")
return
}
if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) {
// update due to partition stats changed
m.update(newView)
m.update(newView, "partition stats changed")
return
}
// TODO: It might be too frequent.
if new.GetSeekPosition().GetTimestamp() > current.GetSeekPosition().GetTimestamp() {
newTime := tsoutil.PhysicalTime(new.GetSeekPosition().GetTimestamp())
curTime := tsoutil.PhysicalTime(current.GetSeekPosition().GetTimestamp())
if newTime.Sub(curTime) > paramtable.Get().DataCoordCfg.CPIntervalToUpdateDataView.GetAsDuration(time.Second) {
// update due to channel cp advanced
m.update(newView)
m.update(newView, "channel cp advanced")
return
}
}

if !typeutil.MapEqual(newView.Segments, currentView.Segments) {
// update due to segments list changed
m.update(newView)
m.update(newView, "segment list changed")
}
}
24 changes: 16 additions & 8 deletions internal/rootcoord/quota_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -1170,6 +1170,15 @@ func (q *QuotaCenter) calculateRates() error {
func (q *QuotaCenter) resetAllCurrentRates() error {
clusterLimiter := newParamLimiterFunc(internalpb.RateScope_Cluster, allOps)()
q.rateLimiter = rlinternal.NewRateLimiterTree(clusterLimiter)

enablePartitionRateLimit := false
for rt := range getRateTypes(internalpb.RateScope_Partition, allOps) {
r := quota.GetQuotaValue(internalpb.RateScope_Partition, rt, Params)
if Limit(r) != Inf {
enablePartitionRateLimit = true
}
}

initLimiters := func(sourceCollections map[int64]map[int64][]int64) {
for dbID, collections := range sourceCollections {
for collectionID, partitionIDs := range collections {
Expand All @@ -1180,21 +1189,20 @@ func (q *QuotaCenter) resetAllCurrentRates() error {
}
return limitVal
}
q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps))
q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID,
newParamLimiterFunc(internalpb.RateScope_Database, allOps),
newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal))

if !enablePartitionRateLimit {
continue
}
for _, partitionID := range partitionIDs {
q.rateLimiter.GetOrCreatePartitionLimiters(dbID, collectionID, partitionID,
newParamLimiterFunc(internalpb.RateScope_Database, allOps),
newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal),
newParamLimiterFunc(internalpb.RateScope_Partition, allOps))
}
if len(partitionIDs) == 0 {
q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID,
newParamLimiterFunc(internalpb.RateScope_Database, allOps),
newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal))
}
}
if len(collections) == 0 {
q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps))
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3254,7 +3254,8 @@ type dataCoordConfig struct {
L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"`

// data view
DataViewUpdateInterval ParamItem `refreshable:"true"`
DataViewUpdateInterval ParamItem `refreshable:"true"`
CPIntervalToUpdateDataView ParamItem `refreshable:"true"`
}

func (p *dataCoordConfig) init(base *BaseTable) {
Expand Down Expand Up @@ -4107,6 +4108,16 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Export: false,
}
p.DataViewUpdateInterval.Init(base.mgr)

p.CPIntervalToUpdateDataView = ParamItem{
Key: "dataCoord.dataView.cpInterval",
Version: "2.5.0",
Doc: "cpInterval is a time interval in seconds. If the time interval between the new channel checkpoint and the current channel checkpoint exceeds cpInterval, it will trigger a data view update.",
DefaultValue: "600",
PanicIfEmpty: false,
Export: false,
}
p.CPIntervalToUpdateDataView.Init(base.mgr)
}

// /////////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt())
params.Save("datacoord.scheduler.taskSlowThreshold", "1000")
assert.Equal(t, 1000*time.Second, Params.TaskSlowThreshold.GetAsDuration(time.Second))
assert.Equal(t, 10*time.Second, Params.DataViewUpdateInterval.GetAsDuration(time.Second))
assert.Equal(t, 600*time.Second, Params.CPIntervalToUpdateDataView.GetAsDuration(time.Second))
})

t.Run("test dataNodeConfig", func(t *testing.T) {
Expand Down

0 comments on commit 0930430

Please sign in to comment.