From 24fb10114b85e270819dd752c119539feaf37f80 Mon Sep 17 00:00:00 2001 From: jaime Date: Wed, 4 Sep 2024 14:39:10 +0800 Subject: [PATCH] enhance: remove cooling off in rate limiter for read requests (#35935) issue: #35934 Signed-off-by: jaime --- configs/milvus.yaml | 25 -- internal/proxy/impl.go | 14 - internal/proxy/metrics_info.go | 2 - internal/proxy/proxy.go | 1 - internal/querynodev2/collector/collector.go | 25 -- internal/querynodev2/metrics_info.go | 47 --- internal/querynodev2/services.go | 5 - internal/querynodev2/tasks/query_task.go | 4 - internal/querynodev2/tasks/search_task.go | 5 - internal/rootcoord/quota_center.go | 307 -------------------- internal/rootcoord/quota_center_test.go | 254 +--------------- pkg/util/metricsinfo/quota_metric.go | 20 -- pkg/util/paramtable/quota_param.go | 164 +---------- pkg/util/paramtable/quota_param_test.go | 7 - 14 files changed, 3 insertions(+), 877 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index f931e3cbde1dc..1c44c484e1636 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -983,31 +983,6 @@ quotaAndLimits: # forceDeny false means dql requests are allowed (except for some # specific conditions, such as collection has been dropped), true means always reject all dql requests. forceDeny: false - queueProtection: - enabled: false - # nqInQueueThreshold indicated that the system was under backpressure for Search/Query path. - # If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off - # until the NQ in queue no longer exceeds nqInQueueThreshold. We think of the NQ of query request as 1. - # int, default no limit - nqInQueueThreshold: -1 - # queueLatencyThreshold indicated that the system was under backpressure for Search/Query path. - # If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off - # until the latency of queuing no longer exceeds queueLatencyThreshold. - # The latency here refers to the averaged latency over a period of time. - # milliseconds, default no limit - queueLatencyThreshold: -1 - resultProtection: - enabled: false - # maxReadResultRate indicated that the system was under backpressure for Search/Query path. - # If dql result rate is greater than maxReadResultRate, search&query rates would gradually cool off - # until the read result rate no longer exceeds maxReadResultRate. - # MB/s, default no limit - maxReadResultRate: -1 - maxReadResultRatePerDB: -1 - maxReadResultRatePerCollection: -1 - # colOffSpeed is the speed of search&query rates cool off. - # (0, 1] - coolOffSpeed: 0.9 trace: # trace exporter type, default is stdout, diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 1ee1a0f214fc9..8de788d9b8d20 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2921,9 +2921,6 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) request.GetCollectionName(), ).Add(float64(request.GetNq())) - subLabel := GetCollectionRateSubLabel(request) - rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()), subLabel) - if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.SearchResults{ Status: merr.Status(err), @@ -3100,7 +3097,6 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) } metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize)) - rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel) } return qt.result, nil } @@ -3131,13 +3127,6 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea request.GetCollectionName(), ).Add(float64(receiveSize)) - subLabel := GetCollectionRateSubLabel(request) - allNQ := int64(0) - for _, searchRequest := range request.Requests { - allNQ += searchRequest.GetNq() - } - rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(allNQ), subLabel) - if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.SearchResults{ Status: merr.Status(err), @@ -3297,7 +3286,6 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea } metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize)) - rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel) } return qt.result, nil } @@ -3612,7 +3600,6 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* ).Inc() sentSize := proto.Size(qt.result) - rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize), subLabel) metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize)) username := GetCurUserFromContextOrDefault(ctx) @@ -6438,5 +6425,4 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR func DeregisterSubLabel(subLabel string) { rateCol.DeregisterSubLabel(internalpb.RateType_DQLQuery.String(), subLabel) rateCol.DeregisterSubLabel(internalpb.RateType_DQLSearch.String(), subLabel) - rateCol.DeregisterSubLabel(metricsinfo.ReadResultThroughput, subLabel) } diff --git a/internal/proxy/metrics_info.go b/internal/proxy/metrics_info.go index 109ca02211ee8..ba1bad9f15202 100644 --- a/internal/proxy/metrics_info.go +++ b/internal/proxy/metrics_info.go @@ -71,8 +71,6 @@ func getQuotaMetrics() (*metricsinfo.ProxyQuotaMetrics, error) { getSubLabelRateMetric(internalpb.RateType_DQLSearch.String()) getRateMetric(internalpb.RateType_DQLQuery.String()) getSubLabelRateMetric(internalpb.RateType_DQLQuery.String()) - getRateMetric(metricsinfo.ReadResultThroughput) - getSubLabelRateMetric(metricsinfo.ReadResultThroughput) if err != nil { return nil, err } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 241ec76783428..acb4222294cb1 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -207,7 +207,6 @@ func (node *Proxy) initRateCollector() error { // TODO: add bulkLoad rate rateCol.Register(internalpb.RateType_DQLSearch.String()) rateCol.Register(internalpb.RateType_DQLQuery.String()) - rateCol.Register(metricsinfo.ReadResultThroughput) return nil } diff --git a/internal/querynodev2/collector/collector.go b/internal/querynodev2/collector/collector.go index 66e48ba20ed31..90e43d73a01bf 100644 --- a/internal/querynodev2/collector/collector.go +++ b/internal/querynodev2/collector/collector.go @@ -32,31 +32,11 @@ var Counter *counter func RateMetrics() []string { return []string{ - metricsinfo.NQPerSecond, - metricsinfo.SearchThroughput, metricsinfo.InsertConsumeThroughput, metricsinfo.DeleteConsumeThroughput, } } -func AverageMetrics() []string { - return []string{ - metricsinfo.QueryQueueMetric, - metricsinfo.SearchQueueMetric, - } -} - -func ConstructLabel(subs ...string) string { - label := "" - for id, sub := range subs { - label += sub - if id != len(subs)-1 { - label += "-" - } - } - return label -} - func init() { var err error Rate, err = ratelimitutil.NewRateCollector(ratelimitutil.DefaultWindow, ratelimitutil.DefaultGranularity, false) @@ -70,9 +50,4 @@ func init() { for _, label := range RateMetrics() { Rate.Register(label) } - // init average metric - - for _, label := range AverageMetrics() { - Average.Register(label) - } } diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index b4c50a5d1b9fb..bbd8b61913182 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -19,7 +19,6 @@ package querynodev2 import ( "context" "fmt" - "time" "github.com/samber/lo" @@ -51,40 +50,6 @@ func getRateMetric() ([]metricsinfo.RateMetric, error) { return rms, nil } -func getSearchNQInQueue() (metricsinfo.ReadInfoInQueue, error) { - average, err := collector.Average.Average(metricsinfo.SearchQueueMetric) - if err != nil { - return metricsinfo.ReadInfoInQueue{}, err - } - defer collector.Average.Reset(metricsinfo.SearchQueueMetric) - - readyQueueLabel := collector.ConstructLabel(metricsinfo.ReadyQueueType, metricsinfo.SearchQueueMetric) - executeQueueLabel := collector.ConstructLabel(metricsinfo.ExecuteQueueType, metricsinfo.SearchQueueMetric) - - return metricsinfo.ReadInfoInQueue{ - ReadyQueue: collector.Counter.Get(readyQueueLabel), - ExecuteChan: collector.Counter.Get(executeQueueLabel), - AvgQueueDuration: time.Duration(int64(average)), - }, nil -} - -func getQueryTasksInQueue() (metricsinfo.ReadInfoInQueue, error) { - average, err := collector.Average.Average(metricsinfo.QueryQueueMetric) - if err != nil { - return metricsinfo.ReadInfoInQueue{}, err - } - defer collector.Average.Reset(metricsinfo.QueryQueueMetric) - - readyQueueLabel := collector.ConstructLabel(metricsinfo.ReadyQueueType, metricsinfo.QueryQueueMetric) - executeQueueLabel := collector.ConstructLabel(metricsinfo.ExecuteQueueType, metricsinfo.QueryQueueMetric) - - return metricsinfo.ReadInfoInQueue{ - ReadyQueue: collector.Counter.Get(readyQueueLabel), - ExecuteChan: collector.Counter.Get(executeQueueLabel), - AvgQueueDuration: time.Duration(int64(average)), - }, nil -} - // getQuotaMetrics returns QueryNodeQuotaMetrics. func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) { rms, err := getRateMetric() @@ -92,16 +57,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error return nil, err } - sqms, err := getSearchNQInQueue() - if err != nil { - return nil, err - } - - qqms, err := getQueryTasksInQueue() - if err != nil { - return nil, err - } - minTsafeChannel, minTsafe := node.tSafeManager.Min() collections := node.manager.Collection.List() nodeID := fmt.Sprint(node.GetNodeID()) @@ -178,8 +133,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error MinFlowGraphTt: minTsafe, NumFlowGraph: node.pipelineManager.Num(), }, - SearchQueue: sqms, - QueryQueue: qqms, GrowingSegmentsSize: totalGrowingSize, Effect: metricsinfo.NodeEffect{ NodeID: node.GetNodeID(), diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index d3b5dad0489c1..b471b37278793 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -38,7 +38,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" - "github.com/milvus-io/milvus/internal/querynodev2/collector" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" @@ -810,8 +809,6 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.SearchLabel, metrics.ReduceShards, metrics.BatchReduce). Observe(float64(reduceLatency.Milliseconds())) - collector.Rate.Add(metricsinfo.NQPerSecond, float64(req.GetReq().GetNq())) - collector.Rate.Add(metricsinfo.SearchThroughput, float64(proto.Size(req))) metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.SearchLabel). Add(float64(proto.Size(req))) @@ -958,7 +955,6 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i metrics.QueryLabel, metrics.ReduceShards, metrics.BatchReduce). Observe(float64(reduceLatency.Milliseconds())) - collector.Rate.Add(metricsinfo.NQPerSecond, 1) metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req))) relatedDataSize := lo.Reduce(toMergeResults, func(acc int64, result *internalpb.RetrieveResults, _ int) int64 { return acc + result.GetCostAggregation().GetTotalRelatedDataSize() @@ -1021,7 +1017,6 @@ func (node *QueryNode) QueryStream(req *querypb.QueryRequest, srv querypb.QueryN return nil } - collector.Rate.Add(metricsinfo.NQPerSecond, 1) metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req))) return nil } diff --git a/internal/querynodev2/tasks/query_task.go b/internal/querynodev2/tasks/query_task.go index d4b0ec5c8061e..2a655460a8aa2 100644 --- a/internal/querynodev2/tasks/query_task.go +++ b/internal/querynodev2/tasks/query_task.go @@ -14,11 +14,9 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" - "github.com/milvus-io/milvus/internal/querynodev2/collector" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -87,8 +85,6 @@ func (t *QueryTask) PreExecute() error { username). Observe(inQueueDurationMS) - // Update collector for query node quota. - collector.Average.Add(metricsinfo.QueryQueueMetric, float64(inQueueDuration.Microseconds())) return nil } diff --git a/internal/querynodev2/tasks/search_task.go b/internal/querynodev2/tasks/search_task.go index 497bf14d73919..a7423ac716d39 100644 --- a/internal/querynodev2/tasks/search_task.go +++ b/internal/querynodev2/tasks/search_task.go @@ -19,13 +19,11 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/querynodev2/collector" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -120,9 +118,6 @@ func (t *SearchTask) PreExecute() error { username). Observe(inQueueDurationMS) - // Update collector for query node quota. - collector.Average.Add(metricsinfo.SearchQueueMetric, float64(inQueueDuration.Microseconds())) - // Execute merged task's PreExecute. for _, subTask := range t.others { err := subTask.PreExecute() diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 49aa5965671cf..4d90be4fe342d 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -685,236 +685,6 @@ func (q *QuotaCenter) getDenyReadingDBs() map[int64]struct{} { return dbIDs } -// getReadRates get rate information of collections and databases from proxy metrics -func (q *QuotaCenter) getReadRates() (map[string]float64, map[string]map[string]map[string]float64) { - // label metric - metricMap := make(map[string]float64) - // sub label metric, label -> db -> collection -> value - collectionMetricMap := make(map[string]map[string]map[string]float64) - for _, metric := range q.proxyMetrics { - for _, rm := range metric.Rms { - if !ratelimitutil.IsSubLabel(rm.Label) { - metricMap[rm.Label] += rm.Rate - continue - } - mainLabel, database, collection, ok := ratelimitutil.SplitCollectionSubLabel(rm.Label) - if !ok { - continue - } - labelMetric, ok := collectionMetricMap[mainLabel] - if !ok { - labelMetric = make(map[string]map[string]float64) - collectionMetricMap[mainLabel] = labelMetric - } - databaseMetric, ok := labelMetric[database] - if !ok { - databaseMetric = make(map[string]float64) - labelMetric[database] = databaseMetric - } - databaseMetric[collection] += rm.Rate - } - } - return metricMap, collectionMetricMap -} - -func (q *QuotaCenter) getLimitedDBAndCollections(metricMap map[string]float64, - collectionMetricMap map[string]map[string]map[string]float64, -) (bool, *typeutil.Set[string], *typeutil.Set[string]) { - limitDBNameSet := typeutil.NewSet[string]() - limitCollectionNameSet := typeutil.NewSet[string]() - clusterLimit := false - - formatCollctionRateKey := func(dbName, collectionName string) string { - return fmt.Sprintf("%s.%s", dbName, collectionName) - } - - enableResultProtection := Params.QuotaConfig.ResultProtectionEnabled.GetAsBool() - if enableResultProtection { - maxRate := Params.QuotaConfig.MaxReadResultRate.GetAsFloat() - maxDBRate := Params.QuotaConfig.MaxReadResultRatePerDB.GetAsFloat() - maxCollectionRate := Params.QuotaConfig.MaxReadResultRatePerCollection.GetAsFloat() - - dbRateCount := make(map[string]float64) - collectionRateCount := make(map[string]float64) - rateCount := metricMap[metricsinfo.ReadResultThroughput] - for mainLabel, labelMetric := range collectionMetricMap { - if mainLabel != metricsinfo.ReadResultThroughput { - continue - } - for database, databaseMetric := range labelMetric { - for collection, metricValue := range databaseMetric { - dbRateCount[database] += metricValue - collectionRateCount[formatCollctionRateKey(database, collection)] = metricValue - } - } - } - if rateCount >= maxRate { - clusterLimit = true - } - for s, f := range dbRateCount { - if f >= maxDBRate { - limitDBNameSet.Insert(s) - } - } - for s, f := range collectionRateCount { - if f >= maxCollectionRate { - limitCollectionNameSet.Insert(s) - } - } - } - return clusterLimit, &limitDBNameSet, &limitCollectionNameSet -} - -func (q *QuotaCenter) coolOffDatabaseReading(deniedDatabaseIDs map[int64]struct{}, limitDBNameSet *typeutil.Set[string], - collectionMetricMap map[string]map[string]map[string]float64, log *log.MLogger, -) error { - if limitDBNameSet.Len() > 0 { - databaseSearchRate := make(map[string]float64) - databaseQueryRate := make(map[string]float64) - for mainLabel, labelMetric := range collectionMetricMap { - var databaseRate map[string]float64 - if mainLabel == internalpb.RateType_DQLSearch.String() { - databaseRate = databaseSearchRate - } else if mainLabel == internalpb.RateType_DQLQuery.String() { - databaseRate = databaseQueryRate - } else { - continue - } - for database, databaseMetric := range labelMetric { - for _, metricValue := range databaseMetric { - databaseRate[database] += metricValue - } - } - } - - coolOffSpeed := Params.QuotaConfig.CoolOffSpeed.GetAsFloat() - limitDBNameSet.Range(func(name string) bool { - dbID, ok := q.dbs.Get(name) - if !ok { - log.Warn("db not found", zap.String("dbName", name)) - return true - } - - // skip this database because it has been denied access for reading - _, ok = deniedDatabaseIDs[dbID] - if ok { - return true - } - - dbLimiter := q.rateLimiter.GetDatabaseLimiters(dbID) - if dbLimiter == nil { - log.Warn("database limiter not found", zap.Int64("dbID", dbID)) - return true - } - - realTimeSearchRate := databaseSearchRate[name] - realTimeQueryRate := databaseQueryRate[name] - q.coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, dbLimiter, log) - return true - }) - } - return nil -} - -func (q *QuotaCenter) coolOffCollectionReading(deniedDatabaseIDs map[int64]struct{}, limitCollectionSet *typeutil.UniqueSet, limitCollectionNameSet *typeutil.Set[string], - collectionMetricMap map[string]map[string]map[string]float64, log *log.MLogger, -) error { - var updateLimitErr error - coolOffSpeed := Params.QuotaConfig.CoolOffSpeed.GetAsFloat() - - splitCollctionRateKey := func(key string) (string, string) { - parts := strings.Split(key, ".") - return parts[0], parts[1] - } - - dbIDs := make(map[int64]string, q.dbs.Len()) - collectionIDs := make(map[int64]string, q.collections.Len()) - q.dbs.Range(func(name string, id int64) bool { - dbIDs[id] = name - return true - }) - q.collections.Range(func(name string, id int64) bool { - _, collectionName := SplitCollectionKey(name) - collectionIDs[id] = collectionName - return true - }) - - limitCollectionNameSet.Range(func(name string) bool { - dbName, collectionName := splitCollctionRateKey(name) - dbID, ok := q.dbs.Get(dbName) - if !ok { - log.Warn("db not found", zap.String("dbName", dbName)) - updateLimitErr = fmt.Errorf("db not found: %s", dbName) - return true - } - collectionID, ok := q.collections.Get(FormatCollectionKey(dbID, collectionName)) - if !ok { - log.Warn("collection not found", zap.String("collectionName", name)) - updateLimitErr = fmt.Errorf("collection not found: %s", name) - return true - } - limitCollectionSet.Insert(collectionID) - return true - }) - if updateLimitErr != nil { - return updateLimitErr - } - - safeGetCollectionRate := func(label, dbName, collectionName string) float64 { - if labelMetric, ok := collectionMetricMap[label]; ok { - if dbMetric, ok := labelMetric[dbName]; ok { - if rate, ok := dbMetric[collectionName]; ok { - return rate - } - } - } - return 0 - } - - coolOffCollectionID := func(collections ...int64) error { - for _, collection := range collections { - dbID, ok := q.collectionIDToDBID.Get(collection) - if !ok { - return fmt.Errorf("db ID not found of collection ID: %d", collection) - } - // skip this database because it has been denied access for reading - _, ok = deniedDatabaseIDs[dbID] - if ok { - continue - } - - collectionLimiter := q.rateLimiter.GetCollectionLimiters(dbID, collection) - if collectionLimiter == nil { - return fmt.Errorf("collection limiter not found: %d", collection) - } - dbName, ok := dbIDs[dbID] - if !ok { - return fmt.Errorf("db name not found of db ID: %d", dbID) - } - collectionName, ok := collectionIDs[collection] - if !ok { - return fmt.Errorf("collection name not found of collection ID: %d", collection) - } - - realTimeSearchRate := safeGetCollectionRate(internalpb.RateType_DQLSearch.String(), dbName, collectionName) - realTimeQueryRate := safeGetCollectionRate(internalpb.RateType_DQLQuery.String(), dbName, collectionName) - q.coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, collectionLimiter, log) - - collectionProps := q.getCollectionLimitProperties(collection) - q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMinKey), - internalpb.RateType_DQLSearch, collectionLimiter) - q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMinKey), - internalpb.RateType_DQLQuery, collectionLimiter) - } - return nil - } - - if updateLimitErr = coolOffCollectionID(limitCollectionSet.Collect()...); updateLimitErr != nil { - return updateLimitErr - } - return nil -} - // calculateReadRates calculates and sets dql rates. func (q *QuotaCenter) calculateReadRates() error { log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0) @@ -927,86 +697,9 @@ func (q *QuotaCenter) calculateReadRates() error { if len(deniedDatabaseIDs) != 0 { q.forceDenyReading(commonpb.ErrorCode_ForceDeny, false, maps.Keys(deniedDatabaseIDs), log) } - - queueLatencyThreshold := Params.QuotaConfig.QueueLatencyThreshold.GetAsDuration(time.Second) - limitCollectionSet := typeutil.NewUniqueSet() - - // enableQueueProtection && queueLatencyThreshold >= 0 means enable queue latency protection - if queueLatencyThreshold >= 0 { - for _, metric := range q.queryNodeMetrics { - searchLatency := metric.SearchQueue.AvgQueueDuration - queryLatency := metric.QueryQueue.AvgQueueDuration - if searchLatency >= queueLatencyThreshold || queryLatency >= queueLatencyThreshold { - limitCollectionSet.Insert(metric.Effect.CollectionIDs...) - } - } - } - - // queue length - enableQueueProtection := Params.QuotaConfig.QueueProtectionEnabled.GetAsBool() - nqInQueueThreshold := Params.QuotaConfig.NQInQueueThreshold.GetAsInt64() - if enableQueueProtection && nqInQueueThreshold >= 0 { - // >= 0 means enable queue length protection - sum := func(ri metricsinfo.ReadInfoInQueue) int64 { - return ri.UnsolvedQueue + ri.ReadyQueue + ri.ReceiveChan + ri.ExecuteChan - } - for _, metric := range q.queryNodeMetrics { - // We think of the NQ of query request as 1. - // search use same queue length counter with query - if sum(metric.SearchQueue) >= nqInQueueThreshold { - limitCollectionSet.Insert(metric.Effect.CollectionIDs...) - } - } - } - - metricMap, collectionMetricMap := q.getReadRates() - clusterLimit, limitDBNameSet, limitCollectionNameSet := q.getLimitedDBAndCollections(metricMap, collectionMetricMap) - - coolOffSpeed := Params.QuotaConfig.CoolOffSpeed.GetAsFloat() - - if clusterLimit { - realTimeClusterSearchRate := metricMap[internalpb.RateType_DQLSearch.String()] - realTimeClusterQueryRate := metricMap[internalpb.RateType_DQLQuery.String()] - q.coolOffReading(realTimeClusterSearchRate, realTimeClusterQueryRate, coolOffSpeed, q.rateLimiter.GetRootLimiters(), log) - } - - if updateLimitErr := q.coolOffDatabaseReading(deniedDatabaseIDs, limitDBNameSet, collectionMetricMap, - log); updateLimitErr != nil { - return updateLimitErr - } - - if updateLimitErr := q.coolOffCollectionReading(deniedDatabaseIDs, &limitCollectionSet, limitCollectionNameSet, - collectionMetricMap, log); updateLimitErr != nil { - return updateLimitErr - } - return nil } -func (q *QuotaCenter) coolOffReading(realTimeSearchRate, realTimeQueryRate, coolOffSpeed float64, - node *rlinternal.RateLimiterNode, mlog *log.MLogger, -) { - limiter := node.GetLimiters() - - v, ok := limiter.Get(internalpb.RateType_DQLSearch) - if ok && v.Limit() != Inf && realTimeSearchRate > 0 { - v.SetLimit(Limit(realTimeSearchRate * coolOffSpeed)) - mlog.RatedWarn(10, "QuotaCenter cool read rates off done", - zap.Any("level", node.Level()), - zap.Any("id", node.GetID()), - zap.Any("searchRate", v.Limit())) - } - - v, ok = limiter.Get(internalpb.RateType_DQLQuery) - if ok && v.Limit() != Inf && realTimeQueryRate > 0 { - v.SetLimit(Limit(realTimeQueryRate * coolOffSpeed)) - mlog.RatedWarn(10, "QuotaCenter cool read rates off done", - zap.Any("level", node.Level()), - zap.Any("id", node.GetID()), - zap.Any("queryRate", v.Limit())) - } -} - func (q *QuotaCenter) getDenyWritingDBs() map[int64]struct{} { dbIDs := make(map[int64]struct{}) for _, dbID := range lo.Uniq(q.collectionIDToDBID.Values()) { diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index eb30fefcb1211..cb691652db192 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -574,95 +574,13 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.clearMetrics() quotaCenter.collectionIDToDBID = collectionIDToDBID quotaCenter.readableCollections = map[int64]map[int64][]int64{ - 0: {1: {}, 2: {}, 3: {}}, - 1: {4: {}}, + 0: {1: {}}, + 1: {2: {}}, } quotaCenter.dbs.Insert("default", 0) quotaCenter.dbs.Insert("db1", 1) - quotaCenter.collections.Insert("0.col1", 1) - quotaCenter.collections.Insert("0.col2", 2) - quotaCenter.collections.Insert("0.col3", 3) - quotaCenter.collections.Insert("1.col4", 4) - - colSubLabel := ratelimitutil.GetCollectionSubLabel("default", "col1") - quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{ - 1: {Rms: []metricsinfo.RateMetric{ - {Label: internalpb.RateType_DQLSearch.String(), Rate: 100}, - {Label: internalpb.RateType_DQLQuery.String(), Rate: 100}, - {Label: ratelimitutil.FormatSubLabel(internalpb.RateType_DQLSearch.String(), colSubLabel), Rate: 100}, - {Label: ratelimitutil.FormatSubLabel(internalpb.RateType_DQLQuery.String(), colSubLabel), Rate: 100}, - }}, - } paramtable.Get().Save(Params.QuotaConfig.ForceDenyReading.Key, "false") - paramtable.Get().Save(Params.QuotaConfig.QueueProtectionEnabled.Key, "true") - paramtable.Get().Save(Params.QuotaConfig.QueueLatencyThreshold.Key, "100") - paramtable.Get().Save(Params.QuotaConfig.DQLLimitEnabled.Key, "true") - paramtable.Get().Save(Params.QuotaConfig.DQLMaxQueryRatePerCollection.Key, "500") - paramtable.Get().Save(Params.QuotaConfig.DQLMaxSearchRatePerCollection.Key, "500") - - checkLimiter := func() { - for db, collections := range quotaCenter.readableCollections { - for collection := range collections { - if collection != 1 { - continue - } - limiters := quotaCenter.rateLimiter.GetCollectionLimiters(db, collection).GetLimiters() - searchLimit, _ := limiters.Get(internalpb.RateType_DQLSearch) - assert.Equal(t, Limit(100.0*0.9), searchLimit.Limit()) - - queryLimit, _ := limiters.Get(internalpb.RateType_DQLQuery) - assert.Equal(t, Limit(100.0*0.9), queryLimit.Limit()) - } - } - } - - err := quotaCenter.resetAllCurrentRates() - assert.NoError(t, err) - - quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{ - 1: {SearchQueue: metricsinfo.ReadInfoInQueue{ - AvgQueueDuration: Params.QuotaConfig.QueueLatencyThreshold.GetAsDuration(time.Second), - }, Effect: metricsinfo.NodeEffect{ - NodeID: 1, - CollectionIDs: []int64{1, 2, 3}, - }}, - } - - err = quotaCenter.calculateReadRates() - assert.NoError(t, err) - checkLimiter() - - paramtable.Get().Save(Params.QuotaConfig.NQInQueueThreshold.Key, "100") - quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{ - 1: { - SearchQueue: metricsinfo.ReadInfoInQueue{ - UnsolvedQueue: Params.QuotaConfig.NQInQueueThreshold.GetAsInt64(), - }, - }, - } - err = quotaCenter.calculateReadRates() - assert.NoError(t, err) - checkLimiter() - - paramtable.Get().Save(Params.QuotaConfig.ResultProtectionEnabled.Key, "true") - paramtable.Get().Save(Params.QuotaConfig.MaxReadResultRate.Key, "1") - quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{ - 1: { - Rms: []metricsinfo.RateMetric{ - {Label: internalpb.RateType_DQLSearch.String(), Rate: 100}, - {Label: internalpb.RateType_DQLQuery.String(), Rate: 100}, - {Label: ratelimitutil.FormatSubLabel(internalpb.RateType_DQLSearch.String(), colSubLabel), Rate: 100}, - {Label: ratelimitutil.FormatSubLabel(internalpb.RateType_DQLQuery.String(), colSubLabel), Rate: 100}, - {Label: metricsinfo.ReadResultThroughput, Rate: 1.2}, - }, - }, - } - quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{1: {SearchQueue: metricsinfo.ReadInfoInQueue{}}} - err = quotaCenter.calculateReadRates() - assert.NoError(t, err) - checkLimiter() - meta.EXPECT().GetDatabaseByID(mock.Anything, mock.Anything, mock.Anything).Unset() meta.EXPECT().GetDatabaseByID(mock.Anything, mock.Anything, mock.Anything). RunAndReturn(func(ctx context.Context, i int64, u uint64) (*model.Database, error) { @@ -1582,174 +1500,6 @@ func TestGetRateType(t *testing.T) { }) } -func TestCalculateReadRates(t *testing.T) { - paramtable.Init() - ctx := context.Background() - - t.Run("cool off db", func(t *testing.T) { - qc := mocks.NewMockQueryCoordClient(t) - meta := mockrootcoord.NewIMetaTable(t) - meta.EXPECT().GetDatabaseByID(mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrDatabaseNotFound).Maybe() - - pcm := proxyutil.NewMockProxyClientManager(t) - dc := mocks.NewMockDataCoordClient(t) - core, _ := NewCore(ctx, nil) - core.tsoAllocator = newMockTsoAllocator() - - meta.EXPECT().GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")) - - Params.Save(Params.QuotaConfig.ForceDenyReading.Key, "false") - defer Params.Reset(Params.QuotaConfig.ForceDenyReading.Key) - - Params.Save(Params.QuotaConfig.ResultProtectionEnabled.Key, "true") - defer Params.Reset(Params.QuotaConfig.ResultProtectionEnabled.Key) - Params.Save(Params.QuotaConfig.MaxReadResultRate.Key, "50") - defer Params.Reset(Params.QuotaConfig.MaxReadResultRate.Key) - Params.Save(Params.QuotaConfig.MaxReadResultRatePerDB.Key, "30") - defer Params.Reset(Params.QuotaConfig.MaxReadResultRatePerDB.Key) - Params.Save(Params.QuotaConfig.MaxReadResultRatePerCollection.Key, "20") - defer Params.Reset(Params.QuotaConfig.MaxReadResultRatePerCollection.Key) - Params.Save(Params.QuotaConfig.CoolOffSpeed.Key, "0.8") - defer Params.Reset(Params.QuotaConfig.CoolOffSpeed.Key) - - Params.Save(Params.QuotaConfig.DQLLimitEnabled.Key, "true") - defer Params.Reset(Params.QuotaConfig.DQLLimitEnabled.Key) - Params.Save(Params.QuotaConfig.DQLMaxSearchRate.Key, "500") - defer Params.Reset(Params.QuotaConfig.DQLMaxSearchRate.Key) - Params.Save(Params.QuotaConfig.DQLMaxSearchRatePerDB.Key, "500") - defer Params.Reset(Params.QuotaConfig.DQLMaxSearchRatePerDB.Key) - Params.Save(Params.QuotaConfig.DQLMaxSearchRatePerCollection.Key, "500") - defer Params.Reset(Params.QuotaConfig.DQLMaxSearchRatePerCollection.Key) - - quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) - quotaCenter.dbs = typeutil.NewConcurrentMap[string, int64]() - quotaCenter.collections = typeutil.NewConcurrentMap[string, int64]() - quotaCenter.collectionIDToDBID = typeutil.NewConcurrentMap[int64, int64]() - quotaCenter.dbs.Insert("default", 1) - quotaCenter.dbs.Insert("test", 2) - quotaCenter.collections.Insert("1.col1", 10) - quotaCenter.collections.Insert("2.col2", 20) - quotaCenter.collections.Insert("2.col3", 30) - quotaCenter.collectionIDToDBID.Insert(10, 1) - quotaCenter.collectionIDToDBID.Insert(20, 2) - quotaCenter.collectionIDToDBID.Insert(30, 2) - - searchLabel := internalpb.RateType_DQLSearch.String() - quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{} - quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{ - 1: { - Rms: []metricsinfo.RateMetric{ - { - Label: metricsinfo.ReadResultThroughput, - Rate: 40 * 1024 * 1024, - }, - //{ - // Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("default")), - // Rate: 20 * 1024 * 1024, - //}, - { - Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("default", "col1")), - Rate: 15 * 1024 * 1024, - }, - //{ - // Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("test")), - // Rate: 20 * 1024 * 1024, - //}, - { - Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("test", "col2")), - Rate: 10 * 1024 * 1024, - }, - { - Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("test", "col3")), - Rate: 10 * 1024 * 1024, - }, - { - Label: searchLabel, - Rate: 20, - }, - { - Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("default")), - Rate: 10, - }, - //{ - // Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("test")), - // Rate: 10, - //}, - { - Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetCollectionSubLabel("default", "col1")), - Rate: 10, - }, - { - Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetCollectionSubLabel("test", "col2")), - Rate: 5, - }, - { - Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetCollectionSubLabel("test", "col3")), - Rate: 5, - }, - }, - }, - 2: { - Rms: []metricsinfo.RateMetric{ - { - Label: metricsinfo.ReadResultThroughput, - Rate: 20 * 1024 * 1024, - }, - //{ - // Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetDBSubLabel("default")), - // Rate: 20 * 1024 * 1024, - //}, - { - Label: ratelimitutil.FormatSubLabel(metricsinfo.ReadResultThroughput, ratelimitutil.GetCollectionSubLabel("default", "col1")), - Rate: 20 * 1024 * 1024, - }, - { - Label: searchLabel, - Rate: 20, - }, - //{ - // Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetDBSubLabel("default")), - // Rate: 20, - //}, - { - Label: ratelimitutil.FormatSubLabel(searchLabel, ratelimitutil.GetCollectionSubLabel("default", "col1")), - Rate: 20, - }, - }, - }, - } - - quotaCenter.rateLimiter.GetRootLimiters().GetLimiters().Insert(internalpb.RateType_DQLSearch, ratelimitutil.NewLimiter(1000, 1000)) - quotaCenter.rateLimiter.GetOrCreateCollectionLimiters(1, 10, - newParamLimiterFunc(internalpb.RateScope_Database, allOps), - newParamLimiterFunc(internalpb.RateScope_Collection, allOps)) - quotaCenter.rateLimiter.GetOrCreateCollectionLimiters(2, 20, - newParamLimiterFunc(internalpb.RateScope_Database, allOps), - newParamLimiterFunc(internalpb.RateScope_Collection, allOps)) - quotaCenter.rateLimiter.GetOrCreateCollectionLimiters(2, 30, - newParamLimiterFunc(internalpb.RateScope_Database, allOps), - newParamLimiterFunc(internalpb.RateScope_Collection, allOps)) - - err := quotaCenter.calculateReadRates() - assert.NoError(t, err) - - checkRate := func(rateNode *interalratelimitutil.RateLimiterNode, expectValue float64) { - searchRate, ok := rateNode.GetLimiters().Get(internalpb.RateType_DQLSearch) - assert.True(t, ok) - assert.EqualValues(t, expectValue, searchRate.Limit()) - } - - { - checkRate(quotaCenter.rateLimiter.GetRootLimiters(), float64(32)) // (20 + 20) * 0.8 - checkRate(quotaCenter.rateLimiter.GetDatabaseLimiters(1), float64(24)) // (20 + 10) * 0.8 - checkRate(quotaCenter.rateLimiter.GetDatabaseLimiters(2), float64(500)) // not cool off - checkRate(quotaCenter.rateLimiter.GetCollectionLimiters(1, 10), float64(24)) // (20 + 10) * 0.8 - checkRate(quotaCenter.rateLimiter.GetCollectionLimiters(2, 20), float64(500)) // not cool off - checkRate(quotaCenter.rateLimiter.GetCollectionLimiters(2, 30), float64(500)) // not cool off - } - }) -} - func TestResetAllCurrentRates(t *testing.T) { paramtable.Init() ctx := context.Background() diff --git a/pkg/util/metricsinfo/quota_metric.go b/pkg/util/metricsinfo/quota_metric.go index 290da1d473f62..108a5e50a00a1 100644 --- a/pkg/util/metricsinfo/quota_metric.go +++ b/pkg/util/metricsinfo/quota_metric.go @@ -17,8 +17,6 @@ package metricsinfo import ( - "time" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -26,18 +24,11 @@ import ( type RateMetricLabel = string const ( - NQPerSecond RateMetricLabel = "NQPerSecond" - SearchThroughput RateMetricLabel = "SearchThroughput" ReadResultThroughput RateMetricLabel = "ReadResultThroughput" InsertConsumeThroughput RateMetricLabel = "InsertConsumeThroughput" DeleteConsumeThroughput RateMetricLabel = "DeleteConsumeThroughput" ) -const ( - SearchQueueMetric string = "SearchQueue" - QueryQueueMetric string = "QueryQueue" -) - const ( UnsolvedQueueType string = "Unsolved" ReadyQueueType string = "Ready" @@ -58,15 +49,6 @@ type FlowGraphMetric struct { NumFlowGraph int } -// ReadInfoInQueue contains NQ num or task num in QueryNode's task queue. -type ReadInfoInQueue struct { - UnsolvedQueue int64 - ReadyQueue int64 - ReceiveChan int64 - ExecuteChan int64 - AvgQueueDuration time.Duration -} - // NodeEffect contains the a node and its effected collection info. type NodeEffect struct { NodeID int64 @@ -78,8 +60,6 @@ type QueryNodeQuotaMetrics struct { Hms HardwareMetrics Rms []RateMetric Fgm FlowGraphMetric - SearchQueue ReadInfoInQueue - QueryQueue ReadInfoInQueue GrowingSegmentsSize int64 Effect NodeEffect } diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index 74e6b8d4a1d9b..73516fbb13ca3 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -19,7 +19,6 @@ package paramtable import ( "fmt" "math" - "strconv" "go.uber.org/zap" @@ -157,15 +156,7 @@ type quotaConfig struct { L0SegmentRowCountHighWaterLevel ParamItem `refreshable:"true"` // limit reading - ForceDenyReading ParamItem `refreshable:"true"` - QueueProtectionEnabled ParamItem `refreshable:"true"` - NQInQueueThreshold ParamItem `refreshable:"true"` - QueueLatencyThreshold ParamItem `refreshable:"true"` - ResultProtectionEnabled ParamItem `refreshable:"true"` - MaxReadResultRate ParamItem `refreshable:"true"` - MaxReadResultRatePerDB ParamItem `refreshable:"true"` - MaxReadResultRatePerCollection ParamItem `refreshable:"true"` - CoolOffSpeed ParamItem `refreshable:"true"` + ForceDenyReading ParamItem `refreshable:"true"` } func (p *quotaConfig) init(base *BaseTable) { @@ -1926,159 +1917,6 @@ specific conditions, such as collection has been dropped), ` + "true" + ` means } p.ForceDenyReading.Init(base.mgr) - p.QueueProtectionEnabled = ParamItem{ - Key: "quotaAndLimits.limitReading.queueProtection.enabled", - Version: "2.2.0", - DefaultValue: "false", - Export: true, - } - p.QueueProtectionEnabled.Init(base.mgr) - - p.NQInQueueThreshold = ParamItem{ - Key: "quotaAndLimits.limitReading.queueProtection.nqInQueueThreshold", - Version: "2.2.0", - DefaultValue: strconv.FormatInt(math.MaxInt64, 10), - Formatter: func(v string) string { - if !p.QueueProtectionEnabled.GetAsBool() { - return strconv.FormatInt(math.MaxInt64, 10) - } - threshold := getAsFloat(v) - // [0, inf) - if threshold < 0 { - return strconv.FormatInt(math.MaxInt64, 10) - } - return v - }, - Doc: `nqInQueueThreshold indicated that the system was under backpressure for Search/Query path. -If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off -until the NQ in queue no longer exceeds nqInQueueThreshold. We think of the NQ of query request as 1. -int, default no limit`, - Export: true, - } - p.NQInQueueThreshold.Init(base.mgr) - - p.QueueLatencyThreshold = ParamItem{ - Key: "quotaAndLimits.limitReading.queueProtection.queueLatencyThreshold", - Version: "2.2.0", - DefaultValue: max, - Formatter: func(v string) string { - if !p.QueueProtectionEnabled.GetAsBool() { - return max - } - level := getAsFloat(v) - // [0, inf) - if level < 0 { - return max - } - return v - }, - Doc: `queueLatencyThreshold indicated that the system was under backpressure for Search/Query path. -If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off -until the latency of queuing no longer exceeds queueLatencyThreshold. -The latency here refers to the averaged latency over a period of time. -milliseconds, default no limit`, - Export: true, - } - p.QueueLatencyThreshold.Init(base.mgr) - - p.ResultProtectionEnabled = ParamItem{ - Key: "quotaAndLimits.limitReading.resultProtection.enabled", - Version: "2.2.0", - DefaultValue: "false", - Export: true, - } - p.ResultProtectionEnabled.Init(base.mgr) - - p.MaxReadResultRate = ParamItem{ - Key: "quotaAndLimits.limitReading.resultProtection.maxReadResultRate", - Version: "2.2.0", - DefaultValue: max, - Formatter: func(v string) string { - if !p.ResultProtectionEnabled.GetAsBool() { - return max - } - rate := getAsFloat(v) - if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax - return fmt.Sprintf("%f", megaBytes2Bytes(rate)) - } - // [0, inf) - if rate < 0 { - return max - } - return v - }, - Doc: `maxReadResultRate indicated that the system was under backpressure for Search/Query path. -If dql result rate is greater than maxReadResultRate, search&query rates would gradually cool off -until the read result rate no longer exceeds maxReadResultRate. -MB/s, default no limit`, - Export: true, - } - p.MaxReadResultRate.Init(base.mgr) - - p.MaxReadResultRatePerDB = ParamItem{ - Key: "quotaAndLimits.limitReading.resultProtection.maxReadResultRatePerDB", - Version: "2.4.1", - DefaultValue: max, - Formatter: func(v string) string { - if !p.ResultProtectionEnabled.GetAsBool() { - return max - } - rate := getAsFloat(v) - if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax - return fmt.Sprintf("%f", megaBytes2Bytes(rate)) - } - // [0, inf) - if rate < 0 { - return max - } - return v - }, - Export: true, - } - p.MaxReadResultRatePerDB.Init(base.mgr) - - p.MaxReadResultRatePerCollection = ParamItem{ - Key: "quotaAndLimits.limitReading.resultProtection.maxReadResultRatePerCollection", - Version: "2.4.1", - DefaultValue: max, - Formatter: func(v string) string { - if !p.ResultProtectionEnabled.GetAsBool() { - return max - } - rate := getAsFloat(v) - if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax - return fmt.Sprintf("%f", megaBytes2Bytes(rate)) - } - // [0, inf) - if rate < 0 { - return max - } - return v - }, - Export: true, - } - p.MaxReadResultRatePerCollection.Init(base.mgr) - - const defaultSpeed = "0.9" - p.CoolOffSpeed = ParamItem{ - Key: "quotaAndLimits.limitReading.coolOffSpeed", - Version: "2.2.0", - DefaultValue: defaultSpeed, - Formatter: func(v string) string { - // (0, 1] - speed := getAsFloat(v) - if speed <= 0 || speed > 1 { - // log.Warn("CoolOffSpeed must in the range of `(0, 1]`, use default value", zap.Float64("speed", p.CoolOffSpeed), zap.Float64("default", defaultSpeed)) - return defaultSpeed - } - return v - }, - Doc: `colOffSpeed is the speed of search&query rates cool off. -(0, 1]`, - Export: true, - } - p.CoolOffSpeed.Init(base.mgr) - p.AllocRetryTimes = ParamItem{ Key: "quotaAndLimits.limits.allocRetryTimes", Version: "2.4.0", diff --git a/pkg/util/paramtable/quota_param_test.go b/pkg/util/paramtable/quota_param_test.go index 2d7d747c31b9b..fb1b868aacf8b 100644 --- a/pkg/util/paramtable/quota_param_test.go +++ b/pkg/util/paramtable/quota_param_test.go @@ -17,7 +17,6 @@ package paramtable import ( - "math" "testing" "github.com/stretchr/testify/assert" @@ -206,12 +205,6 @@ func TestQuotaParam(t *testing.T) { t.Run("test limit reading", func(t *testing.T) { assert.False(t, qc.ForceDenyReading.GetAsBool()) - assert.Equal(t, false, qc.QueueProtectionEnabled.GetAsBool()) - assert.Equal(t, int64(math.MaxInt64), qc.NQInQueueThreshold.GetAsInt64()) - assert.Equal(t, defaultMax, qc.QueueLatencyThreshold.GetAsFloat()) - assert.Equal(t, false, qc.ResultProtectionEnabled.GetAsBool()) - assert.Equal(t, defaultMax, qc.MaxReadResultRate.GetAsFloat()) - assert.Equal(t, 0.9, qc.CoolOffSpeed.GetAsFloat()) }) t.Run("test disk quota", func(t *testing.T) {