Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Unify LoadStateLock RLock & PinIf #39206

Merged
merged 3 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,7 @@ common:
threshold:
info: 500 # minimum milliseconds for printing durations in info level
warn: 1000 # minimum milliseconds for printing durations in warn level
maxWLockConditionalWaitTime: 600 # maximum seconds for waiting wlock conditional
storage:
scheme: s3
enablev2: false
Expand Down
62 changes: 31 additions & 31 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (s *LocalSegment) initializeSegment() error {
// Provide ONLY the read lock operations,
// don't make `ptrLock` public to avoid abusing of the mutex.
func (s *LocalSegment) PinIfNotReleased() error {
if !s.ptrLock.PinIfNotReleased() {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
return nil
Expand All @@ -419,10 +419,10 @@ func (s *LocalSegment) InsertCount() int64 {

func (s *LocalSegment) RowNum() int64 {
// if segment is not loaded, return 0 (maybe not loaded or release by lru)
if !s.ptrLock.RLockIf(state.IsDataLoaded) {
if !s.ptrLock.PinIf(state.IsDataLoaded) {
return 0
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

rowNum := s.rowNum.Load()
if rowNum < 0 {
Expand All @@ -436,10 +436,10 @@ func (s *LocalSegment) RowNum() int64 {
}

func (s *LocalSegment) MemSize() int64 {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return 0
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

memSize := s.memSize.Load()
if memSize < 0 {
Expand Down Expand Up @@ -470,10 +470,10 @@ func (s *LocalSegment) ExistIndex(fieldID int64) bool {
}

func (s *LocalSegment) HasRawData(fieldID int64) bool {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return false
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

return s.csegment.HasRawData(fieldID)
}
Expand All @@ -500,11 +500,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ
zap.String("segmentType", s.segmentType.String()),
)

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

hasIndex := s.ExistIndex(searchReq.SearchFieldID())
log = log.With(zap.Bool("withIndex", hasIndex))
Expand All @@ -522,11 +522,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ
}

func (s *LocalSegment) retrieve(ctx context.Context, plan *segcore.RetrievePlan, log *zap.Logger) (*segcore.RetrieveResult, error) {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log.Debug("begin to retrieve")

Expand Down Expand Up @@ -569,11 +569,11 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *segcore.RetrievePlan)
}

func (s *LocalSegment) retrieveByOffsets(ctx context.Context, plan *segcore.RetrievePlanWithOffsets, log *zap.Logger) (*segcore.RetrieveResult, error) {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log.Debug("begin to retrieve by offsets")
tr := timerecord.NewTimeRecorder("cgoRetrieveByOffsets")
Expand Down Expand Up @@ -631,10 +631,10 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
if s.Type() != SegmentTypeGrowing {
return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String())
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var result *segcore.InsertResult
var err error
Expand Down Expand Up @@ -678,10 +678,10 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKe
if primaryKeys.Len() == 0 {
return nil
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var err error
GetDynamicPool().Submit(func() (any, error) {
Expand Down Expand Up @@ -715,10 +715,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
rowCount := loadInfo.GetNumOfRows()
fields := loadInfo.GetBinlogPaths()

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -759,10 +759,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
}

func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog) error {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.ID(), fieldID))
defer sp.End()
Expand Down Expand Up @@ -815,10 +815,10 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
}

func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).WithLazy(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -855,10 +855,10 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
pks, tss := deltaData.DeletePks(), deltaData.DeleteTimestamps()
rowNum := deltaData.DeleteRowCount()

if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
Expand Down Expand Up @@ -1100,10 +1100,10 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.FieldID),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
Expand Down Expand Up @@ -1138,10 +1138,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
zap.Int64("fieldID", fieldID),
zap.Bool("mmapEnabled", mmapEnabled),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()

var status C.CStatus

Expand All @@ -1165,10 +1165,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
// the state transition of segment in segment loader will blocked.
// add a waiter to avoid it.
s.ptrLock.BlockUntilDataLoadedOrReleased()
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if s.PinIfNotReleased() != nil {
return nil, nil
}
defer s.ptrLock.RUnlock()
defer s.Unpin()

cFieldID := C.int64_t(fieldID)
cMmapEnabled := C.bool(mmapEnabled)
Expand Down
Loading
Loading