From 618f0cb728dd3a25f7c140c4fb944c93f12761a7 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 5 Dec 2024 11:04:40 +0800 Subject: [PATCH] enhance: Put release segment and other misc cgo call into pool (#38186) Related to #30273 Signed-off-by: Congqi Xia --- internal/querynodev2/segments/segment.go | 26 ++++++++++++------- .../querynodev2/segments/segment_loader.go | 7 +++-- internal/querynodev2/segments/utils.go | 8 ++++-- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 055b029063918..41188c1139bd5 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -1272,20 +1272,25 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) { return } - C.DeleteSegment(ptr) - - localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue()) - // ignore error here, shall not block releasing - if err == nil { - metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB - } + GetDynamicPool().Submit(func() (any, error) { + C.DeleteSegment(ptr) + localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue()) + // ignore error here, shall not block releasing + if err == nil { + metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB + } + return nil, nil + }).Await() log.Info("delete segment from memory") } // ReleaseSegmentData releases the segment data. func (s *LocalSegment) ReleaseSegmentData() { - C.ClearSegmentData(s.ptr) + GetDynamicPool().Submit(func() (any, error) { + C.ClearSegmentData(s.ptr) + return nil, nil + }).Await() for _, indexInfo := range s.Indexes() { indexInfo.IsLoaded = false } @@ -1309,7 +1314,10 @@ func (s *LocalSegment) startRelease(scope ReleaseScope) state.LoadStateLockGuard } func (s *LocalSegment) RemoveFieldFile(fieldId int64) { - C.RemoveFieldFile(s.ptr, C.int64_t(fieldId)) + GetDynamicPool().Submit(func() (any, error) { + C.RemoveFieldFile(s.ptr, C.int64_t(fieldId)) + return nil, nil + }).Await() } func (s *LocalSegment) RemoveUnusedFieldFiles() error { diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 72d0f72911095..6e883ab30781a 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -1479,8 +1479,11 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn if fieldIndexInfo, ok := fieldID2IndexInfo[fieldID]; ok { var estimateResult ResourceEstimate err := GetCLoadInfoWithFunc(ctx, fieldSchema, loadInfo, fieldIndexInfo, func(c *LoadIndexInfo) error { - loadResourceRequest := C.EstimateLoadIndexResource(c.cLoadIndexInfo) - estimateResult = GetResourceEstimate(&loadResourceRequest) + GetDynamicPool().Submit(func() (any, error) { + loadResourceRequest := C.EstimateLoadIndexResource(c.cLoadIndexInfo) + estimateResult = GetResourceEstimate(&loadResourceRequest) + return nil, nil + }).Await() return nil }) if err != nil { diff --git a/internal/querynodev2/segments/utils.go b/internal/querynodev2/segments/utils.go index d9ebb961ff991..3c1108e937a92 100644 --- a/internal/querynodev2/segments/utils.go +++ b/internal/querynodev2/segments/utils.go @@ -183,8 +183,12 @@ func mergeRequestCost(requestCosts []*internalpb.CostAggregation) *internalpb.Co } func getIndexEngineVersion() (minimal, current int32) { - cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion() - return int32(cMinimal), int32(cCurrent) + GetDynamicPool().Submit(func() (any, error) { + cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion() + minimal, current = int32(cMinimal), int32(cCurrent) + return nil, nil + }).Await() + return minimal, current } // getSegmentMetricLabel returns the label for segment metrics.