Skip to content

Commit

Permalink
enhance: prevent multiple query nodes from causing excessive occupanc…
Browse files Browse the repository at this point in the history
…y of a single node, leading to GPU memory overflow (#39276) (#38617)

issue: #39276

Signed-off-by: yusheng.ma <[email protected]>
  • Loading branch information
Presburger authored Jan 15, 2025
1 parent 0df2c75 commit 38881bf
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ milvus-gpu: build-cpp-gpu print-gpu-build-info
@source $(PWD)/scripts/setenv.sh && \
mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && \
CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CFLAGS="$(CGO_CFLAGS)" GO111MODULE=on $(GO) build -pgo=$(PGO_PATH)/default.pgo -ldflags="-r $${RPATH} -X '$(OBJPREFIX).BuildTags=$(BUILD_TAGS_GPU)' -X '$(OBJPREFIX).BuildTime=$(BUILD_TIME)' -X '$(OBJPREFIX).GitCommit=$(GIT_COMMIT)' -X '$(OBJPREFIX).GoVersion=$(GO_VERSION)'" \
-tags $(MILVUS_GO_BUILD_TAGS) -o $(INSTALL_PATH)/milvus $(PWD)/cmd/main.go 1>/dev/null
-tags "$(MILVUS_GO_BUILD_TAGS),cuda" -o $(INSTALL_PATH)/milvus $(PWD)/cmd/main.go 1>/dev/null

get-build-deps:
@(env bash $(PWD)/scripts/install_deps.sh)
Expand Down
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,7 @@ trace:
gpu:
initMemSize: 2048 # Gpu Memory Pool init size
maxMemSize: 4096 # Gpu Memory Pool Max size
overloadedMemoryThresholdPercentage: 95

# Any configuration related to the streaming node server.
streamingNode:
Expand Down
7 changes: 4 additions & 3 deletions internal/querynodev2/segments/segment_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ import (

// ResourceUsage is used to estimate the resource usage of a sealed segment.
type ResourceUsage struct {
MemorySize uint64
DiskSize uint64
MmapFieldCount int
MemorySize uint64
DiskSize uint64
MmapFieldCount int
FieldGpuMemorySize []uint64
}

// Segment is the interface of a segment implementation.
Expand Down
56 changes: 52 additions & 4 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"context"
"fmt"
"io"
"math"
"path"
"runtime/debug"
"strconv"
Expand All @@ -44,6 +45,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
Expand Down Expand Up @@ -1384,6 +1386,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
maxSegmentSize := uint64(0)
predictMemUsage := memUsage
predictDiskUsage := diskUsage
var predictGpuMemUsage []uint64
mmapFieldCount := 0
for _, loadInfo := range segmentLoadInfos {
collection := loader.manager.Collection.Get(loadInfo.GetCollectionID())
Expand All @@ -1406,6 +1409,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
mmapFieldCount += usage.MmapFieldCount
predictDiskUsage += usage.DiskSize
predictMemUsage += usage.MemorySize
predictGpuMemUsage = usage.FieldGpuMemorySize
if usage.MemorySize > maxSegmentSize {
maxSegmentSize = usage.MemorySize
}
Expand Down Expand Up @@ -1440,6 +1444,10 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()))
}

err := checkSegmentGpuMemSize(predictGpuMemUsage, float32(paramtable.Get().GpuConfig.OverloadedMemoryThresholdPercentage.GetAsFloat()))
if err != nil {
return 0, 0, err
}
return predictMemUsage - memUsage, predictDiskUsage - diskUsage, nil
}

Expand All @@ -1448,6 +1456,7 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
var segmentMemorySize, segmentDiskSize uint64
var indexMemorySize uint64
var mmapFieldCount int
var fieldGpuMemorySize []uint64

fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
for _, fieldIndexInfo := range loadInfo.IndexInfos {
Expand Down Expand Up @@ -1492,9 +1501,11 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
loadInfo.GetSegmentID(),
fieldIndexInfo.GetBuildID())
}

indexMemorySize += estimateResult.MaxMemoryCost
segmentDiskSize += estimateResult.MaxDiskCost
if vecindexmgr.GetVecIndexMgrInstance().IsGPUVecIndex(common.GetIndexType(fieldIndexInfo.IndexParams)) {
fieldGpuMemorySize = append(fieldGpuMemorySize, estimateResult.MaxMemoryCost)
}
if !estimateResult.HasRawData && !isVectorType {
shouldCalculateDataSize = true
}
Expand Down Expand Up @@ -1555,9 +1566,10 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
segmentMemorySize += uint64(float64(memSize) * expansionFactor)
}
return &ResourceUsage{
MemorySize: segmentMemorySize + indexMemorySize,
DiskSize: segmentDiskSize,
MmapFieldCount: mmapFieldCount,
MemorySize: segmentMemorySize + indexMemorySize,
DiskSize: segmentDiskSize,
MmapFieldCount: mmapFieldCount,
FieldGpuMemorySize: fieldGpuMemorySize,
}, nil
}

Expand Down Expand Up @@ -1680,3 +1692,39 @@ func getBinlogDataMemorySize(fieldBinlog *datapb.FieldBinlog) int64 {

return fieldSize
}

func checkSegmentGpuMemSize(fieldGpuMemSizeList []uint64, OverloadedMemoryThresholdPercentage float32) error {
gpuInfos, err := hardware.GetAllGPUMemoryInfo()
if err != nil {
if len(fieldGpuMemSizeList) == 0 {
return nil
}
return err
}
var usedGpuMem []uint64
var maxGpuMemSize []uint64
for _, gpuInfo := range gpuInfos {
usedGpuMem = append(usedGpuMem, gpuInfo.TotalMemory-gpuInfo.FreeMemory)
maxGpuMemSize = append(maxGpuMemSize, uint64(float32(gpuInfo.TotalMemory)*OverloadedMemoryThresholdPercentage))
}
currentGpuMem := usedGpuMem
for _, fieldGpuMem := range fieldGpuMemSizeList {
var minId int = -1
var minGpuMem uint64 = math.MaxUint64
for i := int(0); i < len(gpuInfos); i++ {
GpuiMem := currentGpuMem[i] + fieldGpuMem
if GpuiMem < maxGpuMemSize[i] && GpuiMem < minGpuMem {
minId = i
minGpuMem = GpuiMem
}
}
if minId == -1 {
return fmt.Errorf("load segment failed, GPU OOM if loaded, GpuMemUsage(bytes) = %v, usedGpuMem(bytes) = %v, maxGPUMem(bytes) = %v",
fieldGpuMem,
usedGpuMem,
maxGpuMemSize)
}
currentGpuMem[minId] += minGpuMem
}
return nil
}
18 changes: 18 additions & 0 deletions pkg/util/hardware/gpu_mem_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//go:build !cuda
// +build !cuda

package hardware

import "github.com/cockroachdb/errors"

// GPUMemoryInfo holds information about a GPU's memory
type GPUMemoryInfo struct {
TotalMemory uint64 // Total memory available on the GPU
FreeMemory uint64 // Free memory available on the GPU
}

// GetAllGPUMemoryInfo returns mock GPU memory information for non-CUDA builds
func GetAllGPUMemoryInfo() ([]GPUMemoryInfo, error) {
// Mock error to indicate no CUDA support
return nil, errors.New("CUDA not supported: failed to retrieve GPU memory info or no GPUs found")
}
90 changes: 90 additions & 0 deletions pkg/util/hardware/gpu_mem_info_cuda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//go:build cuda
// +build cuda

package hardware

/*
#cgo CFLAGS: -I/usr/local/cuda/include
#cgo LDFLAGS: -L/usr/local/cuda/lib64 -lcudart
#include <cuda_runtime.h>
#include <stdlib.h>
// Structure to store GPU memory info
typedef struct {
size_t totalMemory;
size_t freeMemory;
} GPUMemoryInfo;
// Function to get memory info for all GPUs
int getAllGPUMemoryInfo(GPUMemoryInfo** infos) {
int deviceCount = 0;
cudaError_t err = cudaGetDeviceCount(&deviceCount);
if (err != cudaSuccess || deviceCount == 0) {
return 0; // No GPUs found or error occurred
}
// Allocate memory for the output array
*infos = (GPUMemoryInfo*)malloc(deviceCount * sizeof(GPUMemoryInfo));
if (*infos == NULL) {
return 0; // Memory allocation failed
}
for (int i = 0; i < deviceCount; ++i) {
if (cudaSetDevice(i) != cudaSuccess) {
(*infos)[i].totalMemory = 0;
(*infos)[i].freeMemory = 0;
continue; // Skip if the device cannot be set
}
size_t freeMem = 0, totalMem = 0;
if (cudaMemGetInfo(&freeMem, &totalMem) != cudaSuccess) {
(*infos)[i].totalMemory = 0;
(*infos)[i].freeMemory = 0;
continue; // Skip if memory info cannot be fetched
}
(*infos)[i].totalMemory = totalMem;
(*infos)[i].freeMemory = freeMem;
}
return deviceCount; // Return the number of devices processed
}
*/
import "C"
import (
"github.com/cockroachdb/errors"
"unsafe"
)

// GPUMemoryInfo represents a single GPU's memory information.
type GPUMemoryInfo struct {
TotalMemory uint64 // Total memory in bytes
FreeMemory uint64 // Free memory in bytes
}

// GetAllGPUMemoryInfo retrieves the memory information for all available GPUs.
// It returns a slice of GPUMemoryInfo and an error if no GPUs are found or retrieval fails.
func GetAllGPUMemoryInfo() ([]GPUMemoryInfo, error) {
var infos *C.GPUMemoryInfo

// Call the C function to retrieve GPU memory info
deviceCount := int(C.getAllGPUMemoryInfo(&infos))
if deviceCount == 0 {
return nil, errors.New("failed to retrieve GPU memory info or no GPUs found")
}
defer C.free(unsafe.Pointer(infos)) // Free the allocated memory

// Convert C array to Go slice
gpuInfos := make([]GPUMemoryInfo, 0, deviceCount)
infoArray := (*[1 << 30]C.GPUMemoryInfo)(unsafe.Pointer(infos))[:deviceCount:deviceCount]

for i := 0; i < deviceCount; i++ {
info := infoArray[i]
gpuInfos = append(gpuInfos, GPUMemoryInfo{
TotalMemory: uint64(info.totalMemory),
FreeMemory: uint64(info.freeMemory),
})
}

return gpuInfos, nil
}
15 changes: 13 additions & 2 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -970,8 +970,9 @@ This helps Milvus-CDC synchronize incremental data`,
}

type gpuConfig struct {
InitSize ParamItem `refreshable:"false"`
MaxSize ParamItem `refreshable:"false"`
InitSize ParamItem `refreshable:"false"`
MaxSize ParamItem `refreshable:"false"`
OverloadedMemoryThresholdPercentage ParamItem `refreshable:"false"`
}

func (t *gpuConfig) init(base *BaseTable) {
Expand All @@ -992,6 +993,16 @@ func (t *gpuConfig) init(base *BaseTable) {
DefaultValue: "4096",
}
t.MaxSize.Init(base.mgr)
t.OverloadedMemoryThresholdPercentage = ParamItem{
Key: "gpu.overloadedMemoryThresholdPercentage",
Version: "2.5.4",
Export: true,
DefaultValue: "95",
Formatter: func(v string) string {
return fmt.Sprintf("%f", getAsFloat(v)/100)
},
}
t.OverloadedMemoryThresholdPercentage.Init(base.mgr)
}

type traceConfig struct {
Expand Down

0 comments on commit 38881bf

Please sign in to comment.