diff --git a/go/mysql/query.go b/go/mysql/query.go index 26ba7095585..46060a39c94 100644 --- a/go/mysql/query.go +++ b/go/mysql/query.go @@ -23,8 +23,6 @@ import ( "strconv" "strings" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" @@ -350,12 +348,6 @@ func (c *Conn) drainMoreResults(more bool, err error) error { // It returns an additional 'more' flag. If it is set, you must fetch the additional // results using ReadQueryResult. func (c *Conn) ExecuteFetchMulti(query string, maxrows int, wantfields bool) (result *sqltypes.Result, more bool, err error) { - if strings.Contains(query, "rollback") || strings.Contains(query, "commit") || - strings.Contains(query, "begin") || - strings.Contains(query, "admins") && strings.Contains(query, "update") { - - log.Infof("$$$$$$$$$$$$$$ ExecuteFetch: %s", query) - } defer func() { if err != nil { if sqlerr, ok := err.(*sqlerror.SQLError); ok { diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 13268fc749c..83545913587 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -345,7 +345,7 @@ func getClusterConfig(idx int, dataRootDir string) *ClusterConfig { func init() { // for local debugging set this variable so that each run uses VTDATAROOT instead of a random dir // and also does not teardown the cluster for inspecting logs and the databases - if os.Getenv("VREPLICATION_E2E_DEBUG") != "" { + if os.Getenv("VREPLICATION_E2E_DEBUG") != "on" { debugMode = true } originalVtdataroot = os.Getenv("VTDATAROOT") @@ -440,7 +440,10 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) { // This is always set to "true" on GitHub Actions runners: // https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables ci, ok := os.LookupEnv("CI") + fmt.Printf(">>>>>>>>>>>>>>. 
os.LookupEnv(\"CI\") returns ci, ok: %s, %t\n", ci, ok) if !ok || strings.ToLower(ci) != "true" { + fmt.Println("Not running in CI, skipping cleanup") + //panic("leaving directory") // Leave the directory in place to support local debugging. return } @@ -459,6 +462,7 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) { time.Sleep(1 * time.Second) } require.NoError(t, err) + log.Infof("Recreating vtdataroot %q: %t", dir, recreate) if recreate { err = os.Mkdir(dir, 0700) require.NoError(t, err) diff --git a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go index 0105106a7a4..81c2bb3a905 100644 --- a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go +++ b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go @@ -14,6 +14,16 @@ import ( ) func TestWorkflowDuplicateKeyBackoff(t *testing.T) { + t.Run("TestWorkflowDuplicateKeyBackoff with batching off", func(t *testing.T) { + testWorkflowDuplicateKeyBackoff(t, false) + }) + t.Run("TestWorkflowDuplicateKeyBackoff with batching on", func(t *testing.T) { + testWorkflowDuplicateKeyBackoff(t, true) + }) +} + +func testWorkflowDuplicateKeyBackoff(t *testing.T, setExperimentalFlags bool) { + debugMode = false setSidecarDBName("_vt") origDefaultRdonly := defaultRdonly origDefailtReplica := defaultReplicas @@ -23,7 +33,9 @@ func TestWorkflowDuplicateKeyBackoff(t *testing.T) { }() defaultRdonly = 0 defaultReplicas = 0 - setAllVTTabletExperimentalFlags() + if setExperimentalFlags { + setAllVTTabletExperimentalFlags() + } setupMinimalCluster(t) vttablet.InitVReplicationConfigDefaults() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index 5a0f7255ace..d52ac306c39 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -103,6 +103,10 @@ func 
(vc *vdbClient) CommitTrxQueryBatch() error { return nil } +func (vc *vdbClient) GetQueries() []string { + return vc.queries +} + func (vc *vdbClient) Rollback() error { if !vc.InTransaction { return nil diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index b813b6d006a..25696c05afd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -63,6 +63,8 @@ type vplayer struct { replicatorPlan *ReplicatorPlan tablePlans map[string]*TablePlan + ctx context.Context + // These are set when creating the VPlayer based on whether the VPlayer // is in batch (stmt and trx) execution mode or not. query func(ctx context.Context, sql string) (*sqltypes.Result, error) @@ -184,11 +186,45 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map } return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch } + unrollBatch := func() error { + batchedQueries := vr.dbClient.GetQueries() + if len(batchedQueries) == 0 { + return nil + } + for _, query := range batchedQueries { + log.Infof("Unrolling batch: exec %v", query) + + _, err := vr.dbClient.Execute(query) + if err != nil { + log.Infof("Unrolling batch: failed to exec %v: %v", query, err) + if vp.mustBackoff(err) { + log.Infof("Unrolling batch: backoff needed for query: %v", query) + if vp.hasSkippedCommit { + log.Infof("Unrolling batch: found skipped Commit, issuing a commit before retrying the query: %v", query) + if err := vr.dbClient.Commit(); err != nil { + return err + } + if err := vr.dbClient.Begin(); err != nil { + return err + } + } + _, err2 := vp.backoffAndRetry(vp.ctx, query) + if err2 != nil { + return err2 + } + } + } else { + log.Infof("Unrolling batch: exec %v succeeded", query) + } + } + return vr.dbClient.Commit() + } commitFunc = func() error { - log.Infof("Batch Commit func: In Transaction? 
%v", vr.dbClient.InTransaction) + log.Infof("Batch Commit func: In Transaction %v", vr.dbClient.InTransaction) if vp.inBackoff { - // We get into backoff when there is a ERDupQuery error. - vr.dbClient.PopLastQueryFromBatch() + // We get into backoff when there is an ERDupEntry error, which means one of the queries in the batch is + // causing the issue. We need to run all queries up to that one first and then back off and retry that one. + return unrollBatch() } return vr.dbClient.CommitTrxQueryBatch() // Commit the current trx batch } @@ -197,21 +233,24 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map wrappedCommitFunc := func() error { vp.hasSkippedCommit = false - + err := commitFunc() + if !vp.batchMode { + return err + } + vp.inBackoff = true + defer func() { + vp.inBackoff = false + }() + log.Infof("In backoff in wrapped commit func for batch mode, batched queries: %v", vp.vr.dbClient.GetQueries()) return commitFunc() } vp.commit = wrappedCommitFunc wrappedQueryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { result, err := queryFunc(ctx, sql) - var sqlErr *sqlerror.SQLError - isSqlErr := errors.As(err, &sqlErr) - if err != nil { - log.Errorf(">>>>>>>>>>>> Error executing query: %v, isSqlErr %t ,err: %v", sql, isSqlErr, err) - } - if err != nil && isSqlErr && - sqlErr.Number() == sqlerror.ERDupEntry && vp.isMergeWorkflow { - return vp.backoffAndRetry(ctx, sql, err) + log.Infof("wrapped query func: %v, err: %v", sql, err) + if err != nil && vp.mustBackoff(err) { + return vp.backoffAndRetry(ctx, sql) } return result, err } @@ -227,6 +266,17 @@ func (vp *vplayer) isRetryable(err error) bool { return false } +func (vp *vplayer) mustBackoff(err error) bool { + var sqlErr *sqlerror.SQLError + isSqlErr := errors.As(err, &sqlErr) + if err != nil && isSqlErr && + sqlErr.Number() == sqlerror.ERDupEntry && vp.isMergeWorkflow { + log.Infof("mustBackoff for err: %v", err) + return true + } + return false +} + func (vp *vplayer) 
executeWithRetryAndBackoff(ctx context.Context, query string) (*sqltypes.Result, error) { // We will retry the query if it fails with a duplicate entry error. Since this will be a non-recoverable error // we should wait for a longer time than we would usually do. The backoff is intended to let the other streams catch up @@ -267,7 +317,8 @@ func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string) } // backoffAndRetry retries the query after a backoff period. -func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string, err error) (*sqltypes.Result, error) { +func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string) (*sqltypes.Result, error) { + vp.ctx = ctx vp.dontSkipCommits = true log.Infof("Setting inBackoff to true for query: %v", sql) vp.inBackoff = true @@ -276,17 +327,19 @@ func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string, err error) ( vp.inBackoff = false }() //FIXME : set dontSkipCommits to false after some time? - if vp.hasSkippedCommit { - log.Infof(">>>>>>>> found skipped Commit, issuing a commit before retrying the query: %v", sql) - if err := vp.commit(); err != nil { - return nil, err - } // vp.hasSkippedCommit is reset in the wrapped commit function vp.commit() - if err := vp.vr.dbClient.Begin(); err != nil { - return nil, err + if !vp.batchMode { + if vp.hasSkippedCommit { + log.Infof(">>>>>>>> found skipped Commit, issuing a commit before retrying the query: %v", sql) + if err := vp.commit(); err != nil { + return nil, err + } // vp.hasSkippedCommit is reset in the wrapped commit function vp.commit() + if err := vp.vr.dbClient.Begin(); err != nil { + return nil, err + } } + return vp.executeWithRetryAndBackoff(ctx, sql) } - return vp.executeWithRetryAndBackoff(ctx, sql) - + return nil, vp.commit() // is batch mode } // play is the entry point for playing binlogs. 
@@ -662,7 +715,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // applying the next set of events as part of the current transaction. This approach // also handles the case where the last transaction is partial. In that case, // we only group the transactions with commits we've seen so far. - if /*vp.dontSkipCommits && */ hasAnotherCommit(items, i, j+1) { + if vp.dontSkipCommits && hasAnotherCommit(items, i, j+1) { log.Infof(">>>>>>>> skipping commit") vp.hasSkippedCommit = true continue