Skip to content

Commit

Permalink
enhance: Unify LoadStateLock RLock & PinIf (milvus-io#39206)
Browse files Browse the repository at this point in the history
Related to milvus-io#39205

This PR merge `RLock` & `PinIfNotReleased` into `PinIf` function
preventing segment being released before any Read operation finished.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored and gifi-siby committed Jan 16, 2025
1 parent e4dc5d0 commit a4fd1bd
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 118 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,7 @@ common:
threshold:
info: 500 # minimum milliseconds for printing durations in info level
warn: 1000 # minimum milliseconds for printing durations in warn level
maxWLockConditionalWaitTime: 600 # maximum seconds for waiting wlock conditional
storage:
scheme: s3
enablev2: false
Expand Down
62 changes: 31 additions & 31 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (s *LocalSegment) initializeSegment() error {
// Provide ONLY the read lock operations,
// don't make `ptrLock` public to avoid abusing of the mutex.
func (s *LocalSegment) PinIfNotReleased() error {
if !s.ptrLock.PinIfNotReleased() {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
return nil
Expand All @@ -419,10 +419,10 @@ func (s *LocalSegment) InsertCount() int64 {

func (s *LocalSegment) RowNum() int64 {
// if segment is not loaded, return 0 (maybe not loaded or release by lru)
if !s.ptrLock.RLockIf(state.IsDataLoaded) {
if !s.ptrLock.PinIf(state.IsDataLoaded) {
return 0
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

rowNum := s.rowNum.Load()
if rowNum < 0 {
Expand All @@ -436,10 +436,10 @@ func (s *LocalSegment) RowNum() int64 {
}

func (s *LocalSegment) MemSize() int64 {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return 0
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

memSize := s.memSize.Load()
if memSize < 0 {
Expand Down Expand Up @@ -470,10 +470,10 @@ func (s *LocalSegment) ExistIndex(fieldID int64) bool {
}

func (s *LocalSegment) HasRawData(fieldID int64) bool {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return false
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

return s.csegment.HasRawData(fieldID)
}
Expand All @@ -500,11 +500,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ
zap.String("segmentType", s.segmentType.String()),
)

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

hasIndex := s.ExistIndex(searchReq.SearchFieldID())
log = log.With(zap.Bool("withIndex", hasIndex))
Expand All @@ -522,11 +522,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ
}

func (s *LocalSegment) retrieve(ctx context.Context, plan *segcore.RetrievePlan, log *zap.Logger) (*segcore.RetrieveResult, error) {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log.Debug("begin to retrieve")

Expand Down Expand Up @@ -569,11 +569,11 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *segcore.RetrievePlan)
}

func (s *LocalSegment) retrieveByOffsets(ctx context.Context, plan *segcore.RetrievePlanWithOffsets, log *zap.Logger) (*segcore.RetrieveResult, error) {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log.Debug("begin to retrieve by offsets")
tr := timerecord.NewTimeRecorder("cgoRetrieveByOffsets")
Expand Down Expand Up @@ -631,10 +631,10 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
if s.Type() != SegmentTypeGrowing {
return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String())
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var result *segcore.InsertResult
var err error
Expand Down Expand Up @@ -678,10 +678,10 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKe
if primaryKeys.Len() == 0 {
return nil
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var err error
GetDynamicPool().Submit(func() (any, error) {
Expand Down Expand Up @@ -715,10 +715,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
rowCount := loadInfo.GetNumOfRows()
fields := loadInfo.GetBinlogPaths()

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -759,10 +759,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
}

func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog) error {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.ID(), fieldID))
defer sp.End()
Expand Down Expand Up @@ -815,10 +815,10 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
}

func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).WithLazy(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -855,10 +855,10 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
pks, tss := deltaData.DeletePks(), deltaData.DeleteTimestamps()
rowNum := deltaData.DeleteRowCount()

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -1100,10 +1100,10 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.FieldID),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
Expand Down Expand Up @@ -1138,10 +1138,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
zap.Int64("fieldID", fieldID),
zap.Bool("mmapEnabled", mmapEnabled),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var status C.CStatus

Expand All @@ -1165,10 +1165,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
// the state transition of segment in segment loader will blocked.
// add a waiter to avoid it.
s.ptrLock.BlockUntilDataLoadedOrReleased()
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if s.PinIfNotReleased() != nil {
return nil, nil
}
defer s.ptrLock.RUnlock()
defer s.Unpin()

cFieldID := C.int64_t(fieldID)
cMmapEnabled := C.bool(mmapEnabled)
Expand Down
Loading

0 comments on commit a4fd1bd

Please sign in to comment.