Working e2e tests with and without transaction batching. Leaving comments added for debugging. Code needs better organization

Signed-off-by: Rohit Nayak <[email protected]>
rohit-nayak-ps committed Nov 7, 2024
1 parent 86cf2d5 commit 4e053df
Showing 5 changed files with 98 additions and 33 deletions.
8 changes: 0 additions & 8 deletions go/mysql/query.go
@@ -23,8 +23,6 @@ import (
"strconv"
"strings"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
@@ -350,12 +348,6 @@ func (c *Conn) drainMoreResults(more bool, err error) error {
// It returns an additional 'more' flag. If it is set, you must fetch the additional
// results using ReadQueryResult.
func (c *Conn) ExecuteFetchMulti(query string, maxrows int, wantfields bool) (result *sqltypes.Result, more bool, err error) {
if strings.Contains(query, "rollback") || strings.Contains(query, "commit") ||
strings.Contains(query, "begin") ||
strings.Contains(query, "admins") && strings.Contains(query, "update") {

log.Infof("$$$$$$$$$$$$$$ ExecuteFetch: %s", query)
}
defer func() {
if err != nil {
if sqlerr, ok := err.(*sqlerror.SQLError); ok {
6 changes: 5 additions & 1 deletion go/test/endtoend/vreplication/cluster_test.go
@@ -345,7 +345,7 @@ func getClusterConfig(idx int, dataRootDir string) *ClusterConfig {
func init() {
// for local debugging set this variable so that each run uses VTDATAROOT instead of a random dir
// and also does not teardown the cluster for inspecting logs and the databases
if os.Getenv("VREPLICATION_E2E_DEBUG") != "" {
if os.Getenv("VREPLICATION_E2E_DEBUG") != "on" {
debugMode = true
}
originalVtdataroot = os.Getenv("VTDATAROOT")
@@ -440,7 +440,10 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) {
// This is always set to "true" on GitHub Actions runners:
// https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
ci, ok := os.LookupEnv("CI")
fmt.Printf(">>>>>>>>>>>>>>. os.LookupEnv(\"CI\") returns ci, ok: %s, %t\n", ci, ok)
if !ok || strings.ToLower(ci) != "true" {
fmt.Println("Not running in CI, skipping cleanup")
//panic("leaving directory")
// Leave the directory in place to support local debugging.
return
}
@@ -459,6 +462,7 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) {
time.Sleep(1 * time.Second)
}
require.NoError(t, err)
log.Infof("Recreating vtdataroot %q: %t", dir, recreate)
if recreate {
err = os.Mkdir(dir, 0700)
require.NoError(t, err)
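
The CleanupDataroot change above gates teardown on the CI environment variable so that local runs keep the data directory around for debugging. A minimal standalone sketch of that gating pattern follows; the cleanupDataDir name and its parameters are illustrative, not the actual cluster_test.go API, while the CI=true convention is the documented GitHub Actions default.

package main

import (
	"fmt"
	"os"
	"strings"
)

// cleanupDataDir removes the test data directory only when running under CI.
// GitHub Actions always sets CI=true on its runners, so anything else is
// treated as a local run and the directory is left in place for inspection.
func cleanupDataDir(dir string, recreate bool) error {
	ci, ok := os.LookupEnv("CI")
	if !ok || strings.ToLower(ci) != "true" {
		fmt.Println("Not running in CI, skipping cleanup")
		return nil
	}
	if err := os.RemoveAll(dir); err != nil {
		return err
	}
	if recreate {
		return os.Mkdir(dir, 0700)
	}
	return nil
}

func main() {
	if err := cleanupDataDir(os.TempDir()+"/vt_test_data", true); err != nil {
		fmt.Println("cleanup failed:", err)
	}
}
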
@@ -14,6 +14,16 @@ import (
)

func TestWorkflowDuplicateKeyBackoff(t *testing.T) {
t.Run("TestWorkflowDuplicateKeyBackoff with batching off", func(t *testing.T) {
testWorkflowDuplicateKeyBackoff(t, false)
})
t.Run("TestWorkflowDuplicateKeyBackoff with batching on", func(t *testing.T) {
testWorkflowDuplicateKeyBackoff(t, true)
})
}

func testWorkflowDuplicateKeyBackoff(t *testing.T, setExperimentalFlags bool) {
debugMode = false
setSidecarDBName("_vt")
origDefaultRdonly := defaultRdonly
origDefailtReplica := defaultReplicas
@@ -23,7 +33,9 @@ func TestWorkflowDuplicateKeyBackoff(t *testing.T) {
}()
defaultRdonly = 0
defaultReplicas = 0
setAllVTTabletExperimentalFlags()
if setExperimentalFlags {
setAllVTTabletExperimentalFlags()
}

setupMinimalCluster(t)
vttablet.InitVReplicationConfigDefaults()
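
The test above now runs the same body twice, once with transaction batching off and once with it on, by toggling the experimental vttablet flags. A compact table-driven variant of that pattern is sketched below; runDuplicateKeyBackoff and its batching flag are illustrative stand-ins for testWorkflowDuplicateKeyBackoff and setExperimentalFlags, not the real helpers.

package vreplication

import "testing"

// TestDuplicateKeyBackoffModes runs the same scenario in both execution modes.
func TestDuplicateKeyBackoffModes(t *testing.T) {
	for _, batching := range []bool{false, true} {
		name := "batching off"
		if batching {
			name = "batching on"
		}
		t.Run(name, func(t *testing.T) {
			runDuplicateKeyBackoff(t, batching)
		})
	}
}

// runDuplicateKeyBackoff stands in for the real helper, which sets up a
// minimal cluster and, when batching is enabled, turns on the experimental
// vttablet flags before exercising the workflow.
func runDuplicateKeyBackoff(t *testing.T, batching bool) {
	t.Logf("running with batching=%v", batching)
}
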
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
@@ -103,6 +103,10 @@ func (vc *vdbClient) CommitTrxQueryBatch() error {
return nil
}

func (vc *vdbClient) GetQueries() []string {
return vc.queries
}

func (vc *vdbClient) Rollback() error {
if !vc.InTransaction {
return nil
99 changes: 76 additions & 23 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
@@ -63,6 +63,8 @@ type vplayer struct {
replicatorPlan *ReplicatorPlan
tablePlans map[string]*TablePlan

ctx context.Context

// These are set when creating the VPlayer based on whether the VPlayer
// is in batch (stmt and trx) execution mode or not.
query func(ctx context.Context, sql string) (*sqltypes.Result, error)
@@ -184,11 +186,45 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
}
return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch
}
unrollBatch := func() error {
batchedQueries := vr.dbClient.GetQueries()
if len(batchedQueries) == 0 {
return nil
}
for _, query := range batchedQueries {
log.Infof("Unrolling batch: exec %v", query)

_, err := vr.dbClient.Execute(query)
if err != nil {
log.Infof("Unrolling batch: failed to exec %v: %v", query, err)
if vp.mustBackoff(err) {
log.Infof("Unrolling batch: backoff needed for query: %v", query)
if vp.hasSkippedCommit {
log.Infof("Unrolling batch: found skipped Commit, issuing a commit before retrying the query: %v", query)
if err := vr.dbClient.Commit(); err != nil {
return err
}
if err := vr.dbClient.Begin(); err != nil {
return err
}
}
_, err2 := vp.backoffAndRetry(vp.ctx, query)
if err2 != nil {
return err2
}
}
} else {
log.Infof("Unrolling batch: exec %v succeeded", query)
}
}
return vr.dbClient.Commit()
}
commitFunc = func() error {
log.Infof("Batch Commit func: In Transaction? %v", vr.dbClient.InTransaction)
log.Infof("Batch Commit func: In Transaction %v", vr.dbClient.InTransaction)
if vp.inBackoff {
// We get into backoff when there is a ERDupQuery error.
vr.dbClient.PopLastQueryFromBatch()
// We get into backoff when there is a ERDupQuery error. So one of the queries in the batch is
// causing the issue. We need to run all queries until that one first and then backoff/retry that one
return unrollBatch()
}
return vr.dbClient.CommitTrxQueryBatch() // Commit the current trx batch
}
@@ -197,21 +233,24 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map

wrappedCommitFunc := func() error {
vp.hasSkippedCommit = false

err := commitFunc()
if !vp.batchMode {
return err
}
vp.inBackoff = true
defer func() {
vp.inBackoff = false
}()
log.Infof("In backoff in wrapped commit func for batch mode, batched queries: %v", vp.vr.dbClient.GetQueries())
return commitFunc()
}
vp.commit = wrappedCommitFunc

wrappedQueryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
result, err := queryFunc(ctx, sql)
var sqlErr *sqlerror.SQLError
isSqlErr := errors.As(err, &sqlErr)
if err != nil {
log.Errorf(">>>>>>>>>>>> Error executing query: %v, isSqlErr %t ,err: %v", sql, isSqlErr, err)
}
if err != nil && isSqlErr &&
sqlErr.Number() == sqlerror.ERDupEntry && vp.isMergeWorkflow {
return vp.backoffAndRetry(ctx, sql, err)
log.Infof("wrapped query func: %v, err: %v", sql, err)
if err != nil && vp.mustBackoff(err) {
return vp.backoffAndRetry(ctx, sql)
}
return result, err
}
@@ -227,6 +266,17 @@ func (vp *vplayer) isRetryable(err error) bool {
return false
}

func (vp *vplayer) mustBackoff(err error) bool {
var sqlErr *sqlerror.SQLError
isSqlErr := errors.As(err, &sqlErr)
if err != nil && isSqlErr &&
sqlErr.Number() == sqlerror.ERDupEntry && vp.isMergeWorkflow {
log.Infof("mustBackoff for err: %v", err)
return true
}
return false
}

func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string) (*sqltypes.Result, error) {
// We will retry the query if it fails with a duplicate entry error. Since this will be a non-recoverable error
// we should wait for a longer time than we would usually do. The backoff is intended to let the other streams catch up
@@ -267,7 +317,8 @@ func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string)
}

// backoffAndRetry retries the query after a backoff period.
func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string, err error) (*sqltypes.Result, error) {
func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string) (*sqltypes.Result, error) {
vp.ctx = ctx
vp.dontSkipCommits = true
log.Infof("Setting inBackoff to true for query: %v", sql)
vp.inBackoff = true
@@ -276,17 +327,19 @@ func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string, err error) (
vp.inBackoff = false
}()
//FIXME : set dontSkipCommits to false after some time?
if vp.hasSkippedCommit {
log.Infof(">>>>>>>> found skipped Commit, issuing a commit before retrying the query: %v", sql)
if err := vp.commit(); err != nil {
return nil, err
} // vp.hasSkippedCommit is reset in the wrapped commit function vp.commit()
if err := vp.vr.dbClient.Begin(); err != nil {
return nil, err
if !vp.batchMode {
if vp.hasSkippedCommit {
log.Infof(">>>>>>>> found skipped Commit, issuing a commit before retrying the query: %v", sql)
if err := vp.commit(); err != nil {
return nil, err
} // vp.hasSkippedCommit is reset in the wrapped commit function vp.commit()
if err := vp.vr.dbClient.Begin(); err != nil {
return nil, err
}
}
return vp.executeWithRetryAndBackoff(ctx, sql)
}
return vp.executeWithRetryAndBackoff(ctx, sql)

return nil, vp.commit() // is batch mode
}

// play is the entry point for playing binlogs.
@@ -662,7 +715,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
// applying the next set of events as part of the current transaction. This approach
// also handles the case where the last transaction is partial. In that case,
// we only group the transactions with commits we've seen so far.
if /*vp.dontSkipCommits && */ hasAnotherCommit(items, i, j+1) {
if vp.dontSkipCommits && hasAnotherCommit(items, i, j+1) {
log.Infof(">>>>>>>> skipping commit")
vp.hasSkippedCommit = true
continue
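
Taken together, the vplayer changes retry duplicate-key (ERDupEntry) errors on merge workflows after a backoff, and in batch mode they first unroll the failed transaction batch so that only the offending statement is retried; any commit that was previously skipped for grouping is flushed before the retry. The sketch below condenses that control flow into a self-contained form; the player type, its callbacks, and the error type are simplified stand-ins, not the actual vplayer, vdbClient, or sqlerror APIs.

package vreplication

import (
	"errors"
	"fmt"
	"time"
)

const erDupEntry = 1062 // MySQL duplicate-entry error number

type sqlError struct{ num int }

func (e *sqlError) Error() string { return fmt.Sprintf("sql error %d", e.num) }

// player is a simplified stand-in for vplayer: exec/commit/begin abstract the
// underlying dbClient calls.
type player struct {
	isMergeWorkflow  bool
	hasSkippedCommit bool
	exec             func(sql string) error
	commit           func() error
	begin            func() error
}

// mustBackoff mirrors the new helper: only a duplicate-key error seen by a
// merge workflow triggers the backoff/retry path.
func (p *player) mustBackoff(err error) bool {
	var se *sqlError
	return err != nil && errors.As(err, &se) && se.num == erDupEntry && p.isMergeWorkflow
}

// retryWithBackoff flushes a previously skipped commit (so rows applied so far
// become visible), waits for the other streams to catch up, then re-executes
// the single failing statement.
func (p *player) retryWithBackoff(sql string, wait time.Duration) error {
	if p.hasSkippedCommit {
		if err := p.commit(); err != nil {
			return err
		}
		if err := p.begin(); err != nil {
			return err
		}
		p.hasSkippedCommit = false
	}
	time.Sleep(wait)
	return p.exec(sql)
}

// unrollBatch replays a failed transaction batch one statement at a time so
// that only the statement that actually hits the duplicate key is backed off
// and retried; everything else is applied normally, then the batch is committed.
func (p *player) unrollBatch(queries []string, wait time.Duration) error {
	for _, q := range queries {
		err := p.exec(q)
		if err == nil {
			continue
		}
		if !p.mustBackoff(err) {
			return err
		}
		if err := p.retryWithBackoff(q, wait); err != nil {
			return err
		}
	}
	return p.commit()
}

In the actual change, the single-statement path keeps the existing executeWithRetryAndBackoff loop, while batch mode re-enters the commit function with inBackoff set so the queued batch is unrolled as above.
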
