Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle last_insert_id() in vgate #17372

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/mysql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ func (c *Conn) ReadQueryResult(maxrows int, wantfields bool) (*sqltypes.Result,
result.SessionStateChanges = packetEof.sessionStateData
result.StatusFlags = packetEof.statusFlags
result.Info = packetEof.info
result.InsertID = packetEof.lastInsertID
}
return result, more, warnings, nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,11 @@ SELECT (~ (1 || 0)) IS NULL;

SELECT 1
WHERE (~ (1 || 0)) IS NULL;

SELECT c1 FROM t0 WHERE LAST_INSERT_ID(42);

SELECT LAST_INSERT_ID();

SELECT c1 FROM t0 WHERE LAST_INSERT_ID(0);

SELECT LAST_INSERT_ID();
4 changes: 2 additions & 2 deletions go/test/vschemawrapper/vschema_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,12 @@ func (vw *VSchemaWrapper) IsViewsEnabled() bool {

// FindMirrorRule finds the mirror rule for the requested keyspace, table
// name, and the tablet type in the VSchema.
func (vs *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.MirrorRule, error) {
func (vw *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.MirrorRule, error) {
destKeyspace, destTabletType, _, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY)
if err != nil {
return nil, err
}
mirrorRule, err := vs.V.FindMirrorRule(destKeyspace, tab.Name.String(), destTabletType)
mirrorRule, err := vw.V.FindMirrorRule(destKeyspace, tab.Name.String(), destTabletType)
if err != nil {
return nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions go/vt/vtgate/engine/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ import (
// each node does its part by combining the results of the
// sub-nodes.
type Plan struct {
Type sqlparser.StatementType // The type of query we have
Original string // Original is the original query.
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting
Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs
TablesUsed []string // TablesUsed is the list of tables that this plan will query
Type sqlparser.StatementType // The type of query we have
Original string // Original is the original query.
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting
Warnings []*query.QueryWarning // Warnings that need to be yielded every time this query runs
TablesUsed []string // TablesUsed is the list of tables that this plan will query
ForceReadLastInsertID bool // ForceReadLastInsertID is set to true when we need to set the session's last insert ID value to what this plan returns no matter what

ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
Expand Down
20 changes: 13 additions & 7 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,12 @@ type streaminResultReceiver struct {
callback func(*sqltypes.Result) error
}

func (s *streaminResultReceiver) storeResultStats(typ sqlparser.StatementType, qr *sqltypes.Result) error {
func (s *streaminResultReceiver) storeResultStats(typ sqlparser.StatementType, qr *sqltypes.Result, forceReadLastInsertID bool) error {
s.mu.Lock()
defer s.mu.Unlock()
s.rowsAffected += qr.RowsAffected
s.rowsReturned += len(qr.Rows)
if qr.InsertID != 0 {
if forceReadLastInsertID || qr.InsertID != 0 {
s.insertID = qr.InsertID
}
s.stmtType = typ
Expand Down Expand Up @@ -344,7 +344,7 @@ func (e *Executor) StreamExecute(

// 4: Execute!
err := vc.StreamExecutePrimitive(ctx, plan.Instructions, bindVars, true, func(qr *sqltypes.Result) error {
return srr.storeResultStats(plan.Type, qr)
return srr.storeResultStats(plan.Type, qr, plan.ForceReadLastInsertID)
})

// Check if there was partial DML execution. If so, rollback the effect of the partially executed query.
Expand Down Expand Up @@ -439,8 +439,14 @@ func (e *Executor) execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
err = e.newExecute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats, func(ctx context.Context, plan *engine.Plan, vc *econtext.VCursorImpl, bindVars map[string]*querypb.BindVariable, time time.Time) error {
stmtType = plan.Type
qr, err = e.executePlan(ctx, safeSession, plan, vc, bindVars, logStats, time)
return err
}, func(typ sqlparser.StatementType, result *sqltypes.Result) error {
if err != nil {
return err
}
if plan.ForceReadLastInsertID {
safeSession.LastInsertId = qr.InsertID
}
return nil
}, func(typ sqlparser.StatementType, result *sqltypes.Result, _ bool) error {
stmtType = typ
qr = result
return nil
Expand Down Expand Up @@ -1144,7 +1150,7 @@ func (e *Executor) getPlan(
logStats.SQL = comments.Leading + query + comments.Trailing
logStats.BindVariables = sqltypes.CopyBindVariables(bindVars)

return e.cacheAndBuildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, logStats)
return e.getFromCacheOrBuildStatement(ctx, vcursor, query, stmt, reservedVars, bindVarNeeds, logStats)
}

func (e *Executor) hashPlan(ctx context.Context, vcursor *econtext.VCursorImpl, query string) PlanCacheKey {
Expand Down Expand Up @@ -1179,7 +1185,7 @@ func (e *Executor) buildStatement(
return plan, err
}

func (e *Executor) cacheAndBuildStatement(
func (e *Executor) getFromCacheOrBuildStatement(
ctx context.Context,
vcursor *econtext.VCursorImpl,
query string,
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

type planExec func(ctx context.Context, plan *engine.Plan, vc *econtext.VCursorImpl, bindVars map[string]*querypb.BindVariable, startTime time.Time) error
type txResult func(sqlparser.StatementType, *sqltypes.Result) error
type txResult func(sqlparser.StatementType, *sqltypes.Result, bool) error

var vschemaWaitTimeout = 30 * time.Second

Expand Down Expand Up @@ -157,7 +157,7 @@ func (e *Executor) newExecute(
return err
}
if result != nil {
return recResult(plan.Type, result)
return recResult(plan.Type, result, plan.ForceReadLastInsertID)
}

// 4: Prepare for execution.
Expand Down
45 changes: 25 additions & 20 deletions go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@ var (

type (
planResult struct {
primitive engine.Primitive
tables []string
primitive engine.Primitive
tables []string
forceReadLastInsertID bool
}

stmtPlanner func(sqlparser.Statement, *sqlparser.ReservedVars, plancontext.VSchema) (*planResult, error)
)

func newPlanResult(prim engine.Primitive, tablesUsed ...string) *planResult {
return &planResult{primitive: prim, tables: tablesUsed}
func newPlanResult(prim engine.Primitive, lastInsertID bool, tablesUsed ...string) *planResult {
return &planResult{primitive: prim, tables: tablesUsed, forceReadLastInsertID: lastInsertID}
}

func singleTable(ks, tbl string) string {
Expand Down Expand Up @@ -115,16 +116,20 @@ func BuildFromStmt(ctx context.Context, query string, stmt sqlparser.Statement,

var primitive engine.Primitive
var tablesUsed []string
var forceReadLastInsertID bool
if planResult != nil {
primitive = planResult.primitive
tablesUsed = planResult.tables
forceReadLastInsertID = planResult.forceReadLastInsertID
}

plan := &engine.Plan{
Type: sqlparser.ASTToStatementType(stmt),
Original: query,
Instructions: primitive,
BindVarNeeds: bindVarNeeds,
TablesUsed: tablesUsed,
Type: sqlparser.ASTToStatementType(stmt),
Original: query,
Instructions: primitive,
BindVarNeeds: bindVarNeeds,
TablesUsed: tablesUsed,
ForceReadLastInsertID: forceReadLastInsertID,
}
return plan, nil
}
Expand Down Expand Up @@ -239,7 +244,7 @@ func createInstructionFor(ctx context.Context, query string, stmt sqlparser.Stat
case *sqlparser.CommentOnly:
// There is only a comment in the input.
// This is essentially a No-op
return newPlanResult(engine.NewRowsPrimitive(nil, nil)), nil
return newPlanResult(engine.NewRowsPrimitive(nil, nil), false), nil
}

return nil, vterrors.VT13001(fmt.Sprintf("unexpected statement type: %T", stmt))
Expand Down Expand Up @@ -279,7 +284,7 @@ func buildAnalyzePlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vsche
TargetDestination: dest,
Query: sqlparser.String(analyzeStmt),
}
return newPlanResult(prim, sqlparser.String(analyzeStmt.Table)), nil
return newPlanResult(prim, false, sqlparser.String(analyzeStmt.Table)), nil
}

func buildDBDDLPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vschema plancontext.VSchema) (*planResult, error) {
Expand All @@ -297,25 +302,25 @@ func buildDBDDLPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vschema
switch dbDDL := dbDDLstmt.(type) {
case *sqlparser.DropDatabase:
if dbDDL.IfExists && !ksExists {
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0))), nil
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0)), false), nil
}
if !ksExists {
return nil, vterrors.VT05001(ksName)
}
return newPlanResult(engine.NewDBDDL(ksName, false, queryTimeout(dbDDL.Comments.Directives()))), nil
return newPlanResult(engine.NewDBDDL(ksName, false, queryTimeout(dbDDL.Comments.Directives())), false), nil
case *sqlparser.AlterDatabase:
if !ksExists {
return nil, vterrors.VT05002(ksName)
}
return nil, vterrors.VT12001("ALTER DATABASE")
case *sqlparser.CreateDatabase:
if dbDDL.IfNotExists && ksExists {
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0))), nil
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0)), false), nil
}
if !dbDDL.IfNotExists && ksExists {
return nil, vterrors.VT06001(ksName)
}
return newPlanResult(engine.NewDBDDL(ksName, true, queryTimeout(dbDDL.Comments.Directives()))), nil
return newPlanResult(engine.NewDBDDL(ksName, true, queryTimeout(dbDDL.Comments.Directives())), false), nil
}
return nil, vterrors.VT13001(fmt.Sprintf("database DDL not recognized: %s", sqlparser.String(dbDDLstmt)))
}
Expand All @@ -340,7 +345,7 @@ func buildLoadPlan(query string, vschema plancontext.VSchema) (*planResult, erro
Query: query,
IsDML: true,
SingleShardOnly: true,
}), nil
}, false), nil
}

func buildVSchemaDDLPlan(stmt *sqlparser.AlterVschema, vschema plancontext.VSchema) (*planResult, error) {
Expand All @@ -351,7 +356,7 @@ func buildVSchemaDDLPlan(stmt *sqlparser.AlterVschema, vschema plancontext.VSche
return newPlanResult(&engine.AlterVSchema{
Keyspace: keyspace,
AlterVschemaDDL: stmt,
}, singleTable(keyspace.Name, stmt.Table.Name.String())), nil
}, false, singleTable(keyspace.Name, stmt.Table.Name.String())), nil
}

func buildFlushPlan(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*planResult, error) {
Expand Down Expand Up @@ -381,7 +386,7 @@ func buildFlushOptions(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*pla
TargetDestination: dest,
Query: sqlparser.String(stmt),
ReservedConnectionNeeded: stmt.WithLock,
}), nil
}, false), nil
}

func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*planResult, error) {
Expand Down Expand Up @@ -433,7 +438,7 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan
TargetDestination: sendDest.dest,
Query: sqlparser.String(newFlushStmt(stmt, tables)),
ReservedConnectionNeeded: stmt.WithLock,
}, tc.getTables()...), nil
}, false, tc.getTables()...), nil
}
}

Expand All @@ -451,7 +456,7 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema plancontext.VSchema) (*plan
}
sources = append(sources, plan)
}
return newPlanResult(engine.NewConcatenate(sources, nil), tc.getTables()...), nil
return newPlanResult(engine.NewConcatenate(sources, nil), false, tc.getTables()...), nil
}

type tableCollector struct {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func buildPlanForBypass(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vsc
MultishardAutocommit: hints.multiShardAutocommit,
QueryTimeout: hints.queryTimeout,
}
return newPlanResult(send), nil
return newPlanResult(send, false), nil
}

func GetShardRoute(vschema plancontext.VSchema, keyspace, shard string) (*vindexes.Keyspace, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/call_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func buildCallProcPlan(stmt *sqlparser.CallProc, vschema plancontext.VSchema) (*
Keyspace: keyspace,
TargetDestination: dest,
Query: sqlparser.String(stmt),
}), nil
}, false), nil
}

const errNotAllowWhenSharded = "CALL is not supported for sharded keyspace"
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser
tc.addASTTable(normalDDLPlan.Keyspace.Name, tbl)
}

return newPlanResult(eddl, tc.getTables()...), nil
return newPlanResult(eddl, false, tc.getTables()...), nil
}

func buildByPassPlan(sql string, vschema plancontext.VSchema, isDDL bool) (*planResult, error) {
Expand All @@ -89,7 +89,7 @@ func buildByPassPlan(sql string, vschema plancontext.VSchema, isDDL bool) (*plan
Query: sql,
IsDDL: isDDL,
}
return newPlanResult(send), nil
return newPlanResult(send, false), nil
}

func buildDDLPlans(ctx context.Context, sql string, ddlStatement sqlparser.DDLStatement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, cfg dynamicconfig.DDL) (*engine.Send, *engine.OnlineDDL, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func gen4DeleteStmtPlanner(
if ks, tables := ctx.SemTable.SingleUnshardedKeyspace(); ks != nil {
if !ctx.SemTable.ForeignKeysPresent() {
plan := deleteUnshardedShortcut(deleteStmt, ks, tables)
return newPlanResult(plan, operators.QualifiedTables(ks, tables)...), nil
return newPlanResult(plan, false, operators.QualifiedTables(ks, tables)...), nil
}
}

Expand All @@ -83,7 +83,7 @@ func gen4DeleteStmtPlanner(
return nil, err
}

return newPlanResult(plan, operators.TablesUsed(op)...), nil
return newPlanResult(plan, false, operators.TablesUsed(op)...), nil
}

func rewriteSingleTbl(del *sqlparser.Delete) (*sqlparser.Delete, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func gen4InsertStmtPlanner(version querypb.ExecuteOptions_PlannerVersion, insStm
if tables[0].AutoIncrement == nil && !ctx.SemTable.ForeignKeysPresent() {
plan := insertUnshardedShortcut(insStmt, ks, tables)
setCommentDirectivesOnPlan(plan, insStmt)
return newPlanResult(plan, operators.QualifiedTables(ks, tables)...), nil
return newPlanResult(plan, false, operators.QualifiedTables(ks, tables)...), nil
}
}

Expand All @@ -80,7 +80,7 @@ func gen4InsertStmtPlanner(version querypb.ExecuteOptions_PlannerVersion, insStm
return nil, err
}

return newPlanResult(plan, operators.TablesUsed(op)...), nil
return newPlanResult(plan, false, operators.TablesUsed(op)...), nil
}

func errOutIfPlanCannotBeConstructed(ctx *plancontext.PlanningContext, vTbl *vindexes.Table) error {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/locktables.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
// buildLockPlan plans lock tables statement.
func buildLockPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, _ plancontext.VSchema) (*planResult, error) {
log.Warningf("Lock Tables statement is ignored: %v", stmt)
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0))), nil
return newPlanResult(engine.NewRowsPrimitive(make([][]sqltypes.Value, 0), make([]*querypb.Field, 0)), false), nil
}

// buildUnlockPlan plans lock tables statement.
func buildUnlockPlan(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, _ plancontext.VSchema) (*planResult, error) {
return newPlanResult(&engine.Unlock{}), nil
return newPlanResult(&engine.Unlock{}, false), nil
}
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func buildAlterMigrationThrottleAppPlan(query string, alterMigration *sqlparser.
return newPlanResult(&engine.ThrottleApp{
Keyspace: keyspace,
ThrottledAppRule: throttledAppRule,
}), nil
}, false), nil
}

func buildAlterMigrationPlan(query string, alterMigration *sqlparser.AlterMigration, vschema plancontext.VSchema, cfg dynamicconfig.DDL) (*planResult, error) {
Expand Down Expand Up @@ -116,7 +116,7 @@ func buildAlterMigrationPlan(query string, alterMigration *sqlparser.AlterMigrat
TargetDestination: dest,
Query: query,
}
return newPlanResult(send), nil
return newPlanResult(send, false), nil
}

func buildRevertMigrationPlan(query string, stmt *sqlparser.RevertMigration, vschema plancontext.VSchema, cfg dynamicconfig.DDL) (*planResult, error) {
Expand Down Expand Up @@ -145,7 +145,7 @@ func buildRevertMigrationPlan(query string, stmt *sqlparser.RevertMigration, vsc
Stmt: stmt,
Query: query,
}
return newPlanResult(emig), nil
return newPlanResult(emig, false), nil
}

func buildShowMigrationLogsPlan(query string, vschema plancontext.VSchema, cfg dynamicconfig.DDL) (*planResult, error) {
Expand Down Expand Up @@ -173,5 +173,5 @@ func buildShowMigrationLogsPlan(query string, vschema plancontext.VSchema, cfg d
TargetDestination: dest,
Query: query,
}
return newPlanResult(send), nil
return newPlanResult(send, false), nil
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/other_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ func buildOtherReadAndAdmin(sql string, vschema plancontext.VSchema) (*planResul
TargetDestination: destination,
Query: sql, // This is original sql query to be passed as the parser can provide partial ddl AST.
SingleShardOnly: true,
}), nil
}, false), nil
}
Loading
Loading