diff --git a/internal/querynodev2/segments/disk_usage_fetcher.go b/internal/querynodev2/segments/disk_usage_fetcher.go
new file mode 100644
index 0000000000000..ca4aeecaee250
--- /dev/null
+++ b/internal/querynodev2/segments/disk_usage_fetcher.go
@@ -0,0 +1,81 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package segments
+
+import (
+    "context"
+    "fmt"
+    "time"
+
+    "go.uber.org/atomic"
+    "go.uber.org/zap"
+
+    "github.com/milvus-io/milvus/internal/util/segcore"
+    "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/metrics"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+type diskUsageFetcher struct {
+    ctx   context.Context
+    path  string
+    usage *atomic.Int64
+    err   *atomic.Error
+}
+
+func NewDiskUsageFetcher(ctx context.Context) *diskUsageFetcher {
+    return &diskUsageFetcher{
+        ctx:   ctx,
+        path:  paramtable.Get().LocalStorageCfg.Path.GetValue(),
+        usage: atomic.NewInt64(0),
+        err:   atomic.NewError(nil),
+    }
+}
+
+func (d *diskUsageFetcher) GetDiskUsage() (int64, error) {
+    return d.usage.Load(), d.err.Load()
+}
+
+func (d *diskUsageFetcher) fetch() {
+    diskUsage, err := segcore.GetLocalUsedSize(d.path)
+    if err != nil {
+        log.Warn("failed to get disk usage", zap.Error(err))
+        d.err.Store(err)
+        return
+    }
+    d.usage.Store(diskUsage)
+    d.err.Store(nil)
+    metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(diskUsage) / 1024 / 1024) // in MB
+    log.Ctx(d.ctx).WithRateGroup("diskUsageFetcher", 1, 300).
+        RatedInfo(300, "querynode disk usage", zap.Int64("size", diskUsage), zap.Int64("nodeID", paramtable.GetNodeID()))
+}
+
+func (d *diskUsageFetcher) Start() {
+    d.fetch() // Synchronously fetch once before starting.
+
+    interval := paramtable.Get().QueryNodeCfg.DiskSizeFetchInterval.GetAsDuration(time.Second)
+    ticker := time.NewTicker(interval)
+    defer ticker.Stop()
+    for {
+        select {
+        case <-d.ctx.Done():
+            return
+        case <-ticker.C:
+            d.fetch()
+        }
+    }
+}
diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go
index f1b712cef2866..d8239077b04cb 100644
--- a/internal/querynodev2/segments/segment.go
+++ b/internal/querynodev2/segments/segment.go
@@ -1274,11 +1274,6 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
     GetDynamicPool().Submit(func() (any, error) {
         C.DeleteSegment(ptr)
 
-        localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
-        // ignore error here, shall not block releasing
-        if err == nil {
-            metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
-        }
         return nil, nil
     }).Await()
 
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index 421c9cfc02dd8..94d7f3535e11a 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -44,7 +44,6 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
     "github.com/milvus-io/milvus/internal/storage"
-    "github.com/milvus-io/milvus/internal/util/segcore"
     "github.com/milvus-io/milvus/pkg/common"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/metrics"
@@ -147,6 +146,7 @@ type resourceEstimateFactor struct {
 }
 
 func NewLoader(
+    ctx context.Context,
     manager *Manager,
     cm storage.ChunkManager,
 ) *segmentLoader {
@@ -166,12 +166,15 @@ func NewLoader(
     }
     log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
 
+    duf := NewDiskUsageFetcher(ctx)
+    go duf.Start()
     loader := &segmentLoader{
         manager:                   manager,
         cm:                        cm,
         loadingSegments:           typeutil.NewConcurrentMap[int64, *loadResult](),
         committedResourceNotifier: syncutil.NewVersionedNotifier(),
+        duf:                       duf,
     }
 
     return loader
@@ -207,11 +210,14 @@ type segmentLoader struct {
     manager *Manager
     cm      storage.ChunkManager
 
-    mut sync.Mutex
     // The channel will be closed as the segment loaded
-    loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
+    loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
+
+    mut                       sync.Mutex // guards committedResource
     committedResource         LoadResource
     committedResourceNotifier *syncutil.VersionedNotifier
+
+    duf *diskUsageFetcher
 }
 
 var _ Loader = (*segmentLoader)(nil)
@@ -383,8 +389,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
     log := log.Ctx(ctx).With(
         zap.Stringer("segmentType", segmentType),
     )
-    loader.mut.Lock()
-    defer loader.mut.Unlock()
 
     // filter out loaded & loading segments
     infos := make([]*querypb.SegmentLoadInfo, 0, len(segments))
@@ -407,8 +411,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
 }
 
 func (loader *segmentLoader) unregister(segments ...*querypb.SegmentLoadInfo) {
-    loader.mut.Lock()
-    defer loader.mut.Unlock()
     for i := range segments {
         result, ok := loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
         if ok {
@@ -443,22 +445,22 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
         zap.Int64s("segmentIDs", segmentIDs),
     )
 
-    loader.mut.Lock()
-    defer loader.mut.Unlock()
-
-    result := requestResourceResult{
-        CommittedResource: loader.committedResource,
-    }
-
     memoryUsage := hardware.GetUsedMemoryCount()
     totalMemory := hardware.GetMemoryCount()
 
-    diskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
+    diskUsage, err := loader.duf.GetDiskUsage()
     if err != nil {
-        return result, errors.Wrap(err, "get local used size failed")
+        return requestResourceResult{}, errors.Wrap(err, "get local used size failed")
     }
     diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
 
+    loader.mut.Lock()
+    defer loader.mut.Unlock()
+
+    result := requestResourceResult{
+        CommittedResource: loader.committedResource,
+    }
+
     if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
         return result, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
     } else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
@@ -466,7 +468,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
     }
 
     result.ConcurrencyLevel = funcutil.Min(hardware.GetCPUNum(), len(infos))
-    mu, du, err := loader.checkSegmentSize(ctx, infos)
+    mu, du, err := loader.checkSegmentSize(ctx, infos, memoryUsage, totalMemory, diskUsage)
     if err != nil {
         log.Warn("no sufficient resource to load segments", zap.Error(err))
         return result, err
@@ -1347,7 +1349,7 @@ func JoinIDPath(ids ...int64) string {
 // checkSegmentSize checks whether the memory & disk is sufficient to load the segments
 // returns the memory & disk usage while loading if possible to load,
 // otherwise, returns error
-func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo) (uint64, uint64, error) {
+func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo, memUsage, totalMem uint64, localDiskUsage int64) (uint64, uint64, error) {
     if len(segmentLoadInfos) == 0 {
         return 0, 0, nil
     }
@@ -1360,18 +1362,11 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
         return float64(mem) / 1024 / 1024
     }
 
-    memUsage := hardware.GetUsedMemoryCount() + loader.committedResource.MemorySize
-    totalMem := hardware.GetMemoryCount()
+    memUsage = memUsage + loader.committedResource.MemorySize
     if memUsage == 0 || totalMem == 0 {
         return 0, 0, errors.New("get memory failed when checkSegmentSize")
     }
 
-    localDiskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
-    if err != nil {
-        return 0, 0, errors.Wrap(err, "get local used size failed")
-    }
-
-    metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(toMB(uint64(localDiskUsage)))
     diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize
 
     factor := resourceEstimateFactor{
diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go
index 57df025e28ae7..b3b0db5df0acd 100644
--- a/internal/querynodev2/segments/segment_loader_test.go
+++ b/internal/querynodev2/segments/segment_loader_test.go
@@ -81,7 +81,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {
 
     // Dependencies
     suite.manager = NewManager()
-    suite.loader = NewLoader(suite.manager, suite.chunkManager)
+    suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
     initcore.InitRemoteChunkManager(paramtable.Get())
 
     // Data
@@ -98,7 +98,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {
 func (suite *SegmentLoaderSuite) SetupBM25() {
     // Dependencies
     suite.manager = NewManager()
-    suite.loader = NewLoader(suite.manager, suite.chunkManager)
+    suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
     initcore.InitRemoteChunkManager(paramtable.Get())
 
     suite.schema = mock_segcore.GenTestBM25CollectionSchema("test")
@@ -798,7 +798,7 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() {
     ctx := context.Background()
     chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
     suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
-    suite.loader = NewLoader(suite.manager, suite.chunkManager)
+    suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
     initcore.InitRemoteChunkManager(paramtable.Get())
 
     // Data
diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go
index 66c4554d7b7ff..8b34406eb276f 100644
--- a/internal/querynodev2/server.go
+++ b/internal/querynodev2/server.go
@@ -324,7 +324,7 @@ func (node *QueryNode) Init() error {
         node.factory.Init(paramtable.Get())
 
         localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
-        localUsedSize, err := segcore.GetLocalUsedSize(node.ctx, localRootPath)
+        localUsedSize, err := segcore.GetLocalUsedSize(localRootPath)
         if err != nil {
             log.Warn("get local used size failed", zap.Error(err))
             initError = err
@@ -371,7 +371,7 @@ func (node *QueryNode) Init() error {
         node.subscribingChannels = typeutil.NewConcurrentSet[string]()
         node.unsubscribingChannels = typeutil.NewConcurrentSet[string]()
         node.manager = segments.NewManager()
-        node.loader = segments.NewLoader(node.manager, node.chunkManager)
+        node.loader = segments.NewLoader(node.ctx, node.manager, node.chunkManager)
         node.manager.SetLoader(node.loader)
         node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID())
         // init pipeline manager
diff --git a/internal/util/segcore/cgo_util.go b/internal/util/segcore/cgo_util.go
index c7cff7d847f98..9a4298b38e058 100644
--- a/internal/util/segcore/cgo_util.go
+++ b/internal/util/segcore/cgo_util.go
@@ -28,7 +28,6 @@ package segcore
 import "C"
 
 import (
-    "context"
     "math"
     "reflect"
     "unsafe"
@@ -73,7 +72,7 @@ func getCProtoBlob(cProto *C.CProto) []byte {
 }
 
 // GetLocalUsedSize returns the used size of the local path
-func GetLocalUsedSize(ctx context.Context, path string) (int64, error) {
+func GetLocalUsedSize(path string) (int64, error) {
     var availableSize int64
     cSize := (*C.int64_t)(&availableSize)
     cPath := C.CString(path)
diff --git a/internal/util/segcore/cgo_util_test.go b/internal/util/segcore/cgo_util_test.go
index e2b8e997ab122..e5486aa4cae6a 100644
--- a/internal/util/segcore/cgo_util_test.go
+++ b/internal/util/segcore/cgo_util_test.go
@@ -1,7 +1,6 @@
 package segcore
 
 import (
-    "context"
     "runtime"
     "testing"
 
@@ -17,7 +16,7 @@ func TestConsumeCStatusIntoError(t *testing.T) {
 }
 
 func TestGetLocalUsedSize(t *testing.T) {
-    size, err := GetLocalUsedSize(context.Background(), "")
+    size, err := GetLocalUsedSize("")
     assert.NoError(t, err)
     assert.NotNil(t, size)
 }
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 71f26cd5cc365..17023a0b4a1b8 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2518,6 +2518,7 @@ type queryNodeConfig struct {
     // loader
     IoPoolSize             ParamItem `refreshable:"false"`
     DeltaDataExpansionRate ParamItem `refreshable:"true"`
+    DiskSizeFetchInterval  ParamItem `refreshable:"false"`
 
     // schedule task policy.
     SchedulePolicyName ParamItem `refreshable:"false"`
@@ -3134,6 +3135,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
     }
     p.DeltaDataExpansionRate.Init(base.mgr)
 
+    p.DiskSizeFetchInterval = ParamItem{
+        Key:          "querynode.diskSizeFetchInterval",
+        Version:      "2.5.0",
+        DefaultValue: "60",
+        Doc:          "The time interval in seconds for retrieving disk usage.",
+    }
+    p.DiskSizeFetchInterval.Init(base.mgr)
+
     // schedule read task policy.
     p.SchedulePolicyName = ParamItem{
         Key: "queryNode.scheduler.scheduleReadPolicy.name",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 9b95f3384ae4a..aaa988c35beed 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -483,6 +483,7 @@ func TestComponentParam(t *testing.T) {
 
         assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue())
         assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())
+        assert.Equal(t, 60*time.Second, Params.DiskSizeFetchInterval.GetAsDuration(time.Second))
     })
 
     t.Run("test dataCoordConfig", func(t *testing.T) {
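The hunks above all follow one idea: disk usage is sampled off the hot path and cached, so requestResource and checkSegmentSize read an atomic instead of invoking segcore.GetLocalUsedSize (a cgo filesystem walk) on every call. Purely as illustration — this is not part of the patch, and cachedGauge, newCachedGauge, and the probe closure are hypothetical names, not Milvus APIs — a minimal self-contained Go sketch of the same cache-and-refresh pattern:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/atomic"
)

// cachedGauge keeps the latest value of an expensive probe and serves
// reads from atomics, mirroring the diskUsageFetcher introduced above.
type cachedGauge struct {
	value *atomic.Int64
	err   *atomic.Error
}

func newCachedGauge(ctx context.Context, interval time.Duration, probe func() (int64, error)) *cachedGauge {
	g := &cachedGauge{value: atomic.NewInt64(0), err: atomic.NewError(nil)}
	refresh := func() {
		v, err := probe()
		if err != nil {
			g.err.Store(err)
			return
		}
		g.value.Store(v)
		g.err.Store(nil)
	}
	refresh() // fetch once synchronously so the first reader never sees an empty value
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				refresh()
			}
		}
	}()
	return g
}

// Get returns the last sampled value and the last sampling error.
func (g *cachedGauge) Get() (int64, error) { return g.value.Load(), g.err.Load() }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	gauge := newCachedGauge(ctx, time.Second, func() (int64, error) {
		// Stand-in for a slow probe such as a filesystem walk.
		return 42 << 20, nil
	})

	used, err := gauge.Get() // cheap read: no cgo or disk walk on the hot path
	fmt.Println(used, err)
}
```

Refreshing once synchronously before starting the ticker mirrors Start() in disk_usage_fetcher.go, and pushing the disk probe out of requestResource is also what allows the patch to shrink the section guarded by loader.mut to just the committedResource bookkeeping.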