Skip to content

Commit

Permalink
enhance: Unify LoadStateLock RLock & PinIf (milvus-io#39206)
Browse files Browse the repository at this point in the history
Related to milvus-io#39205

This PR merge `RLock` & `PinIfNotReleased` into `PinIf` function
preventing segment being released before any Read operation finished.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Jan 15, 2025
1 parent 995d5e1 commit e0f02e1
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 122 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ common:
threshold:
info: 500 # minimum milliseconds for printing durations in info level
warn: 1000 # minimum milliseconds for printing durations in warn level
maxWLockConditionalWaitTime: 600 # maximum seconds for waiting wlock conditional
storage:
scheme: s3
enablev2: false
Expand Down
66 changes: 31 additions & 35 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (s *LocalSegment) initializeSegment() error {
// Provide ONLY the read lock operations,
// don't make `ptrLock` public to avoid abusing of the mutex.
func (s *LocalSegment) PinIfNotReleased() error {
if !s.ptrLock.PinIfNotReleased() {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
return nil
Expand All @@ -465,10 +465,10 @@ func (s *LocalSegment) InsertCount() int64 {

func (s *LocalSegment) RowNum() int64 {
// if segment is not loaded, return 0 (maybe not loaded or release by lru)
if !s.ptrLock.RLockIf(state.IsDataLoaded) {
if !s.ptrLock.PinIf(state.IsDataLoaded) {
return 0
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

rowNum := s.rowNum.Load()
if rowNum < 0 {
Expand All @@ -485,10 +485,10 @@ func (s *LocalSegment) RowNum() int64 {
}

func (s *LocalSegment) MemSize() int64 {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return 0
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

memSize := s.memSize.Load()
if memSize < 0 {
Expand Down Expand Up @@ -522,10 +522,10 @@ func (s *LocalSegment) ExistIndex(fieldID int64) bool {
}

func (s *LocalSegment) HasRawData(fieldID int64) bool {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return false
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

ret := C.HasRawData(s.ptr, C.int64_t(fieldID))
return bool(ret)
Expand Down Expand Up @@ -561,11 +561,12 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.segmentType.String()),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {

if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

traceCtx := ParseCTraceContext(ctx)
defer runtime.KeepAlive(traceCtx)
Expand Down Expand Up @@ -604,11 +605,9 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S
}

func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
if !s.ptrLock.PinIf(state.IsNotReleased) {
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -673,18 +672,15 @@ func (s *LocalSegment) RetrieveByOffsets(ctx context.Context, plan *RetrievePlan
return nil, merr.WrapErrParameterInvalid("segment offsets", "empty offsets")
}

if !s.ptrLock.RLockIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
if !s.ptrLock.PinIf(state.IsNotReleased) {
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

fields := []zap.Field{
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("msgID", plan.msgID),
zap.String("segmentType", s.segmentType.String()),
zap.Int("resultNum", len(offsets)),
}

Expand Down Expand Up @@ -772,10 +768,10 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
if s.Type() != SegmentTypeGrowing {
return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String())
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

offset, err := s.preInsert(ctx, len(rowIDs))
if err != nil {
Expand Down Expand Up @@ -837,10 +833,10 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary
if len(primaryKeys) == 0 {
return nil
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

cOffset := C.int64_t(0) // depre
cSize := C.int64_t(len(primaryKeys))
Expand Down Expand Up @@ -913,10 +909,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
rowCount := loadInfo.GetNumOfRows()
fields := loadInfo.GetBinlogPaths()

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -986,10 +982,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
}

func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog) error {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.ID(), fieldID))
defer sp.End()
Expand Down Expand Up @@ -1161,10 +1157,10 @@ func (s *LocalSegment) LoadDeltaData2(ctx context.Context, schema *schemapb.Coll
}

func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -1214,10 +1210,10 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
pks, tss := deltaData.Pks, deltaData.Tss
rowNum := deltaData.RowCount

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -1450,10 +1446,10 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.FieldID),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
Expand Down Expand Up @@ -1488,10 +1484,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
zap.Int64("fieldID", fieldID),
zap.Bool("mmapEnabled", mmapEnabled),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var status C.CStatus

Expand All @@ -1515,10 +1511,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
// the state transition of segment in segment loader will blocked.
// add a waiter to avoid it.
s.ptrLock.BlockUntilDataLoadedOrReleased()
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if s.PinIfNotReleased() != nil {
return nil, nil
}
defer s.ptrLock.RUnlock()
defer s.Unpin()

cFieldID := C.int64_t(fieldID)
cMmapEnabled := C.bool(mmapEnabled)
Expand Down
Loading

0 comments on commit e0f02e1

Please sign in to comment.