Skip to content

Commit

Permalink
Atomic Transaction StartCommit api to return the commit state (#17116)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Nov 8, 2024
1 parent de16aff commit 78f7db2
Show file tree
Hide file tree
Showing 23 changed files with 1,102 additions and 831 deletions.
1,540 changes: 803 additions & 737 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,9 +539,9 @@ func (itc *internalTabletConn) CreateTransaction(
}

// StartCommit is part of queryservice.QueryService
func (itc *internalTabletConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) error {
err := itc.tablet.qsc.QueryService().StartCommit(ctx, target, transactionID, dtid)
return tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
func (itc *internalTabletConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (querypb.StartCommitState, error) {
state, err := itc.tablet.qsc.QueryService().StartCommit(ctx, target, transactionID, dtid)
return state, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

// SetRollback is part of queryservice.QueryService
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (t *explainTablet) CreateTransaction(ctx context.Context, target *querypb.T
}

// StartCommit is part of the QueryService interface.
func (t *explainTablet) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) {
func (t *explainTablet) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) {
t.mu.Lock()
t.currentTime = t.vte.batchTime.Wait()
t.mu.Unlock()
Expand Down
44 changes: 35 additions & 9 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ const (
Commit2pcConclude
)

var phaseMessage = map[commitPhase]string{
Commit2pcCreateTransaction: "Create Transaction",
Commit2pcPrepare: "Prepare",
Commit2pcStartCommit: "Start Commit",
Commit2pcPrepareCommit: "Prepare Commit",
Commit2pcConclude: "Conclude",
}

// Begin begins a new transaction. If one is already in progress, it commits it
// and starts a new one.
func (txc *TxConn) Begin(ctx context.Context, session *SafeSession, txAccessModes []sqlparser.TxAccessMode) error {
Expand Down Expand Up @@ -221,11 +229,12 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err
}

var txPhase commitPhase
var startCommitState querypb.StartCommitState
defer func() {
if err == nil {
return
}
txc.errActionAndLogWarn(ctx, session, txPhase, dtid, mmShard, rmShards)
txc.errActionAndLogWarn(ctx, session, txPhase, startCommitState, dtid, mmShard, rmShards)
}()

txPhase = Commit2pcCreateTransaction
Expand Down Expand Up @@ -259,7 +268,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err
}

txPhase = Commit2pcStartCommit
err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid)
startCommitState, err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid)
if err != nil {
return err
}
Expand Down Expand Up @@ -298,21 +307,38 @@ func (txc *TxConn) checkValidCondition(session *SafeSession) error {
return nil
}

func (txc *TxConn) errActionAndLogWarn(ctx context.Context, session *SafeSession, txPhase commitPhase, dtid string, mmShard *vtgatepb.Session_ShardSession, rmShards []*vtgatepb.Session_ShardSession) {
func (txc *TxConn) errActionAndLogWarn(
ctx context.Context,
session *SafeSession,
txPhase commitPhase,
startCommitState querypb.StartCommitState,
dtid string,
mmShard *vtgatepb.Session_ShardSession,
rmShards []*vtgatepb.Session_ShardSession,
) {
var rollbackErr error
switch txPhase {
case Commit2pcCreateTransaction:
// Normal rollback is safe because nothing was prepared yet.
if rollbackErr := txc.Rollback(ctx, session); rollbackErr != nil {
log.Warningf("Rollback failed after Create Transaction failure: %v", rollbackErr)
}
rollbackErr = txc.Rollback(ctx, session)
case Commit2pcPrepare:
// Rollback the prepared and unprepared transactions.
if resumeErr := txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging); resumeErr != nil {
log.Warningf("Rollback failed after Prepare failure: %v", resumeErr)
rollbackErr = txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging)
case Commit2pcStartCommit:
// Failed to store the commit decision on MM.
// If the failure state is certain, then the only option is to rollback the prepared transactions on the RMs.
if startCommitState == querypb.StartCommitState_Fail {
rollbackErr = txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging)
}
case Commit2pcStartCommit, Commit2pcPrepareCommit:
fallthrough
case Commit2pcPrepareCommit:
commitUnresolved.Add(1)
}
if rollbackErr != nil {
log.Warningf("Rollback failed after %s failure: %v", phaseMessage[txPhase], rollbackErr)
commitUnresolved.Add(1)
}

session.RecordWarning(&querypb.QueryWarning{
Code: uint32(sqlerror.ERInAtomicRecovery),
Message: createWarningMessage(dtid, txPhase)})
Expand Down
38 changes: 26 additions & 12 deletions go/vt/vtgate/tx_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,9 +994,7 @@ func TestTxConnCommit2PCCreateTransactionFail(t *testing.T) {
sbc0.MustFailCreateTransaction = 1
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
err := sc.txConn.Commit(ctx, session)
want := "error: err"
require.Error(t, err)
assert.Contains(t, err.Error(), want, "Commit")
require.ErrorContains(t, err, "target: TestTxConnCommit2PCCreateTransactionFail.0.primary: error: err")
assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
assert.EqualValues(t, 1, sbc0.RollbackCount.Load(), "sbc0.RollbackCount")
assert.EqualValues(t, 1, sbc1.RollbackCount.Load(), "sbc1.RollbackCount")
Expand All @@ -1018,9 +1016,7 @@ func TestTxConnCommit2PCPrepareFail(t *testing.T) {
sbc1.MustFailPrepare = 1
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
err := sc.txConn.Commit(ctx, session)
want := "error: err"
require.Error(t, err)
assert.Contains(t, err.Error(), want, "Commit")
require.ErrorContains(t, err, "target: TestTxConnCommit2PCPrepareFail.1.primary: error: err")
assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount")
// Prepared failed on RM, so no commit on MM or RMs.
Expand All @@ -1046,13 +1042,33 @@ func TestTxConnCommit2PCStartCommitFail(t *testing.T) {
sbc0.MustFailStartCommit = 1
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
err := sc.txConn.Commit(ctx, session)
want := "error: err"
require.Error(t, err)
assert.Contains(t, err.Error(), want, "Commit")
require.ErrorContains(t, err, "target: TestTxConnCommit2PCStartCommitFail.0.primary: error: err")
assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount")
assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount")
assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount")
assert.EqualValues(t, 1, sbc0.SetRollbackCount.Load(), "MM")
assert.EqualValues(t, 1, sbc1.RollbackPreparedCount.Load(), "RM")
assert.EqualValues(t, 1, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount")

sbc0.ResetCounter()
sbc1.ResetCounter()

session = NewSafeSession(&vtgatepb.Session{InTransaction: true})
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{})
sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{})

// Here the StartCommit failure is in uncertain state so rollback is not called and neither conclude.
sbc0.MustFailStartCommitUncertain = 1
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
err = sc.txConn.Commit(ctx, session)
require.ErrorContains(t, err, "target: TestTxConnCommit2PCStartCommitFail.0.primary: uncertain error")
assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount")
assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount")
assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount")
assert.EqualValues(t, 0, sbc0.SetRollbackCount.Load(), "MM")
assert.EqualValues(t, 0, sbc1.RollbackPreparedCount.Load(), "RM")
assert.EqualValues(t, 0, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount")
}

Expand All @@ -1068,9 +1084,7 @@ func TestTxConnCommit2PCCommitPreparedFail(t *testing.T) {
sbc1.MustFailCommitPrepared = 1
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
err := sc.txConn.Commit(ctx, session)
want := "error: err"
require.Error(t, err)
assert.Contains(t, err.Error(), want, "Commit")
require.ErrorContains(t, err, "target: TestTxConnCommit2PCCommitPreparedFail.1.primary: error: err")
assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount")
assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount")
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (client *QueryClient) CreateTransaction(dtid string, participants []*queryp
}

// StartCommit issues a StartCommit to TabletServer for the current transaction.
func (client *QueryClient) StartCommit(dtid string) error {
func (client *QueryClient) StartCommit(dtid string) (querypb.StartCommitState, error) {
defer func() { client.transactionID = 0 }()
return client.server.StartCommit(client.ctx, client.target, client.transactionID, dtid)
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,9 @@ func TestMMCommitFlow(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "Duplicate entry")

err = client.StartCommit("aa")
state, err := client.StartCommit("aa")
require.NoError(t, err)
assert.Equal(t, querypb.StartCommitState_Success, state)

err = client.SetRollback("aa", 0)
require.EqualError(t, err, "could not transition to ROLLBACK: aa (CallerID: dev)")
Expand Down
7 changes: 2 additions & 5 deletions go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,8 @@ func (q *query) StartCommit(ctx context.Context, request *querypb.StartCommitReq
request.EffectiveCallerId,
request.ImmediateCallerId,
)
if err := q.server.StartCommit(ctx, request.Target, request.TransactionId, request.Dtid); err != nil {
return nil, vterrors.ToGRPC(err)
}

return &querypb.StartCommitResponse{}, nil
state, err := q.server.StartCommit(ctx, request.Target, request.TransactionId, request.Dtid)
return &querypb.StartCommitResponse{State: state}, vterrors.ToGRPC(err)
}

// SetRollback is part of the queryservice.QueryServer interface
Expand Down
14 changes: 8 additions & 6 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,12 @@ func (conn *gRPCQueryClient) CreateTransaction(ctx context.Context, target *quer

// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) error {
func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (querypb.StartCommitState, error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
return tabletconn.ConnClosed
// This can be marked as fail as not other process will try to commit this transaction.
return querypb.StartCommitState_Fail, tabletconn.ConnClosed
}

req := &querypb.StartCommitRequest{
Expand All @@ -365,11 +366,12 @@ func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Ta
TransactionId: transactionID,
Dtid: dtid,
}
_, err := conn.c.StartCommit(ctx, req)
if err != nil {
return tabletconn.ErrorFromGRPC(err)
resp, err := conn.c.StartCommit(ctx, req)
err = tabletconn.ErrorFromGRPC(err)
if resp != nil {
return resp.State, err
}
return nil
return querypb.StartCommitState_Unknown, err
}

// SetRollback transitions the 2pc transaction to the Rollback state.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/queryservice/queryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type QueryService interface {

// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error)
StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error)

// SetRollback transitions the 2pc transaction to the Rollback state.
// If a transaction id is provided, that transaction is also rolled back.
Expand Down
8 changes: 5 additions & 3 deletions go/vt/vttablet/queryservice/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,13 @@ func (ws *wrappedService) CreateTransaction(ctx context.Context, target *querypb
})
}

func (ws *wrappedService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) {
return ws.wrapper(ctx, target, ws.impl, "StartCommit", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
innerErr := conn.StartCommit(ctx, target, transactionID, dtid)
func (ws *wrappedService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) {
err = ws.wrapper(ctx, target, ws.impl, "StartCommit", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var innerErr error
state, innerErr = conn.StartCommit(ctx, target, transactionID, dtid)
return canRetry(ctx, innerErr), innerErr
})
return state, err
}

func (ws *wrappedService) SetRollback(ctx context.Context, target *querypb.Target, dtid string, transactionID int64) (err error) {
Expand Down
50 changes: 40 additions & 10 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ type SandboxConn struct {

// These errors are triggered only for specific functions.
// For now these are just for the 2PC functions.
MustFailPrepare int
MustFailCommitPrepared int
MustFailRollbackPrepared int
MustFailCreateTransaction int
MustFailStartCommit int
MustFailSetRollback int
MustFailConcludeTransaction int
MustFailPrepare int
MustFailCommitPrepared int
MustFailRollbackPrepared int
MustFailCreateTransaction int
MustFailStartCommit int
MustFailStartCommitUncertain int
MustFailSetRollback int
MustFailConcludeTransaction int
// MustFailExecute is keyed by the statement type and stores the number
// of times to fail when it sees that statement type.
// Once, exhausted it will start returning non-error response.
Expand Down Expand Up @@ -157,6 +158,27 @@ func NewSandboxConn(t *topodatapb.Tablet) *SandboxConn {
}
}

// ResetCounter resets the counters in the sandboxconn.
func (sbc *SandboxConn) ResetCounter() {
sbc.ExecCount.Store(0)
sbc.BeginCount.Store(0)
sbc.CommitCount.Store(0)
sbc.RollbackCount.Store(0)
sbc.AsTransactionCount.Store(0)
sbc.PrepareCount.Store(0)
sbc.CommitPreparedCount.Store(0)
sbc.RollbackPreparedCount.Store(0)
sbc.CreateTransactionCount.Store(0)
sbc.StartCommitCount.Store(0)
sbc.SetRollbackCount.Store(0)
sbc.ConcludeTransactionCount.Store(0)
sbc.ReadTransactionCount.Store(0)
sbc.UnresolvedTransactionsCount.Store(0)
sbc.ReserveCount.Store(0)
sbc.ReleaseCount.Store(0)
sbc.GetSchemaCount.Store(0)
}

// RequireQueriesLocking sets the sandboxconn to require locking the access of Queries field.
func (sbc *SandboxConn) RequireQueriesLocking() {
sbc.queriesRequireLocking = true
Expand Down Expand Up @@ -404,14 +426,22 @@ func (sbc *SandboxConn) CreateTransaction(ctx context.Context, target *querypb.T

// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
func (sbc *SandboxConn) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error) {
func (sbc *SandboxConn) StartCommit(context.Context, *querypb.Target, int64, string) (state querypb.StartCommitState, err error) {
sbc.panicIfNeeded()
sbc.StartCommitCount.Add(1)
if sbc.MustFailStartCommit > 0 {
sbc.MustFailStartCommit--
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "error: err")
return querypb.StartCommitState_Fail, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "error: err")
}
return sbc.getError()
if sbc.MustFailStartCommitUncertain > 0 {
sbc.MustFailStartCommitUncertain--
return querypb.StartCommitState_Unknown, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "uncertain error")
}
err = sbc.getError()
if err != nil {
return querypb.StartCommitState_Unknown, err
}
return querypb.StartCommitState_Success, nil
}

// SetRollback transitions the 2pc transaction to the Rollback state.
Expand Down
Loading

0 comments on commit 78f7db2

Please sign in to comment.