From 2c08722d7d026fb301d3c0965ab203555dfd0038 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 19 Jul 2024 15:11:41 +0800 Subject: [PATCH] enhance: Add dynamic cgo pool for proxy CGO call (#34768) Related to #34705 Signed-off-by: Congqi Xia --- internal/proxy/cgo_util.go | 48 ++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/internal/proxy/cgo_util.go b/internal/proxy/cgo_util.go index ec91c8d3b2d1d..ca1336d0483cd 100644 --- a/internal/proxy/cgo_util.go +++ b/internal/proxy/cgo_util.go @@ -24,15 +24,53 @@ package proxy import "C" import ( + "runtime" + "sync" "unsafe" + "go.uber.org/atomic" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/hardware" +) + +var ( + dp atomic.Pointer[conc.Pool[any]] + dynOnce sync.Once ) +func initDynamicPool() { + dynOnce.Do(func() { + pool := conc.NewPool[any]( + hardware.GetCPUNum(), + conc.WithPreAlloc(false), + conc.WithDisablePurge(false), + conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal + ) + + dp.Store(pool) + log.Info("init dynamicPool done", zap.Int("size", hardware.GetCPUNum())) + }) +} + +// GetDynamicPool returns the singleton pool for dynamic cgo operations. +func GetDynamicPool() *conc.Pool[any] { + initDynamicPool() + return dp.Load() +} + func CheckVecIndexWithDataTypeExist(name string, dType schemapb.DataType) bool { - cIndexName := C.CString(name) - cType := uint32(dType) - defer C.free(unsafe.Pointer(cIndexName)) - check := bool(C.CheckVecIndexWithDataType(cIndexName, cType)) - return check + var result bool + GetDynamicPool().Submit(func() (any, error) { + cIndexName := C.CString(name) + cType := uint32(dType) + defer C.free(unsafe.Pointer(cIndexName)) + result = bool(C.CheckVecIndexWithDataType(cIndexName, cType)) + return nil, nil + }).Await() + + return result }