From 1d87ce06dd3eb456bb66284c82f140ee149d476d Mon Sep 17 00:00:00 2001 From: yangxuan Date: Mon, 6 Jan 2025 15:09:57 +0800 Subject: [PATCH] enhance: Add configs for compaction schedule Signed-off-by: yangxuan --- configs/milvus.yaml | 1 + internal/datacoord/compaction.go | 5 ++-- pkg/util/paramtable/component_param.go | 36 +++++++++++++++++++++++++- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 4dc1cca0dcfc1..8c1d05a7319e3 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -570,6 +570,7 @@ dataCoord: maxParallelTaskNum: -1 # Deprecated, see datanode.slot.slotCap dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds) gcInterval: 1800 # The time interval in seconds for compaction gc + scheduleInterval: 500 # The time interval in milliseconds for scheduling compaction tasks. If the configuration setting is below 100ms, it will be ajusted upwards to 100ms mix: triggerInterval: 60 # The time interval in seconds to trigger mix compaction levelzero: diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 4807ae0fab1d5..98f2df1f6c2e7 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -440,10 +440,11 @@ func (c *compactionPlanHandler) loadMeta() { } func (c *compactionPlanHandler) loopSchedule() { - log.Info("compactionPlanHandler start loop schedule") + interval := paramtable.Get().DataCoordCfg.CompactionScheduleInterval.GetAsDuration(time.Millisecond) + log.Info("compactionPlanHandler start loop schedule", zap.Duration("schedule interval", interval)) defer c.stopWg.Done() - scheduleTicker := time.NewTicker(3 * time.Second) + scheduleTicker := time.NewTicker(interval) defer scheduleTicker.Stop() for { select { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 7d9c329b49e59..c60d265b3dde8 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3350,7 +3350,9 @@ type dataCoordConfig struct { CompactionTimeoutInSeconds ParamItem `refreshable:"true"` CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` - CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` + CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` // deprecated + CompactionScheduleInterval ParamItem `refreshable:"false"` + CompactionCheckInterval ParamItem `refreshable:"false"` MixCompactionTriggerInterval ParamItem `refreshable:"false"` L0CompactionTriggerInterval ParamItem `refreshable:"false"` GlobalCompactionInterval ParamItem `refreshable:"false"` @@ -3735,6 +3737,38 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.CompactionCheckIntervalInSeconds.Init(base.mgr) + p.CompactionCheckInterval = ParamItem{ + Key: "dataCoord.compaction.checkInterval", + Version: "2.4.21", + DefaultValue: "500", + Export: true, + Formatter: func(value string) string { + ms := getAsInt64(value) + if ms < 100 { + ms = 100 + } + return strconv.FormatInt(ms, 10) + }, + Doc: "The time interval in milliseconds for checking compaction tasks. If the configuration setting is below 100ms, it will be ajusted upwards to 100ms", + } + p.CompactionCheckInterval.Init(base.mgr) + + p.CompactionScheduleInterval = ParamItem{ + Key: "dataCoord.compaction.scheduleInterval", + Version: "2.4.21", + DefaultValue: "500", + Export: true, + Formatter: func(value string) string { + ms := getAsInt64(value) + if ms < 100 { + ms = 100 + } + return strconv.FormatInt(ms, 10) + }, + Doc: "The time interval in milliseconds for scheduling compaction tasks. If the configuration setting is below 100ms, it will be ajusted upwards to 100ms", + } + p.CompactionScheduleInterval.Init(base.mgr) + p.SingleCompactionRatioThreshold = ParamItem{ Key: "dataCoord.compaction.single.ratio.threshold", Version: "2.0.0",