Skip to content

Commit

Permalink
fix: evicted segments in the serverlss mode(milvus-io#31959) (milvus-…
Browse files Browse the repository at this point in the history
…io#31961)

related: milvus-io#31959
1. reset segment index status after evicting to lazyload=true
2. reset num_rows to null_opt

Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han authored Apr 10, 2024
1 parent c4806b6 commit f3f2a5a
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 17 deletions.
2 changes: 1 addition & 1 deletion internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ SegmentSealedImpl::ClearData() {
index_ready_bitset_.reset();
binlog_index_bitset_.reset();
system_ready_count_ = 0;
num_rows_ = 0;
num_rows_ = std::nullopt;
scalar_indexings_.clear();
vector_indexings_.clear();
insert_record_.clear();
Expand Down
33 changes: 33 additions & 0 deletions internal/querynodev2/segments/mock_segment.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 24 additions & 15 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,12 @@ func (s *LocalSegment) Indexes() []*IndexedFieldInfo {
return result
}

func (s *LocalSegment) ResetIndexesLazyLoad(lazyState bool) {
for _, indexInfo := range s.Indexes() {
indexInfo.LazyLoad = lazyState
}
}

func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error) {
/*
CStatus
Expand Down Expand Up @@ -1129,6 +1135,14 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
opt(options)
}

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.GetFieldID()),
zap.Int64("indexID", indexInfo.GetIndexID()),
)

if options.LoadStatus == LoadStatusMeta {
s.addIndex(indexInfo.GetFieldID(), &IndexedFieldInfo{
FieldBinlog: &datapb.FieldBinlog{
Expand All @@ -1149,14 +1163,6 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.ID(), indexInfo.GetFieldID()))
defer sp.End()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.GetFieldID()),
zap.Int64("indexID", indexInfo.GetIndexID()),
)

loadIndexInfo, err := newLoadIndexInfo(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -1340,11 +1346,20 @@ func (s *LocalSegment) Release(opts ...releaseOption) {
// release will never fail
defer stateLockGuard.Done(nil)

log := log.With(zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.segmentType.String()),
zap.Int64("insertCount", s.InsertCount()),
)

// wait all read ops finished
ptr := s.ptr
if options.Scope == ReleaseScopeData {
s.loadStatus.Store(string(LoadStatusMeta))
C.ClearSegmentData(ptr)
s.ResetIndexesLazyLoad(true)
log.Debug("release segment data done and the field indexes info has been set lazy load=true")
return
}

Expand All @@ -1365,13 +1380,7 @@ func (s *LocalSegment) Release(opts ...releaseOption) {
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
}

log.Info("delete segment from memory",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.segmentType.String()),
zap.Int64("insertCount", s.InsertCount()),
)
log.Info("delete segment from memory")
}

// StartLoadData starts the loading process of the segment.
Expand Down
1 change: 1 addition & 0 deletions internal/querynodev2/segments/segment_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,5 @@ type Segment interface {
Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error)
Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb.RetrieveResults, error)
IsLazyLoad() bool
ResetIndexesLazyLoad(lazyState bool)
}
3 changes: 3 additions & 0 deletions internal/querynodev2/segments/segment_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (s *L0Segment) Indexes() []*IndexedFieldInfo {
return nil
}

func (s *L0Segment) ResetIndexesLazyLoad(lazyState bool) {
}

func (s *L0Segment) Type() SegmentType {
return s.segmentType
}
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu
if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil {
return err
}

log := log.Ctx(ctx).With(zap.Int64("segmentID", segment.ID()))
tr := timerecord.NewTimeRecorder("segmentLoader.LoadIndex")
log.Info("load fields...",
zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
Expand Down

0 comments on commit f3f2a5a

Please sign in to comment.