From 6f6923c1d64152b811c784d5c784407769b9b669 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Mon, 13 Jan 2025 15:29:26 +0800 Subject: [PATCH 1/3] enhance: Unify LoadStateLock RLock & PinIf Related to #39205 This PR merges `RLock` & `PinIfNotReleased` into a single `PinIf` function, preventing a segment from being released before all in-flight read operations have finished. Signed-off-by: Congqi Xia --- internal/querynodev2/segments/segment.go | 62 +++++++++---------- .../segments/state/load_state_lock.go | 33 +++++----- .../segments/state/load_state_lock_test.go | 18 +++--- 3 files changed, 55 insertions(+), 58 deletions(-) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index fd9c51157a288..db16d70e4b2fd 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -403,7 +403,7 @@ func (s *LocalSegment) initializeSegment() error { // Provide ONLY the read lock operations, // don't make `ptrLock` public to avoid abusing of the mutex. 
func (s *LocalSegment) PinIfNotReleased() error { - if !s.ptrLock.PinIfNotReleased() { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } return nil @@ -419,10 +419,10 @@ func (s *LocalSegment) InsertCount() int64 { func (s *LocalSegment) RowNum() int64 { // if segment is not loaded, return 0 (maybe not loaded or release by lru) - if !s.ptrLock.RLockIf(state.IsDataLoaded) { + if !s.ptrLock.PinIf(state.IsDataLoaded) { return 0 } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() rowNum := s.rowNum.Load() if rowNum < 0 { @@ -436,10 +436,10 @@ func (s *LocalSegment) RowNum() int64 { } func (s *LocalSegment) MemSize() int64 { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return 0 } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() memSize := s.memSize.Load() if memSize < 0 { @@ -470,10 +470,10 @@ func (s *LocalSegment) ExistIndex(fieldID int64) bool { } func (s *LocalSegment) HasRawData(fieldID int64) bool { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return false } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() return s.csegment.HasRawData(fieldID) } @@ -500,11 +500,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ zap.String("segmentType", s.segmentType.String()), ) - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { // TODO: check if the segment is readable but not released. too many related logic need to be refactor. 
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() hasIndex := s.ExistIndex(searchReq.SearchFieldID()) log = log.With(zap.Bool("withIndex", hasIndex)) @@ -522,11 +522,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ } func (s *LocalSegment) retrieve(ctx context.Context, plan *segcore.RetrievePlan, log *zap.Logger) (*segcore.RetrieveResult, error) { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { // TODO: check if the segment is readable but not released. too many related logic need to be refactor. return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() log.Debug("begin to retrieve") @@ -569,11 +569,11 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *segcore.RetrievePlan) } func (s *LocalSegment) retrieveByOffsets(ctx context.Context, plan *segcore.RetrievePlanWithOffsets, log *zap.Logger) (*segcore.RetrieveResult, error) { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { // TODO: check if the segment is readable but not released. too many related logic need to be refactor. 
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() log.Debug("begin to retrieve by offsets") tr := timerecord.NewTimeRecorder("cgoRetrieveByOffsets") @@ -631,10 +631,10 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps [] if s.Type() != SegmentTypeGrowing { return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String()) } - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() var result *segcore.InsertResult var err error @@ -678,10 +678,10 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKe if primaryKeys.Len() == 0 { return nil } - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() var err error GetDynamicPool().Submit(func() (any, error) { @@ -715,10 +715,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error { rowCount := loadInfo.GetNumOfRows() fields := loadInfo.GetBinlogPaths() - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), @@ -759,10 +759,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error { } func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog) error { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + 
defer s.ptrLock.Unpin() ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.ID(), fieldID)) defer sp.End() @@ -815,10 +815,10 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun } func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error { - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() log := log.Ctx(ctx).WithLazy( zap.Int64("collectionID", s.Collection()), @@ -855,10 +855,10 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del pks, tss := deltaData.DeletePks(), deltaData.DeleteTimestamps() rowNum := deltaData.DeleteRowCount() - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), @@ -1100,10 +1100,10 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F zap.Int64("segmentID", s.ID()), zap.Int64("fieldID", indexInfo.FieldID), ) - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() var status C.CStatus GetDynamicPool().Submit(func() (any, error) { @@ -1138,10 +1138,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap zap.Int64("fieldID", fieldID), zap.Bool("mmapEnabled", mmapEnabled), ) - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if !s.ptrLock.PinIf(state.IsNotReleased) { return } - defer s.ptrLock.RUnlock() + defer s.ptrLock.Unpin() var status C.CStatus @@ -1165,10 +1165,10 @@ func (s *LocalSegment) 
WarmupChunkCache(ctx context.Context, fieldID int64, mmap // the state transition of segment in segment loader will blocked. // add a waiter to avoid it. s.ptrLock.BlockUntilDataLoadedOrReleased() - if !s.ptrLock.RLockIf(state.IsNotReleased) { + if s.PinIfNotReleased() != nil { return nil, nil } - defer s.ptrLock.RUnlock() + defer s.Unpin() cFieldID := C.int64_t(fieldID) cMmapEnabled := C.bool(mmapEnabled) diff --git a/internal/querynodev2/segments/state/load_state_lock.go b/internal/querynodev2/segments/state/load_state_lock.go index 6d2f601e17e2a..541733bfd1aa7 100644 --- a/internal/querynodev2/segments/state/load_state_lock.go +++ b/internal/querynodev2/segments/state/load_state_lock.go @@ -69,32 +69,17 @@ type LoadStateLock struct { } // RLockIfNotReleased locks the segment if the state is not released. -func (ls *LoadStateLock) RLockIf(pred StatePredicate) bool { - ls.mu.RLock() - if !pred(ls.state) { - ls.mu.RUnlock() - return false - } - return true -} - -// RUnlock unlocks the segment. -func (ls *LoadStateLock) RUnlock() { - ls.mu.RUnlock() -} - -// PinIfNotReleased pin the segment into memory, avoid ReleaseAll to release it. -func (ls *LoadStateLock) PinIfNotReleased() bool { +func (ls *LoadStateLock) PinIf(pred StatePredicate) bool { ls.mu.RLock() defer ls.mu.RUnlock() - if ls.state == LoadStateReleased { + if !pred(ls.state) { return false } ls.refCnt.Inc() return true } -// Unpin unpin the segment, then segment can be released by ReleaseAll. +// Unpin unlocks the segment. func (ls *LoadStateLock) Unpin() { ls.mu.RLock() defer ls.mu.RUnlock() @@ -108,6 +93,18 @@ func (ls *LoadStateLock) Unpin() { } } +// PinIfNotReleased pin the segment if the state is not released. +// grammar suger for PinIf(IsNotReleased). 
+func (ls *LoadStateLock) PinIfNotReleased() bool { + ls.mu.RLock() + defer ls.mu.RUnlock() + if ls.state == LoadStateReleased { + return false + } + ls.refCnt.Inc() + return true +} + // StartLoadData starts load segment data // Fast fail if segment is not in LoadStateOnlyMeta. func (ls *LoadStateLock) StartLoadData() (LoadStateLockGuard, error) { diff --git a/internal/querynodev2/segments/state/load_state_lock_test.go b/internal/querynodev2/segments/state/load_state_lock_test.go index 27d3a94933928..3c2f3e55e658a 100644 --- a/internal/querynodev2/segments/state/load_state_lock_test.go +++ b/internal/querynodev2/segments/state/load_state_lock_test.go @@ -185,20 +185,20 @@ func TestStartReleaseAll(t *testing.T) { func TestRLock(t *testing.T) { l := NewLoadStateLock(LoadStateOnlyMeta) - assert.True(t, l.RLockIf(IsNotReleased)) - l.RUnlock() - assert.False(t, l.RLockIf(IsDataLoaded)) + assert.True(t, l.PinIf(IsNotReleased)) + l.Unpin() + assert.False(t, l.PinIf(IsDataLoaded)) l = NewLoadStateLock(LoadStateDataLoaded) - assert.True(t, l.RLockIf(IsNotReleased)) - l.RUnlock() - assert.True(t, l.RLockIf(IsDataLoaded)) - l.RUnlock() + assert.True(t, l.PinIf(IsNotReleased)) + l.Unpin() + assert.True(t, l.PinIf(IsDataLoaded)) + l.Unpin() l = NewLoadStateLock(LoadStateOnlyMeta) l.StartReleaseAll().Done(nil) - assert.False(t, l.RLockIf(IsNotReleased)) - assert.False(t, l.RLockIf(IsDataLoaded)) + assert.False(t, l.PinIf(IsNotReleased)) + assert.False(t, l.PinIf(IsDataLoaded)) } func TestPin(t *testing.T) { From b71756a5c198870000cc7329af226c7ce517b476 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Mon, 13 Jan 2025 19:01:06 +0800 Subject: [PATCH 2/3] Reuse PinIf and add wlock conditional wait timeout protection Signed-off-by: Congqi Xia --- configs/milvus.yaml | 1 + .../segments/state/load_state_lock.go | 49 ++++++++++++------- .../segments/state/load_state_lock_test.go | 41 +++++++++++++++- pkg/util/paramtable/component_param.go | 16 ++++-- 4 files changed, 84 insertions(+), 
23 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 305db189ef1be..4bdcb1db82b1b 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -863,6 +863,7 @@ common: threshold: info: 500 # minimum milliseconds for printing durations in info level warn: 1000 # minimum milliseconds for printing durations in warn level + maxWLockConditionalWaitTime: 600 # maximum seconds for waiting wlock conditional storage: scheme: s3 enablev2: false diff --git a/internal/querynodev2/segments/state/load_state_lock.go b/internal/querynodev2/segments/state/load_state_lock.go index 541733bfd1aa7..0e0da20ec5cab 100644 --- a/internal/querynodev2/segments/state/load_state_lock.go +++ b/internal/querynodev2/segments/state/load_state_lock.go @@ -3,9 +3,12 @@ package state import ( "fmt" "sync" + "time" "github.com/cockroachdb/errors" "go.uber.org/atomic" + + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type loadStateEnum int @@ -96,13 +99,7 @@ func (ls *LoadStateLock) Unpin() { // PinIfNotReleased pin the segment if the state is not released. // grammar suger for PinIf(IsNotReleased). func (ls *LoadStateLock) PinIfNotReleased() bool { - ls.mu.RLock() - defer ls.mu.RUnlock() - if ls.state == LoadStateReleased { - return false - } - ls.refCnt.Inc() - return true + return ls.PinIf(IsNotReleased) } // StartLoadData starts load segment data @@ -176,26 +173,40 @@ func (ls *LoadStateLock) BlockUntilDataLoadedOrReleased() { ls.cv.L.Lock() defer ls.cv.L.Unlock() - for ls.state != LoadStateDataLoaded && ls.state != LoadStateReleased { - ls.cv.Wait() - } + ls.waitOrPanic(func(state loadStateEnum) bool { + return state == LoadStateDataLoaded || state == LoadStateReleased + }) } // waitUntilCanReleaseData waits until segment is release data able. 
func (ls *LoadStateLock) waitUntilCanReleaseData() { - state := ls.state - for state != LoadStateDataLoaded && state != LoadStateOnlyMeta && state != LoadStateReleased { - ls.cv.Wait() - state = ls.state - } + ls.waitOrPanic(func(state loadStateEnum) bool { + return state == LoadStateDataLoaded || state == LoadStateOnlyMeta || state == LoadStateReleased + }) } // waitUntilCanReleaseAll waits until segment is releasable. func (ls *LoadStateLock) waitUntilCanReleaseAll() { - state := ls.state - for (state != LoadStateDataLoaded && state != LoadStateOnlyMeta && state != LoadStateReleased) || ls.refCnt.Load() != 0 { - ls.cv.Wait() - state = ls.state + ls.waitOrPanic(func(state loadStateEnum) bool { + return (state == LoadStateDataLoaded || state == LoadStateOnlyMeta || state == LoadStateReleased) && ls.refCnt.Load() == 0 + }) +} + +func (ls *LoadStateLock) waitOrPanic(ready func(state loadStateEnum) bool) { + ch := make(chan struct{}) + go func() { + defer close(ch) + for !ready(ls.state) { + ls.cv.Wait() + } + }() + + maxWaitTime := paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.GetAsDuration(time.Second) + + select { + case <-time.After(maxWaitTime): + panic(fmt.Sprintf("max WLock wait time(%v) excceeded", maxWaitTime)) + case <-ch: } } diff --git a/internal/querynodev2/segments/state/load_state_lock_test.go b/internal/querynodev2/segments/state/load_state_lock_test.go index 3c2f3e55e658a..6273f6edd4321 100644 --- a/internal/querynodev2/segments/state/load_state_lock_test.go +++ b/internal/querynodev2/segments/state/load_state_lock_test.go @@ -6,9 +6,12 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestLoadStateLoadData(t *testing.T) { + paramtable.Init() l := NewLoadStateLock(LoadStateOnlyMeta) // Test Load Data, roll back g, err := l.StartLoadData() @@ -44,6 +47,7 @@ func TestLoadStateLoadData(t *testing.T) { } func TestStartReleaseData(t *testing.T) { + 
paramtable.Init() l := NewLoadStateLock(LoadStateOnlyMeta) // Test Release Data, nothing to do on only meta. g := l.StartReleaseData() @@ -104,6 +108,7 @@ func TestStartReleaseData(t *testing.T) { } func TestBlockUntilDataLoadedOrReleased(t *testing.T) { + paramtable.Init() l := NewLoadStateLock(LoadStateOnlyMeta) ch := make(chan struct{}) go func() { @@ -122,6 +127,7 @@ func TestBlockUntilDataLoadedOrReleased(t *testing.T) { } func TestStartReleaseAll(t *testing.T) { + paramtable.Init() l := NewLoadStateLock(LoadStateOnlyMeta) // Test Release All, nothing to do on only meta. g := l.StartReleaseAll() @@ -183,7 +189,40 @@ func TestStartReleaseAll(t *testing.T) { assert.Equal(t, LoadStateReleased, l.state) } -func TestRLock(t *testing.T) { +func TestWaitOrPanic(t *testing.T) { + paramtable.Init() + + t.Run("normal", func(t *testing.T) { + paramtable.Get().Save(paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.Key, "600") + defer paramtable.Get().Reset(paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.Key) + + l := NewLoadStateLock(LoadStateDataLoaded) + l.cv.L.Lock() + defer l.cv.L.Unlock() + + assert.NotPanics(t, func() { + l.waitOrPanic(func(state loadStateEnum) bool { + return state == LoadStateDataLoaded + }) + }) + }) + + t.Run("timeout_panic", func(t *testing.T) { + paramtable.Get().Save(paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.Key) + + l := NewLoadStateLock(LoadStateOnlyMeta) + l.cv.L.Lock() + + assert.Panics(t, func() { + l.waitOrPanic(func(state loadStateEnum) bool { + return state == LoadStateDataLoaded + }) + }) + }) +} + +func TestPinIf(t *testing.T) { l := NewLoadStateLock(LoadStateOnlyMeta) assert.True(t, l.PinIf(IsNotReleased)) l.Unpin() diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 37b6a2fdf7380..fe032eb0ebd75 100644 --- a/pkg/util/paramtable/component_param.go +++ 
b/pkg/util/paramtable/component_param.go @@ -254,9 +254,10 @@ type commonConfig struct { MetricsPort ParamItem `refreshable:"false"` // lock related params - EnableLockMetrics ParamItem `refreshable:"false"` - LockSlowLogInfoThreshold ParamItem `refreshable:"true"` - LockSlowLogWarnThreshold ParamItem `refreshable:"true"` + EnableLockMetrics ParamItem `refreshable:"false"` + LockSlowLogInfoThreshold ParamItem `refreshable:"true"` + LockSlowLogWarnThreshold ParamItem `refreshable:"true"` + MaxWLockConditionalWaitTime ParamItem `refreshable:"true"` StorageScheme ParamItem `refreshable:"false"` EnableStorageV2 ParamItem `refreshable:"false"` @@ -753,6 +754,15 @@ like the old password verification when updating the credential`, } p.LockSlowLogWarnThreshold.Init(base.mgr) + p.MaxWLockConditionalWaitTime = ParamItem{ + Key: "common.locks.maxWLockConditionalWaitTime", + Version: "2.5.4", + DefaultValue: "600", + Doc: "maximum seconds for waiting wlock conditional", + Export: true, + } + p.MaxWLockConditionalWaitTime.Init(base.mgr) + p.EnableStorageV2 = ParamItem{ Key: "common.storage.enablev2", Version: "2.3.1", From cfda1de860079f6f4ef9a041bab4e0fa6feda1c1 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Tue, 14 Jan 2025 15:35:53 +0800 Subject: [PATCH 3/3] Move lock op to wait util func Signed-off-by: Congqi Xia --- .../segments/state/load_state_lock.go | 100 ++++++++---------- .../segments/state/load_state_lock_test.go | 7 +- 2 files changed, 49 insertions(+), 58 deletions(-) diff --git a/internal/querynodev2/segments/state/load_state_lock.go b/internal/querynodev2/segments/state/load_state_lock.go index 0e0da20ec5cab..899c54fa41f9f 100644 --- a/internal/querynodev2/segments/state/load_state_lock.go +++ b/internal/querynodev2/segments/state/load_state_lock.go @@ -13,6 +13,8 @@ import ( type loadStateEnum int +func noop() {} + // LoadState represent the state transition of segment. // LoadStateOnlyMeta: segment is created with meta, but not loaded. 
// LoadStateDataLoading: segment is loading data. @@ -123,86 +125,78 @@ func (ls *LoadStateLock) StartLoadData() (LoadStateLockGuard, error) { // StartReleaseData wait until the segment is releasable and starts releasing segment data. func (ls *LoadStateLock) StartReleaseData() (g LoadStateLockGuard) { - ls.cv.L.Lock() - defer ls.cv.L.Unlock() - - ls.waitUntilCanReleaseData() - - switch ls.state { - case LoadStateDataLoaded: - ls.state = LoadStateDataReleasing - ls.cv.Broadcast() - return newLoadStateLockGuard(ls, LoadStateDataLoaded, LoadStateOnlyMeta) - case LoadStateOnlyMeta: - // already transit to target state, do nothing. - return nil - case LoadStateReleased: - // do nothing for empty segment. - return nil - default: - panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String())) - } + ls.waitOrPanic(ls.canReleaseData, func() { + switch ls.state { + case LoadStateDataLoaded: + ls.state = LoadStateDataReleasing + ls.cv.Broadcast() + g = newLoadStateLockGuard(ls, LoadStateDataLoaded, LoadStateOnlyMeta) + case LoadStateOnlyMeta: + // already transit to target state, do nothing. + g = nil + case LoadStateReleased: + // do nothing for empty segment. + g = nil + default: + panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String())) + } + }) + return g } // StartReleaseAll wait until the segment is releasable and starts releasing all segment. func (ls *LoadStateLock) StartReleaseAll() (g LoadStateLockGuard) { - ls.cv.L.Lock() - defer ls.cv.L.Unlock() - - ls.waitUntilCanReleaseAll() + ls.waitOrPanic(ls.canReleaseAll, func() { + switch ls.state { + case LoadStateDataLoaded: + ls.state = LoadStateReleased + ls.cv.Broadcast() + g = newNopLoadStateLockGuard() + case LoadStateOnlyMeta: + ls.state = LoadStateReleased + ls.cv.Broadcast() + g = newNopLoadStateLockGuard() + case LoadStateReleased: + // already transit to target state, do nothing. 
+ g = nil + default: + panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String())) + } + }) - switch ls.state { - case LoadStateDataLoaded: - ls.state = LoadStateReleased - ls.cv.Broadcast() - return newNopLoadStateLockGuard() - case LoadStateOnlyMeta: - ls.state = LoadStateReleased - ls.cv.Broadcast() - return newNopLoadStateLockGuard() - case LoadStateReleased: - // already transit to target state, do nothing. - return nil - default: - panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String())) - } + return g } // blockUntilDataLoadedOrReleased blocks until the segment is loaded or released. func (ls *LoadStateLock) BlockUntilDataLoadedOrReleased() { - ls.cv.L.Lock() - defer ls.cv.L.Unlock() - ls.waitOrPanic(func(state loadStateEnum) bool { return state == LoadStateDataLoaded || state == LoadStateReleased - }) + }, noop) } // waitUntilCanReleaseData waits until segment is release data able. -func (ls *LoadStateLock) waitUntilCanReleaseData() { - ls.waitOrPanic(func(state loadStateEnum) bool { - return state == LoadStateDataLoaded || state == LoadStateOnlyMeta || state == LoadStateReleased - }) +func (ls *LoadStateLock) canReleaseData(state loadStateEnum) bool { + return state == LoadStateDataLoaded || state == LoadStateOnlyMeta || state == LoadStateReleased } // waitUntilCanReleaseAll waits until segment is releasable. 
-func (ls *LoadStateLock) waitUntilCanReleaseAll() { - ls.waitOrPanic(func(state loadStateEnum) bool { - return (state == LoadStateDataLoaded || state == LoadStateOnlyMeta || state == LoadStateReleased) && ls.refCnt.Load() == 0 - }) +func (ls *LoadStateLock) canReleaseAll(state loadStateEnum) bool { + return (state == LoadStateDataLoaded || state == LoadStateOnlyMeta || state == LoadStateReleased) && ls.refCnt.Load() == 0 } -func (ls *LoadStateLock) waitOrPanic(ready func(state loadStateEnum) bool) { +func (ls *LoadStateLock) waitOrPanic(ready func(state loadStateEnum) bool, then func()) { ch := make(chan struct{}) + maxWaitTime := paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.GetAsDuration(time.Second) go func() { + ls.cv.L.Lock() + defer ls.cv.L.Unlock() defer close(ch) for !ready(ls.state) { ls.cv.Wait() } + then() }() - maxWaitTime := paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.GetAsDuration(time.Second) - select { case <-time.After(maxWaitTime): panic(fmt.Sprintf("max WLock wait time(%v) excceeded", maxWaitTime)) diff --git a/internal/querynodev2/segments/state/load_state_lock_test.go b/internal/querynodev2/segments/state/load_state_lock_test.go index 6273f6edd4321..0e282de494f06 100644 --- a/internal/querynodev2/segments/state/load_state_lock_test.go +++ b/internal/querynodev2/segments/state/load_state_lock_test.go @@ -197,13 +197,11 @@ func TestWaitOrPanic(t *testing.T) { defer paramtable.Get().Reset(paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.Key) l := NewLoadStateLock(LoadStateDataLoaded) - l.cv.L.Lock() - defer l.cv.L.Unlock() assert.NotPanics(t, func() { l.waitOrPanic(func(state loadStateEnum) bool { return state == LoadStateDataLoaded - }) + }, noop) }) }) @@ -212,12 +210,11 @@ func TestWaitOrPanic(t *testing.T) { defer paramtable.Get().Reset(paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.Key) l := NewLoadStateLock(LoadStateOnlyMeta) - l.cv.L.Lock() assert.Panics(t, func() { l.waitOrPanic(func(state loadStateEnum) 
bool { return state == LoadStateDataLoaded - }) + }, noop) }) }) }