enhance: [2.5] Optimize GetLocalDiskSize and segment loader mutex (#38907)

1. Narrow the segment loader mutex so it protects only the committed resource, instead of also covering prepare/unregister and the resource checks.
2. Optimize GetDiskUsage by caching local disk usage in a background fetcher, instead of walking the local storage path on every resource request.

issue: #37630

pr: #38599

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored Jan 14, 2025
1 parent 5199415 commit b91c0a8
Showing 9 changed files with 119 additions and 40 deletions.
81 changes: 81 additions & 0 deletions internal/querynodev2/segments/disk_usage_fetcher.go
@@ -0,0 +1,81 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package segments

import (
    "context"
    "fmt"
    "time"

    "go.uber.org/atomic"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/internal/util/segcore"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/metrics"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

type diskUsageFetcher struct {
    ctx   context.Context
    path  string
    usage *atomic.Int64
    err   *atomic.Error
}

func NewDiskUsageFetcher(ctx context.Context) *diskUsageFetcher {
    return &diskUsageFetcher{
        ctx:   ctx,
        path:  paramtable.Get().LocalStorageCfg.Path.GetValue(),
        usage: atomic.NewInt64(0),
        err:   atomic.NewError(nil),
    }
}

func (d *diskUsageFetcher) GetDiskUsage() (int64, error) {
    return d.usage.Load(), d.err.Load()
}

func (d *diskUsageFetcher) fetch() {
    diskUsage, err := segcore.GetLocalUsedSize(d.path)
    if err != nil {
        log.Warn("failed to get disk usage", zap.Error(err))
        d.err.Store(err)
        return
    }
    d.usage.Store(diskUsage)
    d.err.Store(nil)
    metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(diskUsage) / 1024 / 1024) // in MB
    log.Ctx(d.ctx).WithRateGroup("diskUsageFetcher", 1, 300).
        RatedInfo(300, "querynode disk usage", zap.Int64("size", diskUsage), zap.Int64("nodeID", paramtable.GetNodeID()))
}

func (d *diskUsageFetcher) Start() {
    d.fetch() // Synchronously fetch once before starting.

    interval := paramtable.Get().QueryNodeCfg.DiskSizeFetchInterval.GetAsDuration(time.Second)
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-d.ctx.Done():
            return
        case <-ticker.C:
            d.fetch()
        }
    }
}
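
For context, here is a minimal usage sketch of the new fetcher (not part of the commit; the wrapper function name is hypothetical). It mirrors the wiring added to NewLoader further below: construct the fetcher with a long-lived context, run the refresh loop in a goroutine, and read the cached value instead of scanning the local path on every call.

func exampleDiskUsageFetcherUsage(ctx context.Context) {
    duf := NewDiskUsageFetcher(ctx)
    go duf.Start() // refreshes the cached value every DiskSizeFetchInterval until ctx is done

    usage, err := duf.GetDiskUsage() // last cached usage in bytes, plus the last fetch error
    if err != nil {
        log.Warn("disk usage not available", zap.Error(err))
        return
    }
    log.Info("local disk usage", zap.Int64("bytes", usage))
}
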
5 changes: 0 additions & 5 deletions internal/querynodev2/segments/segment.go
@@ -1274,11 +1274,6 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {

GetDynamicPool().Submit(func() (any, error) {
C.DeleteSegment(ptr)
localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
// ignore error here, shall not block releasing
if err == nil {
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
}
return nil, nil
}).Await()

47 changes: 21 additions & 26 deletions internal/querynodev2/segments/segment_loader.go
@@ -44,7 +44,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/segcore"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@@ -147,6 +146,7 @@ type resourceEstimateFactor struct {
}

func NewLoader(
ctx context.Context,
manager *Manager,
cm storage.ChunkManager,
) *segmentLoader {
@@ -166,12 +166,15 @@
}

log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
duf := NewDiskUsageFetcher(ctx)
go duf.Start()

loader := &segmentLoader{
manager: manager,
cm: cm,
loadingSegments: typeutil.NewConcurrentMap[int64, *loadResult](),
committedResourceNotifier: syncutil.NewVersionedNotifier(),
duf: duf,
}

return loader
@@ -207,11 +210,14 @@ type segmentLoader struct {
manager *Manager
cm storage.ChunkManager

mut sync.Mutex
// The channel will be closed as the segment loaded
loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]

mut sync.Mutex // guards committedResource
committedResource LoadResource
committedResourceNotifier *syncutil.VersionedNotifier

duf *diskUsageFetcher
}

var _ Loader = (*segmentLoader)(nil)
@@ -383,8 +389,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
log := log.Ctx(ctx).With(
zap.Stringer("segmentType", segmentType),
)
loader.mut.Lock()
defer loader.mut.Unlock()

// filter out loaded & loading segments
infos := make([]*querypb.SegmentLoadInfo, 0, len(segments))
@@ -407,8 +411,6 @@
}

func (loader *segmentLoader) unregister(segments ...*querypb.SegmentLoadInfo) {
loader.mut.Lock()
defer loader.mut.Unlock()
for i := range segments {
result, ok := loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
if ok {
@@ -443,30 +445,30 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
zap.Int64s("segmentIDs", segmentIDs),
)

loader.mut.Lock()
defer loader.mut.Unlock()

result := requestResourceResult{
CommittedResource: loader.committedResource,
}

memoryUsage := hardware.GetUsedMemoryCount()
totalMemory := hardware.GetMemoryCount()

diskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
diskUsage, err := loader.duf.GetDiskUsage()
if err != nil {
return result, errors.Wrap(err, "get local used size failed")
return requestResourceResult{}, errors.Wrap(err, "get local used size failed")
}
diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()

loader.mut.Lock()
defer loader.mut.Unlock()

result := requestResourceResult{
CommittedResource: loader.committedResource,
}

if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
return result, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
return result, merr.WrapErrServiceDiskLimitExceeded(float32(loader.committedResource.DiskSize+uint64(diskUsage)), float32(diskCap))
}

result.ConcurrencyLevel = funcutil.Min(hardware.GetCPUNum(), len(infos))
mu, du, err := loader.checkSegmentSize(ctx, infos)
mu, du, err := loader.checkSegmentSize(ctx, infos, memoryUsage, totalMemory, diskUsage)
if err != nil {
log.Warn("no sufficient resource to load segments", zap.Error(err))
return result, err
@@ -1347,7 +1349,7 @@ func JoinIDPath(ids ...int64) string {
// checkSegmentSize checks whether the memory & disk is sufficient to load the segments
// returns the memory & disk usage while loading if possible to load,
// otherwise, returns error
func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo) (uint64, uint64, error) {
func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo, memUsage, totalMem uint64, localDiskUsage int64) (uint64, uint64, error) {
if len(segmentLoadInfos) == 0 {
return 0, 0, nil
}
@@ -1360,18 +1362,11 @@ func (loader *segmentLoader) checkSegmentSize
return float64(mem) / 1024 / 1024
}

memUsage := hardware.GetUsedMemoryCount() + loader.committedResource.MemorySize
totalMem := hardware.GetMemoryCount()
memUsage = memUsage + loader.committedResource.MemorySize
if memUsage == 0 || totalMem == 0 {
return 0, 0, errors.New("get memory failed when checkSegmentSize")
}

localDiskUsage, err := segcore.GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
return 0, 0, errors.Wrap(err, "get local used size failed")
}

metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(toMB(uint64(localDiskUsage)))
diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize

factor := resourceEstimateFactor{
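
A simplified sketch of the resulting ordering in requestResource (an illustration of the hunk above, not verbatim code): the memory and disk measurements are taken before the lock, and the mutex now protects only committedResource.

    memoryUsage := hardware.GetUsedMemoryCount() // read outside the lock
    totalMemory := hardware.GetMemoryCount()
    diskUsage, err := loader.duf.GetDiskUsage() // cached by diskUsageFetcher, no filesystem walk
    if err != nil {
        return requestResourceResult{}, errors.Wrap(err, "get local used size failed")
    }

    loader.mut.Lock()         // taken only after the measurements
    defer loader.mut.Unlock() // guards committedResource alone

    result := requestResourceResult{CommittedResource: loader.committedResource}
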
6 changes: 3 additions & 3 deletions internal/querynodev2/segments/segment_loader_test.go
@@ -81,7 +81,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {

// Dependencies
suite.manager = NewManager()
suite.loader = NewLoader(suite.manager, suite.chunkManager)
suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
initcore.InitRemoteChunkManager(paramtable.Get())

// Data
@@ -98,7 +98,7 @@
func (suite *SegmentLoaderSuite) SetupBM25() {
// Dependencies
suite.manager = NewManager()
suite.loader = NewLoader(suite.manager, suite.chunkManager)
suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
initcore.InitRemoteChunkManager(paramtable.Get())

suite.schema = mock_segcore.GenTestBM25CollectionSchema("test")
@@ -798,7 +798,7 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() {
ctx := context.Background()
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
suite.loader = NewLoader(suite.manager, suite.chunkManager)
suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
initcore.InitRemoteChunkManager(paramtable.Get())

// Data
4 changes: 2 additions & 2 deletions internal/querynodev2/server.go
@@ -324,7 +324,7 @@ func (node *QueryNode) Init() error {
node.factory.Init(paramtable.Get())

localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
localUsedSize, err := segcore.GetLocalUsedSize(node.ctx, localRootPath)
localUsedSize, err := segcore.GetLocalUsedSize(localRootPath)
if err != nil {
log.Warn("get local used size failed", zap.Error(err))
initError = err
@@ -371,7 +371,7 @@ func (node *QueryNode) Init() error {
node.subscribingChannels = typeutil.NewConcurrentSet[string]()
node.unsubscribingChannels = typeutil.NewConcurrentSet[string]()
node.manager = segments.NewManager()
node.loader = segments.NewLoader(node.manager, node.chunkManager)
node.loader = segments.NewLoader(node.ctx, node.manager, node.chunkManager)
node.manager.SetLoader(node.loader)
node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID())
// init pipeline manager
3 changes: 1 addition & 2 deletions internal/util/segcore/cgo_util.go
@@ -28,7 +28,6 @@ package segcore
import "C"

import (
"context"
"math"
"reflect"
"unsafe"
@@ -73,7 +72,7 @@ func getCProtoBlob(cProto *C.CProto) []byte {
}

// GetLocalUsedSize returns the used size of the local path
func GetLocalUsedSize(ctx context.Context, path string) (int64, error) {
func GetLocalUsedSize(path string) (int64, error) {
var availableSize int64
cSize := (*C.int64_t)(&availableSize)
cPath := C.CString(path)
3 changes: 1 addition & 2 deletions internal/util/segcore/cgo_util_test.go
@@ -1,7 +1,6 @@
package segcore

import (
"context"
"runtime"
"testing"

@@ -17,7 +16,7 @@ func TestConsumeCStatusIntoError(t *testing.T) {
}

func TestGetLocalUsedSize(t *testing.T) {
size, err := GetLocalUsedSize(context.Background(), "")
size, err := GetLocalUsedSize("")
assert.NoError(t, err)
assert.NotNil(t, size)
}
9 changes: 9 additions & 0 deletions pkg/util/paramtable/component_param.go
@@ -2518,6 +2518,7 @@ type queryNodeConfig struct {
// loader
IoPoolSize ParamItem `refreshable:"false"`
DeltaDataExpansionRate ParamItem `refreshable:"true"`
DiskSizeFetchInterval ParamItem `refreshable:"false"`

// schedule task policy.
SchedulePolicyName ParamItem `refreshable:"false"`
@@ -3134,6 +3135,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
p.DeltaDataExpansionRate.Init(base.mgr)

p.DiskSizeFetchInterval = ParamItem{
Key: "querynode.diskSizeFetchInterval",
Version: "2.5.0",
DefaultValue: "60",
Doc: "The time interval in seconds for retrieving disk usage.",
}
p.DiskSizeFetchInterval.Init(base.mgr)

// schedule read task policy.
p.SchedulePolicyName = ParamItem{
Key: "queryNode.scheduler.scheduleReadPolicy.name",
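
For reference, the fetcher consumes this item as a duration in seconds, so the refresh cadence can be tuned without touching the loader. A short sketch follows (the 30-second override is a hypothetical example):

    // e.g. in milvus.yaml: querynode.diskSizeFetchInterval: 30
    interval := paramtable.Get().QueryNodeCfg.DiskSizeFetchInterval.GetAsDuration(time.Second) // 60s by default
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
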
1 change: 1 addition & 0 deletions pkg/util/paramtable/component_param_test.go
@@ -483,6 +483,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue())

assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())
assert.Equal(t, 60*time.Second, Params.DiskSizeFetchInterval.GetAsDuration(time.Second))
})

t.Run("test dataCoordConfig", func(t *testing.T) {
