Skip to content

Commit

Permalink
enhance: [2.5] Unify LoadStateLock RLock & PinIf (#39206) (#39255)
Browse files Browse the repository at this point in the history
Cherry-pick from master
pr: #39206 #39308
Related to #39205

This PR merge `RLock` & `PinIfNotReleased` into `PinIf` function
preventing segment being released before any Read operation finished.

---------

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jan 15, 2025
1 parent 9b916f2 commit b658467
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 123 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,7 @@ common:
threshold:
info: 500 # minimum milliseconds for printing durations in info level
warn: 1000 # minimum milliseconds for printing durations in warn level
maxWLockConditionalWaitTime: 600 # maximum seconds for waiting wlock conditional
storage:
scheme: s3
enablev2: false
Expand Down
71 changes: 36 additions & 35 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (s *LocalSegment) initializeSegment() error {
// Provide ONLY the read lock operations,
// don't make `ptrLock` public to avoid abusing of the mutex.
func (s *LocalSegment) PinIfNotReleased() error {
if !s.ptrLock.PinIfNotReleased() {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
return nil
Expand All @@ -430,10 +430,10 @@ func (s *LocalSegment) InsertCount() int64 {

func (s *LocalSegment) RowNum() int64 {
// if segment is not loaded, return 0 (maybe not loaded or release by lru)
if !s.ptrLock.RLockIf(state.IsDataLoaded) {
if !s.ptrLock.PinIf(state.IsDataLoaded) {
return 0
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

rowNum := s.rowNum.Load()
if rowNum < 0 {
Expand All @@ -447,10 +447,10 @@ func (s *LocalSegment) RowNum() int64 {
}

func (s *LocalSegment) MemSize() int64 {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return 0
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

memSize := s.memSize.Load()
if memSize < 0 {
Expand Down Expand Up @@ -481,10 +481,10 @@ func (s *LocalSegment) ExistIndex(fieldID int64) bool {
}

func (s *LocalSegment) HasRawData(fieldID int64) bool {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return false
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

return s.csegment.HasRawData(fieldID)
}
Expand All @@ -511,11 +511,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ
zap.String("segmentType", s.segmentType.String()),
)

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

hasIndex := s.ExistIndex(searchReq.SearchFieldID())
log = log.With(zap.Bool("withIndex", hasIndex))
Expand All @@ -533,11 +533,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ
}

func (s *LocalSegment) retrieve(ctx context.Context, plan *segcore.RetrievePlan, log *zap.Logger) (*segcore.RetrieveResult, error) {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log.Debug("begin to retrieve")

Expand Down Expand Up @@ -580,11 +580,11 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *segcore.RetrievePlan)
}

func (s *LocalSegment) retrieveByOffsets(ctx context.Context, plan *segcore.RetrievePlanWithOffsets, log *zap.Logger) (*segcore.RetrieveResult, error) {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log.Debug("begin to retrieve by offsets")
tr := timerecord.NewTimeRecorder("cgoRetrieveByOffsets")
Expand Down Expand Up @@ -642,10 +642,10 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
if s.Type() != SegmentTypeGrowing {
return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String())
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var result *segcore.InsertResult
var err error
Expand Down Expand Up @@ -689,10 +689,10 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKe
if primaryKeys.Len() == 0 {
return nil
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var err error
GetDynamicPool().Submit(func() (any, error) {
Expand Down Expand Up @@ -726,10 +726,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
rowCount := loadInfo.GetNumOfRows()
fields := loadInfo.GetBinlogPaths()

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -770,10 +770,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
}

func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog) error {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.ID(), fieldID))
defer sp.End()
Expand Down Expand Up @@ -826,10 +826,10 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
}

func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).WithLazy(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -866,10 +866,10 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
pks, tss := deltaData.DeletePks(), deltaData.DeleteTimestamps()
rowNum := deltaData.DeleteRowCount()

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -1111,10 +1111,10 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.FieldID),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
Expand Down Expand Up @@ -1149,10 +1149,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
zap.Int64("fieldID", fieldID),
zap.Bool("mmapEnabled", mmapEnabled),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var status C.CStatus

Expand All @@ -1172,14 +1172,15 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
}).Await()
case "async":
GetWarmupPool().Submit(func() (any, error) {
// bad implemtation, warmup is async at another goroutine and hold the rlock.
// the state transition of segment in segment loader will blocked.
// add a waiter to avoid it.
s.ptrLock.BlockUntilDataLoadedOrReleased()
if !s.ptrLock.RLockIf(state.IsNotReleased) {
// failed to wait for state update, return directly
if !s.ptrLock.BlockUntilDataLoadedOrReleased() {
return nil, nil
}
defer s.ptrLock.RUnlock()

if s.PinIfNotReleased() != nil {
return nil, nil
}
defer s.Unpin()

cFieldID := C.int64_t(fieldID)
cMmapEnabled := C.bool(mmapEnabled)
Expand Down
Loading

0 comments on commit b658467

Please sign in to comment.