diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 22dd9447bb9..b430345bfac 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -785,7 +785,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa } rs, err := conn.Conn.ExecuteFetch(query, -1, true) if err != nil { - return err + return vterrors.Wrapf(err, "finding queries potentially operating on table") } log.Infof("killTableLockHoldersAndAccessors: found %v potential queries", len(rs.Rows)) @@ -841,7 +841,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa } rs, err := conn.Conn.ExecuteFetch(query, -1, true) if err != nil { - return err + return vterrors.Wrapf(err, "finding transactions locking table") } log.Infof("killTableLockHoldersAndAccessors: found %v locking transactions", len(rs.Rows)) for _, row := range rs.Named().Rows { @@ -861,7 +861,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa // cutOverVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, shouldForceCutOver bool) error { if err := e.incrementCutoverAttempts(ctx, s.workflow); err != nil { - return err + return vterrors.Wrapf(err, "cutover: failed incrementing cutover attempts") } tmClient := e.tabletManagerClient() @@ -870,19 +870,19 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh // sanity checks: vreplTable, err := getVreplTable(s) if err != nil { - return err + return vterrors.Wrapf(err, "cutover: failed getting vreplication table") } // get topology client & entities: tablet, err := e.ts.GetTablet(ctx, e.tabletAlias) if err != nil { - return err + return vterrors.Wrapf(err, "cutover: failed reading vreplication table") } // information about source tablet onlineDDL, _, err := e.readMigration(ctx, s.workflow) if err != nil { - return err + return vterrors.Wrapf(err, "cutover: failed reading migration") } isVreplicationTestSuite := onlineDDL.StrategySetting().IsVreplicationTestSuite() e.updateMigrationStage(ctx, onlineDDL.UUID, "starting cut-over") @@ -896,7 +896,10 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh defer cancel() // Wait for target to reach the up-to-date pos if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, s.id, replication.EncodePosition(pos)); err != nil { - return err + if s, _ := e.readVReplStream(ctx, s.workflow, true); s != nil { + err = vterrors.Wrapf(err, "read vrepl position %v", s.pos) + } + return vterrors.Wrapf(err, "failed waiting for position %v", replication.EncodePosition(pos)) } // Target is now in sync with source! return nil @@ -910,14 +913,14 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh // in that place as possible. sentryTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime()) if err != nil { - return nil + return vterrors.Wrapf(err, "failed creating sentry table name") } // We create the sentry table before toggling writes, because this involves a WaitForPos, which takes some time. We // don't want to overload the buffering time with this excessive wait. if err := e.updateArtifacts(ctx, onlineDDL.UUID, sentryTableName); err != nil { - return err + return vterrors.Wrapf(err, "failed updating artifacts with sentry table name") } dropSentryTableQuery := sqlparser.BuildParsedQuery(sqlDropTableIfExists, sentryTableName) @@ -941,30 +944,30 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh }() parsed := sqlparser.BuildParsedQuery(sqlCreateSentryTable, sentryTableName) if _, err := e.execQuery(ctx, parsed.Query); err != nil { - return err + return vterrors.Wrapf(err, "failed creating sentry table") } e.updateMigrationStage(ctx, onlineDDL.UUID, "sentry table created: %s", sentryTableName) postSentryPos, err := e.primaryPosition(ctx) if err != nil { - return err + return vterrors.Wrapf(err, "failed getting primary pos after sentry creation") } e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-sentry pos: %v", replication.EncodePosition(postSentryPos)) if err := waitForPos(s, postSentryPos); err != nil { - return err + return vterrors.Wrapf(err, "failed waiting for pos after sentry creation") } e.updateMigrationStage(ctx, onlineDDL.UUID, "post-sentry pos reached") } lockConn, err := e.pool.Get(ctx, nil) if err != nil { - return err + return vterrors.Wrapf(err, "failed getting locking connection") } // Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation. // The code will ensure everything that needs to be terminated by `migrationCutOverThreshold` will be terminated. lockConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, lockConn.Conn, 5*migrationCutOverThreshold) if err != nil { - return err + return vterrors.Wrapf(err, "failed setting lock_wait_timeout on locking connection") } defer lockConn.Recycle() defer lockConnRestoreLockWaitTimeout() @@ -974,13 +977,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh renameWasSuccessful := false renameConn, err := e.pool.Get(ctx, nil) if err != nil { - return err + return vterrors.Wrapf(err, "failed getting rename connection") } // Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation. // The code will ensure everything that needs to be terminated by `migrationCutOverThreshold` will be terminated. renameConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, renameConn.Conn, 5*migrationCutOverThreshold*4) if err != nil { - return err + return vterrors.Wrapf(err, "failed setting lock_wait_timeout on rename connection") } defer renameConn.Recycle() defer func() { @@ -996,7 +999,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh // See if backend MySQL server supports 'rename_table_preserve_foreign_key' variable preserveFKSupported, err := e.isPreserveForeignKeySupported(ctx) if err != nil { - return err + return vterrors.Wrapf(err, "failed checking for 'rename_table_preserve_foreign_key' support") } if preserveFKSupported { // This code is only applicable when MySQL supports the 'rename_table_preserve_foreign_key' variable. This variable @@ -1019,7 +1022,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh for { renameProcessFound, err := e.doesConnectionInfoMatch(renameWaitCtx, renameConn.Conn.ID(), "rename") if err != nil { - return err + return vterrors.Wrapf(err, "searching for rename process") } if renameProcessFound { return nil @@ -1053,7 +1056,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh bufferingContextCancel() // force re-read of tables if err := tmClient.RefreshState(grpcCtx, tablet.Tablet); err != nil { - return err + return vterrors.Wrapf(err, "refreshing table state") } } log.Infof("toggled buffering: %t in migration %v", bufferQueries, onlineDDL.UUID) @@ -1073,7 +1076,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh err = toggleBuffering(true) defer reenableWritesOnce() if err != nil { - return err + return vterrors.Wrapf(err, "failed enabling buffering") } // Give a fraction of a second for a scenario where a query is in // query executor, it passed the ACLs and is _about to_ execute. This will be nicer to those queries: @@ -1086,10 +1089,10 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh // We should only proceed with forceful cut over if there is no pending atomic transaction for the table. // This will help in keeping the atomicity guarantee of a prepared transaction. if err := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); err != nil { - return err + return vterrors.Wrapf(err, "checking prepared pool for table") } if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil { - return err + return vterrors.Wrapf(err, "failed killing table lock holders and accessors") } } @@ -1112,7 +1115,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh defer cancel() lockTableQuery := sqlparser.BuildParsedQuery(sqlLockTwoTablesWrite, sentryTableName, onlineDDL.Table) if _, err := lockConn.Conn.Exec(lockCtx, lockTableQuery.Query, 1, false); err != nil { - return err + return vterrors.Wrapf(err, "failed locking tables") } e.updateMigrationStage(ctx, onlineDDL.UUID, "renaming tables") @@ -1124,7 +1127,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh // the rename should block, because of the LOCK. Wait for it to show up. e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for RENAME to block") if err := waitForRenameProcess(); err != nil { - return err + return vterrors.Wrapf(err, "failed waiting for rename process") } e.updateMigrationStage(ctx, onlineDDL.UUID, "RENAME found") } @@ -1132,7 +1135,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh e.updateMigrationStage(ctx, onlineDDL.UUID, "reading post-lock pos") postWritesPos, err := e.primaryPosition(ctx) if err != nil { - return err + return vterrors.Wrapf(err, "failed reading pos after locking") } // Right now: new queries are buffered, any existing query will have executed, and worst case scenario is @@ -1144,19 +1147,19 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh // Writes are now disabled on table. Read up-to-date vreplication info, specifically to get latest (and fixed) pos: s, err = e.readVReplStream(ctx, s.workflow, false) if err != nil { - return err + return vterrors.Wrapf(err, "failed reading vreplication table after locking") } e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-lock pos: %v", replication.EncodePosition(postWritesPos)) if err := waitForPos(s, postWritesPos); err != nil { e.updateMigrationStage(ctx, onlineDDL.UUID, "timeout while waiting for post-lock pos: %v", err) - return err + return vterrors.Wrapf(err, "failed waiting for pos after locking") } go log.Infof("cutOverVReplMigration %v: done waiting for position %v", s.workflow, replication.EncodePosition(postWritesPos)) // Stop vreplication e.updateMigrationStage(ctx, onlineDDL.UUID, "stopping vreplication") if _, err := e.vreplicationExec(ctx, tablet.Tablet, binlogplayer.StopVReplication(s.id, "stopped for online DDL cutover")); err != nil { - return err + return vterrors.Wrapf(err, "failed stopping vreplication") } go log.Infof("cutOverVReplMigration %v: stopped vreplication", s.workflow) @@ -1173,7 +1176,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh } else { e.updateMigrationStage(ctx, onlineDDL.UUID, "validating rename is still in place") if err := waitForRenameProcess(); err != nil { - return err + return vterrors.Wrapf(err, "failed waiting for rename process before dropping sentry table") } // Normal (non-testing) alter table @@ -1184,7 +1187,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh lockCtx, cancel := context.WithTimeout(ctx, migrationCutOverThreshold) defer cancel() if _, err := lockConn.Conn.Exec(lockCtx, dropTableQuery.Query, 1, false); err != nil { - return err + return vterrors.Wrapf(err, "failed dropping sentry table") } } { @@ -1192,7 +1195,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh defer cancel() e.updateMigrationStage(ctx, onlineDDL.UUID, "unlocking tables") if _, err := lockConn.Conn.Exec(lockCtx, sqlUnlockTables, 1, false); err != nil { - return err + return vterrors.Wrapf(err, "failed unlocking tables") } } { @@ -1200,7 +1203,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh defer cancel() e.updateMigrationStage(lockCtx, onlineDDL.UUID, "waiting for RENAME to complete") if err := <-renameCompleteChan; err != nil { - return err + return vterrors.Wrapf(err, "failed waiting for rename to complete") } renameWasSuccessful = true } @@ -3784,8 +3787,10 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i if err := e.cutOverVReplMigration(ctx, s, shouldForceCutOver); err != nil { _ = e.updateMigrationMessage(ctx, uuid, err.Error()) log.Errorf("cutOverVReplMigration failed: err=%v", err) - if merr, ok := err.(*sqlerror.SQLError); ok { - switch merr.Num { + + if sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError); isSQLErr && sqlErr != nil { + // let's see if this error is actually acceptable + switch sqlErr.Num { case sqlerror.ERTooLongIdent: go e.CancelMigration(ctx, uuid, err.Error(), false) } @@ -5160,6 +5165,7 @@ func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct, etaSeconds, rowsCopied, hint) } +// checkOnPreparedPool checks if there are any cross-shard prepared transactions on the given table func (e *Executor) checkOnPreparedPool(ctx context.Context, table string, waitTime time.Duration) error { if e.isPreparedPoolEmpty(table) { return nil