Skip to content

Commit

Permalink
Add schema update time verification to insert and upsert so as to use…
Browse files Browse the repository at this point in the history
… the cache

Signed-off-by: Xianhui.Lin <[email protected]>

fix unitest

Signed-off-by: Xianhui.Lin <[email protected]>

improve

Signed-off-by: Xianhui.Lin <[email protected]>

improve

Signed-off-by: Xianhui.Lin <[email protected]>

fix code

Signed-off-by: Xianhui.Lin <[email protected]>

fix test

Signed-off-by: Xianhui.Lin <[email protected]>

fix go test

Signed-off-by: Xianhui.Lin <[email protected]>

add go unitest

Signed-off-by: Xianhui.Lin <[email protected]>

fix unitest error

Signed-off-by: Xianhui.Lin <[email protected]>

fix ut

Signed-off-by: Xianhui.Lin <[email protected]>

improve

Signed-off-by: Xianhui.Lin <[email protected]>

createcollection use creats

Signed-off-by: Xianhui.Lin <[email protected]>

save to etcd

Signed-off-by: Xianhui.Lin <[email protected]>

fix

Signed-off-by: Xianhui.Lin <[email protected]>

fix

Signed-off-by: Xianhui.Lin <[email protected]>
  • Loading branch information
JsDove committed Jan 17, 2025
1 parent 57ba8a7 commit 0d55a55
Show file tree
Hide file tree
Showing 19 changed files with 233 additions and 150 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21 h1:Imb0uGFp/kyPPI5f6dCne8GCJIceuQWzI1H20p5aa4c=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7 h1:putot5l1gpiucE4CBrYzLoPCAci/BdYFe76GP15xsg4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
Expand Down
4 changes: 4 additions & 0 deletions internal/metastore/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Collection struct {
Properties []*commonpb.KeyValuePair
State pb.CollectionState
EnableDynamicField bool
UpdateTimestamp uint64
}

func (c *Collection) Available() bool {
Expand All @@ -54,6 +55,7 @@ func (c *Collection) Clone() *Collection {
Properties: common.CloneKeyValuePairs(c.Properties),
State: c.State,
EnableDynamicField: c.EnableDynamicField,
UpdateTimestamp: c.UpdateTimestamp,
}
}

Expand Down Expand Up @@ -110,6 +112,7 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
State: coll.State,
Properties: coll.Properties,
EnableDynamicField: coll.Schema.EnableDynamicField,
UpdateTimestamp: coll.UpdateTimestamp,
}
}

Expand Down Expand Up @@ -171,6 +174,7 @@ func marshalCollectionModelWithConfig(coll *Collection, c *config) *pb.Collectio
StartPositions: coll.StartPositions,
State: coll.State,
Properties: coll.Properties,
UpdateTimestamp: coll.UpdateTimestamp,
}

if c.withPartitions {
Expand Down
18 changes: 10 additions & 8 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2610,10 +2610,11 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
Version: msgpb.InsertDataVersion_ColumnBased,
},
},
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
schemaTimestamp: request.SchemaTimestamp,
}

constructFailedResponse := func(err error) *milvuspb.MutationResult {
Expand Down Expand Up @@ -2847,10 +2848,11 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
},
},

idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
schemaTimestamp: request.SchemaTimestamp,
}

log.Debug("Enqueue upsert request in Proxy",
Expand Down
2 changes: 2 additions & 0 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type collectionInfo struct {
createdUtcTimestamp uint64
consistencyLevel commonpb.ConsistencyLevel
partitionKeyIsolation bool
updateTimestamp uint64
}

type databaseInfo struct {
Expand Down Expand Up @@ -478,6 +479,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
createdUtcTimestamp: collection.CreatedUtcTimestamp,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
updateTimestamp: collection.UpdateTimestamp,
}

log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),
Expand Down
1 change: 1 addition & 0 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
t.result.Properties = result.Properties
t.result.DbName = result.GetDbName()
t.result.NumPartitions = result.NumPartitions
t.result.UpdateTimestamp = result.UpdateTimestamp
for _, field := range result.Schema.Fields {
if field.IsDynamic {
continue
Expand Down
37 changes: 28 additions & 9 deletions internal/proxy/task_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ type insertTask struct {
insertMsg *BaseInsertTask
ctx context.Context

result *milvuspb.MutationResult
idAllocator *allocator.IDAllocator
segIDAssigner *segIDAssigner
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
schema *schemapb.CollectionSchema
partitionKeys *schemapb.FieldData
result *milvuspb.MutationResult
idAllocator *allocator.IDAllocator
segIDAssigner *segIDAssigner
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
schema *schemapb.CollectionSchema
partitionKeys *schemapb.FieldData
schemaTimestamp uint64
}

// TraceCtx returns insertTask context
Expand Down Expand Up @@ -125,6 +126,24 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
return merr.WrapErrAsInputError(merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize"))
}

collID, err := globalMetaCache.GetCollectionID(context.Background(), it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Ctx(ctx).Warn("fail to get collection id", zap.Error(err))
return err
}

Check warning on line 133 in internal/proxy/task_insert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_insert.go#L131-L133

Added lines #L131 - L133 were not covered by tests
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.insertMsg.GetDbName(), collectionName, collID)
if err != nil {
log.Ctx(ctx).Warn("fail to get collection info", zap.Error(err))
return err
}

Check warning on line 138 in internal/proxy/task_insert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_insert.go#L136-L138

Added lines #L136 - L138 were not covered by tests
if it.schemaTimestamp != 0 {
if it.schemaTimestamp != colInfo.updateTimestamp {
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
log.Ctx(ctx).Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
return err
}

Check warning on line 144 in internal/proxy/task_insert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_insert.go#L140-L144

Added lines #L140 - L144 were not covered by tests
}

schema, err := globalMetaCache.GetCollectionSchema(ctx, it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Warn("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err))
Expand Down
21 changes: 20 additions & 1 deletion internal/proxy/task_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type upsertTask struct {
partitionKeys *schemapb.FieldData
// automatic generate pk as new pk wehen autoID == true
// delete task need use the oldIds
oldIds *schemapb.IDs
oldIds *schemapb.IDs
schemaTimestamp uint64
}

// TraceCtx returns upsertTask context
Expand Down Expand Up @@ -292,6 +293,24 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
Timestamp: it.EndTs(),
}

collID, err := globalMetaCache.GetCollectionID(context.Background(), it.req.GetDbName(), collectionName)
if err != nil {
log.Warn("fail to get collection id", zap.Error(err))
return err
}

Check warning on line 300 in internal/proxy/task_upsert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_upsert.go#L298-L300

Added lines #L298 - L300 were not covered by tests
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.req.GetDbName(), collectionName, collID)
if err != nil {
log.Warn("fail to get collection info", zap.Error(err))
return err
}

Check warning on line 305 in internal/proxy/task_upsert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_upsert.go#L303-L305

Added lines #L303 - L305 were not covered by tests
if it.schemaTimestamp != 0 {
if it.schemaTimestamp != colInfo.updateTimestamp {
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
log.Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
return err
}

Check warning on line 311 in internal/proxy/task_upsert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_upsert.go#L307-L311

Added lines #L307 - L311 were not covered by tests
}

schema, err := globalMetaCache.GetCollectionSchema(ctx, it.req.GetDbName(), collectionName)
if err != nil {
log.Warn("Failed to get collection schema",
Expand Down
12 changes: 12 additions & 0 deletions internal/rootcoord/alter_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {
}

ts := a.GetTs()

tso, err := a.core.tsoAllocator.GenerateTSO(1)
if err == nil {
newColl.UpdateTimestamp = tso
}

redoTask := newBaseRedoTask(a.core.stepExecutor)
redoTask.AddSyncStep(&AlterCollectionStep{
baseStep: baseStep{core: a.core},
Expand Down Expand Up @@ -185,6 +191,12 @@ func (a *alterCollectionFieldTask) Execute(ctx context.Context) error {
return err
}
ts := a.GetTs()

tso, err := a.core.tsoAllocator.GenerateTSO(1)
if err == nil {
newColl.UpdateTimestamp = tso
}

Check warning on line 198 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L194-L198

Added lines #L194 - L198 were not covered by tests

redoTask := newBaseRedoTask(a.core.stepExecutor)
redoTask.AddSyncStep(&AlterCollectionStep{
baseStep: baseStep{core: a.core},
Expand Down
11 changes: 6 additions & 5 deletions internal/rootcoord/alter_collection_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
).Return(errors.New("err"))
meta.On("ListAliasesByID", mock.Anything).Return([]string{})

core := newTestCore(withValidProxyManager(), withMeta(meta))
core := newTestCore(withValidProxyManager(), withMeta(meta), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -129,7 +129,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
return errors.New("err")
}

core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker))
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -164,7 +164,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
return errors.New("err")
}

core := newTestCore(withInvalidProxyManager(), withMeta(meta), withBroker(broker))
core := newTestCore(withInvalidProxyManager(), withMeta(meta), withBroker(broker), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -198,7 +198,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
},
},
}, nil)
core := newTestCore(withValidProxyManager(), withMeta(meta))
core := newTestCore(withValidProxyManager(), withMeta(meta), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -238,7 +238,8 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
return nil
}

core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker))
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withInvalidTsoAllocator())

task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/create_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,6 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
State: pb.PartitionState_PartitionCreated,
}
}

collInfo := model.Collection{
CollectionID: collID,
DBID: t.dbID,
Expand All @@ -492,6 +491,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
Partitions: partitions,
Properties: t.Req.Properties,
EnableDynamicField: t.schema.EnableDynamicField,
UpdateTimestamp: ts,
}

// We cannot check the idempotency inside meta table when adding collection, since we'll execute duplicate steps
Expand Down
1 change: 1 addition & 0 deletions internal/rootcoord/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,7 @@ func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName str
resp.Properties = collInfo.Properties
resp.NumPartitions = int64(len(collInfo.Partitions))
resp.DbId = collInfo.DBID
resp.UpdateTimestamp = collInfo.UpdateTimestamp
return resp
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/expr-lang/expr v1.15.7
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2
Expand Down
4 changes: 2 additions & 2 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21 h1:Imb0uGFp/kyPPI5f6dCne8GCJIceuQWzI1H20p5aa4c=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7 h1:putot5l1gpiucE4CBrYzLoPCAci/BdYFe76GP15xsg4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
Expand Down
1 change: 1 addition & 0 deletions pkg/proto/etcd_meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ message CollectionInfo {
CollectionState state = 13; // To keep compatible with older version, default state is `Created`.
repeated common.KeyValuePair properties = 14;
int64 db_id = 15;
uint64 UpdateTimestamp = 16;
}

message PartitionInfo {
Expand Down
Loading

0 comments on commit 0d55a55

Please sign in to comment.