Skip to content

Commit

Permalink
fix: [2.5] Add scalar index engine version for compatibility (#39236)
Browse files Browse the repository at this point in the history
issue: #39203 

master pr: #39204

---------

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Jan 14, 2025
1 parent b91c0a8 commit 4270174
Show file tree
Hide file tree
Showing 20 changed files with 1,324 additions and 1,037 deletions.
51 changes: 48 additions & 3 deletions internal/datacoord/index_engine_version_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@ type IndexEngineVersionManager interface {

GetCurrentIndexEngineVersion() int32
GetMinimalIndexEngineVersion() int32

GetCurrentScalarIndexEngineVersion() int32
GetMinimalScalarIndexEngineVersion() int32
}

type versionManagerImpl struct {
mu lock.Mutex
versions map[int64]sessionutil.IndexEngineVersion
mu lock.Mutex
versions map[int64]sessionutil.IndexEngineVersion
scalarIndexVersions map[int64]sessionutil.IndexEngineVersion
}

func newIndexEngineVersionManager() IndexEngineVersionManager {
return &versionManagerImpl{
versions: map[int64]sessionutil.IndexEngineVersion{},
versions: map[int64]sessionutil.IndexEngineVersion{},
scalarIndexVersions: map[int64]sessionutil.IndexEngineVersion{},
}
}

Expand All @@ -52,6 +57,7 @@ func (m *versionManagerImpl) RemoveNode(session *sessionutil.Session) {
defer m.mu.Unlock()

delete(m.versions, session.ServerID)
delete(m.scalarIndexVersions, session.ServerID)
}

func (m *versionManagerImpl) Update(session *sessionutil.Session) {
Expand All @@ -64,6 +70,7 @@ func (m *versionManagerImpl) Update(session *sessionutil.Session) {
func (m *versionManagerImpl) addOrUpdate(session *sessionutil.Session) {
log.Info("addOrUpdate version", zap.Int64("nodeId", session.ServerID), zap.Int32("minimal", session.IndexEngineVersion.MinimalIndexVersion), zap.Int32("current", session.IndexEngineVersion.CurrentIndexVersion))
m.versions[session.ServerID] = session.IndexEngineVersion
m.scalarIndexVersions[session.ServerID] = session.ScalarIndexEngineVersion
}

func (m *versionManagerImpl) GetCurrentIndexEngineVersion() int32 {
Expand Down Expand Up @@ -103,3 +110,41 @@ func (m *versionManagerImpl) GetMinimalIndexEngineVersion() int32 {
log.Info("Merged minimal version", zap.Int32("minimal", minimal))
return minimal
}

func (m *versionManagerImpl) GetCurrentScalarIndexEngineVersion() int32 {
m.mu.Lock()
defer m.mu.Unlock()

if len(m.scalarIndexVersions) == 0 {
log.Info("scalar index versions is empty")
return 0
}

current := int32(math.MaxInt32)
for _, version := range m.scalarIndexVersions {
if version.CurrentIndexVersion < current {
current = version.CurrentIndexVersion
}
}
log.Info("Merged current scalar index version", zap.Int32("current", current))
return current
}

func (m *versionManagerImpl) GetMinimalScalarIndexEngineVersion() int32 {
m.mu.Lock()
defer m.mu.Unlock()

if len(m.scalarIndexVersions) == 0 {
log.Info("scalar index versions is empty")
return 0
}

minimal := int32(0)
for _, version := range m.scalarIndexVersions {
if version.MinimalIndexVersion > minimal {
minimal = version.MinimalIndexVersion
}
}
log.Info("Merged minimal scalar index version", zap.Int32("minimal", minimal))
return minimal
}
49 changes: 49 additions & 0 deletions internal/datacoord/index_engine_version_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,52 @@ func Test_IndexEngineVersionManager_GetMergedIndexVersion(t *testing.T) {
assert.Equal(t, int32(20), m.GetCurrentIndexEngineVersion())
assert.Equal(t, int32(0), m.GetMinimalIndexEngineVersion())
}

func Test_IndexEngineVersionManager_GetMergedScalarIndexVersion(t *testing.T) {
m := newIndexEngineVersionManager()

// empty
assert.Zero(t, m.GetCurrentScalarIndexEngineVersion())

// startup
m.Startup(map[string]*sessionutil.Session{
"1": {
SessionRaw: sessionutil.SessionRaw{
ServerID: 1,
ScalarIndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 20, MinimalIndexVersion: 0},
},
},
})
assert.Equal(t, int32(20), m.GetCurrentScalarIndexEngineVersion())
assert.Equal(t, int32(0), m.GetMinimalScalarIndexEngineVersion())

// add node
m.AddNode(&sessionutil.Session{
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
ScalarIndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 10, MinimalIndexVersion: 5},
},
})
assert.Equal(t, int32(10), m.GetCurrentScalarIndexEngineVersion())
assert.Equal(t, int32(5), m.GetMinimalScalarIndexEngineVersion())

// update
m.Update(&sessionutil.Session{
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
ScalarIndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 2},
},
})
assert.Equal(t, int32(5), m.GetCurrentScalarIndexEngineVersion())
assert.Equal(t, int32(2), m.GetMinimalScalarIndexEngineVersion())

// remove
m.RemoveNode(&sessionutil.Session{
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 3},
},
})
assert.Equal(t, int32(20), m.GetCurrentScalarIndexEngineVersion())
assert.Equal(t, int32(0), m.GetMinimalScalarIndexEngineVersion())
}
1 change: 1 addition & 0 deletions internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ func (m *indexMeta) FinishTask(taskInfo *workerpb.IndexTaskInfo) error {
segIdx.IndexMemSize = taskInfo.GetMemSize()
segIdx.CurrentIndexVersion = taskInfo.GetCurrentIndexVersion()
segIdx.FinishedUTCTime = uint64(time.Now().Unix())
segIdx.CurrentScalarIndexVersion = taskInfo.GetCurrentScalarIndexVersion()
return m.alterSegmentIndexes([]*model.SegmentIndex{segIdx})
}

Expand Down
90 changes: 90 additions & 0 deletions internal/datacoord/mock_index_engine_version_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 21 additions & 20 deletions internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,26 +236,27 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
}

it.req = &workerpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath),
BuildID: it.taskID,
IndexVersion: segIndex.IndexVersion + 1,
StorageConfig: createStorageConfig(),
IndexParams: indexParams,
TypeParams: typeParams,
NumRows: segIndex.NumRows,
CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(),
SegmentID: segment.GetID(),
FieldID: fieldID,
FieldName: field.GetName(),
FieldType: field.GetDataType(),
Dim: int64(dim),
DataIds: binlogIDs,
OptionalScalarFields: optionalFields,
Field: field,
PartitionKeyIsolation: partitionKeyIsolation,
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath),
BuildID: it.taskID,
IndexVersion: segIndex.IndexVersion + 1,
StorageConfig: createStorageConfig(),
IndexParams: indexParams,
TypeParams: typeParams,
NumRows: segIndex.NumRows,
CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
CurrentScalarIndexVersion: dependency.indexEngineVersionManager.GetCurrentScalarIndexEngineVersion(),
CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(),
SegmentID: segment.GetID(),
FieldID: fieldID,
FieldName: field.GetName(),
FieldType: field.GetDataType(),
Dim: int64(dim),
DataIds: binlogIDs,
OptionalScalarFields: optionalFields,
Field: field,
PartitionKeyIsolation: partitionKeyIsolation,
}

log.Ctx(ctx).Info("index task pre check successfully", zap.Int64("taskID", it.GetTaskID()),
Expand Down
8 changes: 8 additions & 0 deletions internal/indexnode/indexnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ func getCurrentIndexVersion(v int32) int32 {
return v
}

func getCurrentScalarIndexVersion(v int32) int32 {
cCurrent := common.CurrentScalarIndexEngineVersion
if cCurrent < v {
return cCurrent
}
return v
}

type taskKey struct {
ClusterID string
TaskID UniqueID
Expand Down
16 changes: 9 additions & 7 deletions internal/indexnode/indexnode_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,14 @@ func (i *IndexNode) QueryJobsV2(ctx context.Context, req *workerpb.QueryJobsV2Re
i.foreachIndexTaskInfo(func(ClusterID string, buildID UniqueID, info *indexTaskInfo) {
if ClusterID == req.GetClusterID() {
infos[buildID] = &indexTaskInfo{
state: info.state,
fileKeys: common.CloneStringList(info.fileKeys),
serializedSize: info.serializedSize,
memSize: info.memSize,
failReason: info.failReason,
currentIndexVersion: info.currentIndexVersion,
indexStoreVersion: info.indexStoreVersion,
state: info.state,
fileKeys: common.CloneStringList(info.fileKeys),
serializedSize: info.serializedSize,
memSize: info.memSize,
failReason: info.failReason,
currentIndexVersion: info.currentIndexVersion,
indexStoreVersion: info.indexStoreVersion,
currentScalarIndexVersion: info.currentScalarIndexVersion,
}
}
})
Expand All @@ -469,6 +470,7 @@ func (i *IndexNode) QueryJobsV2(ctx context.Context, req *workerpb.QueryJobsV2Re
results[i].FailReason = info.failReason
results[i].CurrentIndexVersion = info.currentIndexVersion
results[i].IndexStoreVersion = info.indexStoreVersion
results[i].CurrentScalarIndexVersion = info.currentScalarIndexVersion
}
}
log.Debug("query index jobs result success", zap.Any("results", results))
Expand Down
22 changes: 12 additions & 10 deletions internal/indexnode/indexnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,16 +424,17 @@ func (s *IndexNodeSuite) Test_CreateIndexJob_Compatibility() {
Key: "dim", Value: "8",
},
},
NumRows: s.numRows,
CurrentIndexVersion: 0,
CollectionID: s.collID,
PartitionID: s.partID,
SegmentID: s.segID,
FieldID: s.fieldID,
FieldName: "floatVector",
FieldType: schemapb.DataType_FloatVector,
Dim: 8,
DataIds: []int64{s.logID + 13},
NumRows: s.numRows,
CurrentIndexVersion: 0,
CurrentScalarIndexVersion: 1,
CollectionID: s.collID,
PartitionID: s.partID,
SegmentID: s.segID,
FieldID: s.fieldID,
FieldName: "floatVector",
FieldType: schemapb.DataType_FloatVector,
Dim: 8,
DataIds: []int64{s.logID + 13},
Field: &schemapb.FieldSchema{
FieldID: s.fieldID,
Name: "floatVector",
Expand Down Expand Up @@ -501,6 +502,7 @@ func (s *IndexNodeSuite) Test_CreateIndexJob_ScalarIndex() {
Name: "int64",
DataType: schemapb.DataType_Int64,
},
CurrentScalarIndexVersion: 1,
}

status, err := s.in.CreateJob(ctx, req)
Expand Down
Loading

0 comments on commit 4270174

Please sign in to comment.