diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 7023f7f04fc25..af448f080229b 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -572,6 +572,7 @@ dataCoord: maxParallelTaskNum: -1 # Deprecated, see datanode.slot.slotCap dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds) gcInterval: 1800 # The time interval in seconds for compaction gc + scheduleInterval: 500 # The time interval in milliseconds for scheduling compaction tasks. If the configuration setting is below 100ms, it will be ajusted upwards to 100ms mix: triggerInterval: 60 # The time interval in seconds to trigger mix compaction levelzero: diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 4807ae0fab1d5..98f2df1f6c2e7 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -440,10 +440,11 @@ func (c *compactionPlanHandler) loadMeta() { } func (c *compactionPlanHandler) loopSchedule() { - log.Info("compactionPlanHandler start loop schedule") + interval := paramtable.Get().DataCoordCfg.CompactionScheduleInterval.GetAsDuration(time.Millisecond) + log.Info("compactionPlanHandler start loop schedule", zap.Duration("schedule interval", interval)) defer c.stopWg.Done() - scheduleTicker := time.NewTicker(3 * time.Second) + scheduleTicker := time.NewTicker(interval) defer scheduleTicker.Stop() for { select { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 712668a9ca6c8..1c3e7b5f56971 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3360,7 +3360,8 @@ type dataCoordConfig struct { CompactionTimeoutInSeconds ParamItem `refreshable:"true"` CompactionDropToleranceInSeconds ParamItem `refreshable:"true"` CompactionGCIntervalInSeconds ParamItem `refreshable:"true"` - CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` + CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"` // deprecated + CompactionScheduleInterval ParamItem `refreshable:"false"` MixCompactionTriggerInterval ParamItem `refreshable:"false"` L0CompactionTriggerInterval ParamItem `refreshable:"false"` GlobalCompactionInterval ParamItem `refreshable:"false"` @@ -3736,6 +3737,22 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.CompactionCheckIntervalInSeconds.Init(base.mgr) + p.CompactionScheduleInterval = ParamItem{ + Key: "dataCoord.compaction.scheduleInterval", + Version: "2.4.21", + DefaultValue: "500", + Export: true, + Formatter: func(value string) string { + ms := getAsInt64(value) + if ms < 100 { + ms = 100 + } + return strconv.FormatInt(ms, 10) + }, + Doc: "The time interval in milliseconds for scheduling compaction tasks. If the configuration setting is below 100ms, it will be ajusted upwards to 100ms", + } + p.CompactionScheduleInterval.Init(base.mgr) + p.SingleCompactionRatioThreshold = ParamItem{ Key: "dataCoord.compaction.single.ratio.threshold", Version: "2.0.0",