Skip to content

Commit

Permalink
feat: add new rpc to read the transaction status along with the state…
Browse files Browse the repository at this point in the history
…ments to be executed

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Nov 1, 2024
1 parent 0beb273 commit 39c43fb
Show file tree
Hide file tree
Showing 26 changed files with 3,550 additions and 1,942 deletions.
25 changes: 2 additions & 23 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestSettings(t *testing.T) {
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)
var wg sync.WaitGroup
runMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, tt.queries)
twopcutil.RunMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, tt.queries)
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)
// Run the vttablet restart to ensure that the transaction needs to be redone.
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestDisruptions(t *testing.T) {
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)
var wg sync.WaitGroup
runMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, append([]string{"begin"}, getMultiShardInsertQueries()...))
twopcutil.RunMultiShardCommitWithDelay(t, conn, tt.commitDelayTime, &wg, append([]string{"begin"}, getMultiShardInsertQueries()...))
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)
writeCtx, writeCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -261,27 +261,6 @@ func getMultiShardInsertQueries() []string {
return queries
}

// runMultiShardCommitWithDelay runs a multi shard commit and configures it to wait for a certain amount of time in the commit phase.
func runMultiShardCommitWithDelay(t *testing.T, conn *mysql.Conn, commitDelayTime string, wg *sync.WaitGroup, queries []string) {
// Run all the queries to start the transaction.
for _, query := range queries {
utils.Exec(t, conn, query)
}
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitShard, "80-")
twopcutil.WriteTestCommunicationFile(t, twopcutil.DebugDelayCommitTime, commitDelayTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to run the disruption.
wg.Add(1)
go func() {
defer wg.Done()
_, err := utils.ExecAllowError(t, conn, "commit")
if err != nil {
log.Errorf("Error in commit - %v", err)
}
}()
}

func mergeShards(t *testing.T) error {
return twopcutil.RunReshard(t, clusterInstance, "TestDisruptions", keyspaceName, "40-80,80-", "40-")
}
Expand Down
54 changes: 54 additions & 0 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ import (
"vitess.io/vitess/go/vt/callerid"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletpb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
"vitess.io/vitess/go/vt/vttablet/grpctmclient"
)

// TestDTCommit tests distributed transaction commit for insert, update and delete operations
Expand Down Expand Up @@ -1369,3 +1371,55 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not")
}

// TestReadTransactionStatus tests that read transaction state rpc works as expected.
func TestReadTransactionStatus(t *testing.T) {
conn, closer := start(t)
defer closer()
defer conn.Close()
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)

// We create a multi shard commit and delay its commit on one of the shards.
// This allows us to query to transaction status and actually get some data back.
var wg sync.WaitGroup
twopcutil.RunMultiShardCommitWithDelay(t, conn, "10", &wg, []string{
"begin",
"insert into twopc_t1(id, col) values(4, 4)",
"insert into twopc_t1(id, col) values(6, 4)",
"insert into twopc_t1(id, col) values(9, 4)",
})
// Allow enough time for the commit to have started.
time.Sleep(1 * time.Second)

// Create a tablet manager client and use it to read the transaction state.
tmc := grpctmclient.NewClient()
defer tmc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

primaryTablet := getTablet(clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().GrpcPort)
var unresTransaction *querypb.TransactionMetadata
for _, shard := range clusterInstance.Keyspaces[0].Shards {
urtRes, err := tmc.GetUnresolvedTransactions(ctx, getTablet(shard.FindPrimaryTablet().GrpcPort), 1)
require.NoError(t, err)
if len(urtRes) > 0 {
unresTransaction = urtRes[0]
}
}
require.NotNil(t, unresTransaction)
res, err := tmc.ReadTransactionState(ctx, primaryTablet, unresTransaction.Dtid)
require.NoError(t, err)
assert.Equal(t, "PREPARED", res.State)
assert.Equal(t, "", res.Message)
assert.Equal(t, []string{"insert into twopc_t1(id, col) values (9, 4)"}, res.Statements)

// Wait for the commit to have returned.
wg.Wait()
}

func getTablet(tabletGrpcPort int) *tabletpb.Tablet {
portMap := make(map[string]int32)
portMap["grpc"] = int32(tabletGrpcPort)
return &tabletpb.Tablet{Hostname: hostname, PortMap: portMap}
}
22 changes: 22 additions & 0 deletions go/test/endtoend/transaction/twopc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path"
"slices"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -94,6 +95,27 @@ func WriteTestCommunicationFile(t *testing.T, fileName string, content string) {
require.NoError(t, err)
}

// RunMultiShardCommitWithDelay runs a multi shard commit and configures it to wait for a certain amount of time in the commit phase.
func RunMultiShardCommitWithDelay(t *testing.T, conn *mysql.Conn, commitDelayTime string, wg *sync.WaitGroup, queries []string) {
// Run all the queries to start the transaction.
for _, query := range queries {
utils.Exec(t, conn, query)
}
// We want to delay the commit on one of the shards to simulate slow commits on a shard.
WriteTestCommunicationFile(t, DebugDelayCommitShard, "80-")
WriteTestCommunicationFile(t, DebugDelayCommitTime, commitDelayTime)
// We will execute a commit in a go routine, because we know it will take some time to complete.
// While the commit is ongoing, we would like to run the disruption.
wg.Add(1)
go func() {
defer wg.Done()
_, err := utils.ExecAllowError(t, conn, "commit")
if err != nil {
log.Errorf("Error in commit - %v", err)
}
}()
}

// DeleteFile deletes the file specified.
func DeleteFile(fileName string) {
_ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName))
Expand Down
Loading

0 comments on commit 39c43fb

Please sign in to comment.