From 0930430a68d3dacce4c621f6eb58cbf457599a90 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Thu, 28 Nov 2024 10:45:46 +0800 Subject: [PATCH] enhance: [10kcp] Skip creating partition rate limiters when not enable (#38062) issue: https://github.com/milvus-io/milvus/issues/37630 Signed-off-by: bigsheeper --- internal/datacoord/dataview/view_manager.go | 22 ++++++++++--------- internal/rootcoord/quota_center.go | 24 ++++++++++++++------- pkg/util/paramtable/component_param.go | 13 ++++++++++- pkg/util/paramtable/component_param_test.go | 2 ++ 4 files changed, 42 insertions(+), 19 deletions(-) diff --git a/internal/datacoord/dataview/view_manager.go b/internal/datacoord/dataview/view_manager.go index f5638f4e277b1..899423f5bd2cd 100644 --- a/internal/datacoord/dataview/view_manager.go +++ b/internal/datacoord/dataview/view_manager.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -110,9 +111,9 @@ func (m *dataViewManager) Close() { }) } -func (m *dataViewManager) update(view *DataView) { +func (m *dataViewManager) update(view *DataView, reason string) { m.currentViews.Insert(view.CollectionID, view) - log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version)) + log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version), zap.String("reason", reason)) } func (m *dataViewManager) TryUpdateDataView(collectionID int64) { @@ -127,7 +128,7 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) { currentView, ok := m.currentViews.Get(collectionID) if !ok { // update due to data view is empty - m.update(newView) + m.update(newView, "init data view") return } // no-op if the incoming version is less than the current version. @@ -141,7 +142,7 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) { current, ok := currentView.Channels[channel] if !ok { // update due to channel info is empty - m.update(newView) + m.update(newView, "init channel info") return } if !funcutil.SliceSetEqual(new.GetLevelZeroSegmentIds(), current.GetLevelZeroSegmentIds()) || @@ -150,24 +151,25 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) { !funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) || !funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) { // update due to segments list changed - m.update(newView) + m.update(newView, "channel segments list changed") return } if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) { // update due to partition stats changed - m.update(newView) + m.update(newView, "partition stats changed") return } - // TODO: It might be too frequent. - if new.GetSeekPosition().GetTimestamp() > current.GetSeekPosition().GetTimestamp() { + newTime := tsoutil.PhysicalTime(new.GetSeekPosition().GetTimestamp()) + curTime := tsoutil.PhysicalTime(current.GetSeekPosition().GetTimestamp()) + if newTime.Sub(curTime) > paramtable.Get().DataCoordCfg.CPIntervalToUpdateDataView.GetAsDuration(time.Second) { // update due to channel cp advanced - m.update(newView) + m.update(newView, "channel cp advanced") return } } if !typeutil.MapEqual(newView.Segments, currentView.Segments) { // update due to segments list changed - m.update(newView) + m.update(newView, "segment list changed") } } diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index cca525f8fbbea..67e99107c8f37 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -1170,6 +1170,15 @@ func (q *QuotaCenter) calculateRates() error { func (q *QuotaCenter) resetAllCurrentRates() error { clusterLimiter := newParamLimiterFunc(internalpb.RateScope_Cluster, allOps)() q.rateLimiter = rlinternal.NewRateLimiterTree(clusterLimiter) + + enablePartitionRateLimit := false + for rt := range getRateTypes(internalpb.RateScope_Partition, allOps) { + r := quota.GetQuotaValue(internalpb.RateScope_Partition, rt, Params) + if Limit(r) != Inf { + enablePartitionRateLimit = true + } + } + initLimiters := func(sourceCollections map[int64]map[int64][]int64) { for dbID, collections := range sourceCollections { for collectionID, partitionIDs := range collections { @@ -1180,21 +1189,20 @@ func (q *QuotaCenter) resetAllCurrentRates() error { } return limitVal } + q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps)) + q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID, + newParamLimiterFunc(internalpb.RateScope_Database, allOps), + newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal)) + if !enablePartitionRateLimit { + continue + } for _, partitionID := range partitionIDs { q.rateLimiter.GetOrCreatePartitionLimiters(dbID, collectionID, partitionID, newParamLimiterFunc(internalpb.RateScope_Database, allOps), newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal), newParamLimiterFunc(internalpb.RateScope_Partition, allOps)) } - if len(partitionIDs) == 0 { - q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID, - newParamLimiterFunc(internalpb.RateScope_Database, allOps), - newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal)) - } - } - if len(collections) == 0 { - q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps)) } } } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index d75da1371e86a..e1bf5075c157e 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3254,7 +3254,8 @@ type dataCoordConfig struct { L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"` // data view - DataViewUpdateInterval ParamItem `refreshable:"true"` + DataViewUpdateInterval ParamItem `refreshable:"true"` + CPIntervalToUpdateDataView ParamItem `refreshable:"true"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -4107,6 +4108,16 @@ During compaction, the size of segment # of rows is able to exceed segment max # Export: false, } p.DataViewUpdateInterval.Init(base.mgr) + + p.CPIntervalToUpdateDataView = ParamItem{ + Key: "dataCoord.dataView.cpInterval", + Version: "2.5.0", + Doc: "cpInterval is a time interval in seconds. If the time interval between the new channel checkpoint and the current channel checkpoint exceeds cpInterval, it will trigger a data view update.", + DefaultValue: "600", + PanicIfEmpty: false, + Export: false, + } + p.CPIntervalToUpdateDataView.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index bb0cc532caf20..4902b01847022 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -507,6 +507,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt()) params.Save("datacoord.scheduler.taskSlowThreshold", "1000") assert.Equal(t, 1000*time.Second, Params.TaskSlowThreshold.GetAsDuration(time.Second)) + assert.Equal(t, 10*time.Second, Params.DataViewUpdateInterval.GetAsDuration(time.Second)) + assert.Equal(t, 600*time.Second, Params.CPIntervalToUpdateDataView.GetAsDuration(time.Second)) }) t.Run("test dataNodeConfig", func(t *testing.T) {