Online DDL: better error messages in cut-over phase #17052
@@ -785,7 +785,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
}
rs, err := conn.Conn.ExecuteFetch(query, -1, true)
if err != nil {
-return err
+return vterrors.Wrapf(err, "finding queries potentially operating on table")
}

log.Infof("killTableLockHoldersAndAccessors: found %v potential queries", len(rs.Rows))
@@ -841,7 +841,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
}
rs, err := conn.Conn.ExecuteFetch(query, -1, true)
if err != nil {
-return err
+return vterrors.Wrapf(err, "finding transactions locking table")
}
log.Infof("killTableLockHoldersAndAccessors: found %v locking transactions", len(rs.Rows))
for _, row := range rs.Named().Rows {
@@ -861,7 +861,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
// cutOverVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration
func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, shouldForceCutOver bool) error {
if err := e.incrementCutoverAttempts(ctx, s.workflow); err != nil {
-return err
+return vterrors.Wrapf(err, "cutover: failed incrementing cutover attempts")
}

tmClient := e.tabletManagerClient()
@@ -870,19 +870,19 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// sanity checks:
vreplTable, err := getVreplTable(s)
if err != nil {
-return err
+return vterrors.Wrapf(err, "cutover: failed getting vreplication table")
}

// get topology client & entities:
tablet, err := e.ts.GetTablet(ctx, e.tabletAlias)
if err != nil {
-return err
+return vterrors.Wrapf(err, "cutover: failed reading vreplication table")
}

// information about source tablet
onlineDDL, _, err := e.readMigration(ctx, s.workflow)
if err != nil {
-return err
+return vterrors.Wrapf(err, "cutover: failed reading migration")
}
isVreplicationTestSuite := onlineDDL.StrategySetting().IsVreplicationTestSuite()
e.updateMigrationStage(ctx, onlineDDL.UUID, "starting cut-over")
@@ -896,7 +896,10 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
defer cancel()
// Wait for target to reach the up-to-date pos
if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, s.id, replication.EncodePosition(pos)); err != nil {
-return err
+if s, _ := e.readVReplStream(ctx, s.workflow, true); s != nil {
+err = vterrors.Wrapf(err, "read vrepl position %v", s.pos)
+}
+return vterrors.Wrapf(err, "failed waiting for position %v", replication.EncodePosition(pos))
}
// Target is now in sync with source!
return nil

Review comment: As added bonus, why not also report what position we did land at.
Review comment: This is also in the error message from the VReplication Engine FWIW.
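The review thread above asks that the error also report the position the stream actually reached; the new branch does this by re-reading the vreplication stream and wrapping the error twice, ignoring any failure of that secondary read. Below is a minimal sketch of that "best-effort enrichment" pattern, not part of this PR: the helper names and positions are hypothetical, and it assumes vterrors.Wrapf joins messages with ": ".

package main

import (
	"errors"
	"fmt"

	"vitess.io/vitess/go/vt/vterrors"
)

// fetchReachedPos stands in for e.readVReplStream; it can fail independently.
func fetchReachedPos() (string, error) {
	return "reached-pos-180", nil // hypothetical position
}

func waitForTargetPos(targetPos string) error {
	// Stand-in for tmClient.VReplicationWaitForPos returning an error.
	waitErr := errors.New("timed out")
	// Best effort: report the position we did land at, if it can be read.
	if reached, readErr := fetchReachedPos(); readErr == nil {
		waitErr = vterrors.Wrapf(waitErr, "read vrepl position %v", reached)
	}
	return vterrors.Wrapf(waitErr, "failed waiting for position %v", targetPos)
}

func main() {
	fmt.Println(waitForTargetPos("target-pos-200"))
	// Expected shape, assuming vterrors.Wrapf prefixes with "<message>: ":
	// failed waiting for position target-pos-200: read vrepl position reached-pos-180: timed out
}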
@@ -910,14 +913,14 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// in that place as possible.
sentryTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime())
if err != nil {
-return nil
+return vterrors.Wrapf(err, "failed creating sentry table name")
}

// We create the sentry table before toggling writes, because this involves a WaitForPos, which takes some time. We
// don't want to overload the buffering time with this excessive wait.

if err := e.updateArtifacts(ctx, onlineDDL.UUID, sentryTableName); err != nil {
-return err
+return vterrors.Wrapf(err, "failed updating artifacts with sentry table name")
}

dropSentryTableQuery := sqlparser.BuildParsedQuery(sqlDropTableIfExists, sentryTableName)
@@ -941,30 +944,30 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
}()
parsed := sqlparser.BuildParsedQuery(sqlCreateSentryTable, sentryTableName)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
-return err
+return vterrors.Wrapf(err, "failed creating sentry table")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "sentry table created: %s", sentryTableName)

postSentryPos, err := e.primaryPosition(ctx)
if err != nil {
-return err
+return vterrors.Wrapf(err, "failed getting primary pos after sentry creation")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-sentry pos: %v", replication.EncodePosition(postSentryPos))
if err := waitForPos(s, postSentryPos); err != nil {
-return err
+return vterrors.Wrapf(err, "failed waiting for pos after sentry creation")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "post-sentry pos reached")
}

lockConn, err := e.pool.Get(ctx, nil)
if err != nil {
-return err
+return vterrors.Wrapf(err, "failed getting locking connection")
}
// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The code will ensure everything that needs to be terminated by `migrationCutOverThreshold` will be terminated.
lockConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, lockConn.Conn, 5*migrationCutOverThreshold)
if err != nil {
-return err
+return vterrors.Wrapf(err, "failed setting lock_wait_timeout on locking connection")
}
defer lockConn.Recycle()
defer lockConnRestoreLockWaitTimeout()
@@ -974,13 +977,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
renameWasSuccessful := false
renameConn, err := e.pool.Get(ctx, nil)
if err != nil {
-return err
+return vterrors.Wrapf(err, "failed getting rename connection")
}
// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The code will ensure everything that needs to be terminated by `migrationCutOverThreshold` will be terminated.
renameConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, renameConn.Conn, 5*migrationCutOverThreshold*4)
if err != nil {
-return err
+return vterrors.Wrapf(err, "failed setting lock_wait_timeout on rename connection")
}
defer renameConn.Recycle()
defer func() {
@@ -996,7 +999,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// See if backend MySQL server supports 'rename_table_preserve_foreign_key' variable
preserveFKSupported, err := e.isPreserveForeignKeySupported(ctx)
if err != nil {
-return err
+return vterrors.Wrapf(err, "failed checking for 'rename_table_preserve_foreign_key' support")
}
if preserveFKSupported {
// This code is only applicable when MySQL supports the 'rename_table_preserve_foreign_key' variable. This variable
@@ -1019,7 +1022,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
for {
renameProcessFound, err := e.doesConnectionInfoMatch(renameWaitCtx, renameConn.Conn.ID(), "rename")
if err != nil {
-return err
+return vterrors.Wrapf(err, "searching for rename process")
}
if renameProcessFound {
return nil
@@ -1053,7 +1056,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
bufferingContextCancel()
// force re-read of tables
if err := tmClient.RefreshState(grpcCtx, tablet.Tablet); err != nil {
-return err
+return vterrors.Wrapf(err, "refreshing table state")
}
}
log.Infof("toggled buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
@@ -1073,7 +1076,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
err = toggleBuffering(true)
defer reenableWritesOnce()
if err != nil {
-return err
+return vterrors.Wrapf(err, "failed enabling buffering")
}
// Give a fraction of a second for a scenario where a query is in
// query executor, it passed the ACLs and is _about to_ execute. This will be nicer to those queries:
@@ -1086,10 +1089,10 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// We should only proceed with forceful cut over if there is no pending atomic transaction for the table.
// This will help in keeping the atomicity guarantee of a prepared transaction.
if err := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); err != nil {
-return err
+return vterrors.Wrapf(err, "checking prepared pool for table")
}
if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil {
-return err
+return vterrors.Wrapf(err, "failed killing table lock holders and accessors")
}
}
@@ -1112,7 +1115,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
defer cancel()
lockTableQuery := sqlparser.BuildParsedQuery(sqlLockTwoTablesWrite, sentryTableName, onlineDDL.Table)
if _, err := lockConn.Conn.Exec(lockCtx, lockTableQuery.Query, 1, false); err != nil {
-return err
+return vterrors.Wrapf(err, "failed locking tables")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "renaming tables")
@@ -1124,15 +1127,15 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// the rename should block, because of the LOCK. Wait for it to show up.
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for RENAME to block")
if err := waitForRenameProcess(); err != nil {
-return err
+return vterrors.Wrapf(err, "failed waiting for rename process")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "RENAME found")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "reading post-lock pos")
postWritesPos, err := e.primaryPosition(ctx)
if err != nil {
-return err
+return vterrors.Wrapf(err, "failed reading pos after locking")
}

// Right now: new queries are buffered, any existing query will have executed, and worst case scenario is
@@ -1144,19 +1147,19 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// Writes are now disabled on table. Read up-to-date vreplication info, specifically to get latest (and fixed) pos:
s, err = e.readVReplStream(ctx, s.workflow, false)
if err != nil {
-return err
+return vterrors.Wrapf(err, "failed reading vreplication table after locking")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-lock pos: %v", replication.EncodePosition(postWritesPos))
if err := waitForPos(s, postWritesPos); err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "timeout while waiting for post-lock pos: %v", err)
-return err
+return vterrors.Wrapf(err, "failed waiting for pos after locking")
}
go log.Infof("cutOverVReplMigration %v: done waiting for position %v", s.workflow, replication.EncodePosition(postWritesPos))
// Stop vreplication
e.updateMigrationStage(ctx, onlineDDL.UUID, "stopping vreplication")
if _, err := e.vreplicationExec(ctx, tablet.Tablet, binlogplayer.StopVReplication(s.id, "stopped for online DDL cutover")); err != nil {
-return err
+return vterrors.Wrapf(err, "failed stopping vreplication")
}
go log.Infof("cutOverVReplMigration %v: stopped vreplication", s.workflow)
@@ -1173,7 +1176,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
} else {
e.updateMigrationStage(ctx, onlineDDL.UUID, "validating rename is still in place")
if err := waitForRenameProcess(); err != nil {
-return err
+return vterrors.Wrapf(err, "failed waiting for rename process before dropping sentry table")
}

// Normal (non-testing) alter table
@@ -1184,23 +1187,23 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
lockCtx, cancel := context.WithTimeout(ctx, migrationCutOverThreshold)
defer cancel()
if _, err := lockConn.Conn.Exec(lockCtx, dropTableQuery.Query, 1, false); err != nil {
-return err
+return vterrors.Wrapf(err, "failed dropping sentry table")
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, migrationCutOverThreshold)
defer cancel()
e.updateMigrationStage(ctx, onlineDDL.UUID, "unlocking tables")
if _, err := lockConn.Conn.Exec(lockCtx, sqlUnlockTables, 1, false); err != nil {
-return err
+return vterrors.Wrapf(err, "failed unlocking tables")
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, migrationCutOverThreshold)
defer cancel()
e.updateMigrationStage(lockCtx, onlineDDL.UUID, "waiting for RENAME to complete")
if err := <-renameCompleteChan; err != nil {
-return err
+return vterrors.Wrapf(err, "failed waiting for rename to complete")
}
renameWasSuccessful = true
}
@@ -3784,8 +3787,10 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
if err := e.cutOverVReplMigration(ctx, s, shouldForceCutOver); err != nil {
_ = e.updateMigrationMessage(ctx, uuid, err.Error())
log.Errorf("cutOverVReplMigration failed: err=%v", err)
-if merr, ok := err.(*sqlerror.SQLError); ok {
-switch merr.Num {
+if sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError); isSQLErr && sqlErr != nil {
+// let's see if this error is actually acceptable
+switch sqlErr.Num {
case sqlerror.ERTooLongIdent:
go e.CancelMigration(ctx, uuid, err.Error(), false)
}

Review comment: This change is required because the underlying SQL error is now likely to be wrapped by a vitess error.
Review comment: Not sure if we have any wrapping, but I think places where we cast errors directly probably should use https://pkg.go.dev/errors#example-As these days. So if we're changing this, maybe also switch to that?
Review comment: It's not about this function though? It's about the cast after?
Review comment: Ah, the cast at that same line... Rich story. We tried to get rid of it altogether, but hit the (weird, IMO) Reference: #12574
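For reference, the errors.As alternative raised in the thread could look roughly like the sketch below. It is only an illustration: it relies on every wrapper in the chain implementing Unwrap, which is exactly the open question above, whereas the PR instead normalizes the error through sqlerror.NewSQLErrorFromError before the type assertion. The wrapped error in main is hypothetical.

package main

import (
	"errors"
	"fmt"

	"vitess.io/vitess/go/mysql/sqlerror"
)

// isAcceptableCutoverError walks the wrapped error chain with errors.As and
// reports whether it contains a MySQL error the migration flow tolerates.
func isAcceptableCutoverError(err error) bool {
	var sqlErr *sqlerror.SQLError
	if !errors.As(err, &sqlErr) {
		return false // no MySQL error anywhere in the chain
	}
	switch sqlErr.Num {
	case sqlerror.ERTooLongIdent:
		return true
	default:
		return false
	}
}

func main() {
	// Hypothetical wrapped error, built with %w so the chain is unwrappable.
	base := &sqlerror.SQLError{Num: sqlerror.ERTooLongIdent, Message: "Identifier name is too long"}
	err := fmt.Errorf("failed killing table lock holders and accessors: %w", base)
	fmt.Println(isAcceptableCutoverError(err)) // true
}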
@@ -5160,6 +5165,7 @@ func (e *Executor) OnSchemaMigrationStatus(ctx context.Context,
return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct, etaSeconds, rowsCopied, hint)
}

// checkOnPreparedPool checks if there are any cross-shard prepared transactions on the given table
func (e *Executor) checkOnPreparedPool(ctx context.Context, table string, waitTime time.Duration) error {
if e.isPreparedPoolEmpty(table) {
return nil
Review comment (on the "finding queries potentially operating on table" wrap above): Would it not make more sense to say ~ "failed to find queries ..." and similar for some others like the one just below?
Review comment: This error is later wrapped in https://github.com/vitessio/vitess/pull/17052/files#diff-059c9f46e8d270d9c5514ef2b08679035eb0daaa8d95074e34ef43a81d50dc37R1095, which prepends "failed killing table lock holders and accessors".
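To make the composition described in that reply concrete: per this PR, the "finding queries potentially operating on table" wrap is applied inside killTableLockHoldersAndAccessors, and the cut-over call site then wraps whatever comes back with "failed killing table lock holders and accessors". A small sketch of the message a user would ultimately see follows; the inner error is hypothetical, and it assumes vterrors.Wrapf joins messages with ": ".

package main

import (
	"errors"
	"fmt"

	"vitess.io/vitess/go/vt/vterrors"
)

func main() {
	// Hypothetical low-level failure from ExecuteFetch.
	inner := errors.New("connection refused")

	// Wrap added inside killTableLockHoldersAndAccessors (per this PR):
	err := vterrors.Wrapf(inner, "finding queries potentially operating on table")
	// Wrap added at the cut-over call site (per this PR):
	err = vterrors.Wrapf(err, "failed killing table lock holders and accessors")

	fmt.Println(err)
	// Expected output shape:
	// failed killing table lock holders and accessors: finding queries potentially operating on table: connection refused
}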