Skip to content

Commit

Permalink
VReplication: Support reversing read-only traffic in vtctldclient (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattlord authored Oct 20, 2024
1 parent aba477f commit 6c6499c
Show file tree
Hide file tree
Showing 8 changed files with 637 additions and 167 deletions.
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,11 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro

// assertQueryExecutesOnTablet runs query via conn against the ksName target
// and requires that the tablet's count of statements matching matchQuery went
// up by exactly one, i.e. that the query was served by the given tablet.
//
// The current routing rules are captured up front so that they can be included
// in the failure message, which makes misrouted queries easier to diagnose.
func assertQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
	t.Helper()
	rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules")
	require.NoError(t, err)
	count0, body0, count1, body1 := executeOnTablet(t, conn, tablet, ksName, query, matchQuery)
	// A single fatal assertion: there is no point continuing the test once a
	// query has been routed to the wrong tablet.
	require.Equalf(t, count0+1, count1, "query %q did not execute on destination %s (%s-%d);\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n",
		query, ksName, tablet.Cell, tablet.TabletUID, matchQuery, body0, body1, rr)
}

func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
Expand Down
9 changes: 8 additions & 1 deletion go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ import (
)

func TestMoveTablesBuffering(t *testing.T) {
defaultRdonly = 1
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultRdonly = 0
defaultReplicas = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()

Expand Down
129 changes: 94 additions & 35 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math/rand/v2"
"net"
"slices"
"strconv"
"strings"
"sync"
Expand All @@ -35,9 +36,11 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand Down Expand Up @@ -169,9 +172,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
args = append(args, "--tablet-types", tabletTypes)
}
args = append(args, "--action_timeout=10m") // At this point something is up so fail the test
if debugMode {
t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " "))
}
t.Logf("Executing workflow command: vtctldclient %s", strings.Join(args, " "))
output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
Expand Down Expand Up @@ -334,33 +335,44 @@ func tstWorkflowCancel(t *testing.T) error {
return tstWorkflowAction(t, workflowActionCancel, "", "")
}

func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.VttabletProcess) {
func validateReadsRoute(t *testing.T, tabletType string, tablet *cluster.VttabletProcess) {
if tablet == nil {
return
}
if tabletTypes == "" {
tabletTypes = "replica,rdonly"
}
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
for _, tt := range []string{"replica", "rdonly"} {
destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt)
if strings.Contains(tabletTypes, tt) {
readQuery := "select cid from customer limit 10"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
}
}
// We do NOT want to target a shard as that goes around the routing rules and
// defeats the purpose here. We are using a query w/o a WHERE clause so for
// sharded keyspaces it should hit all shards as a SCATTER query. So all we
// care about is the keyspace and tablet type.
destination := fmt.Sprintf("%s@%s", tablet.Keyspace, strings.ToLower(tabletType))
readQuery := "select cid from customer limit 50"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
}

func validateReadsRouteToSource(t *testing.T, tabletTypes string) {
if sourceReplicaTab != nil {
validateReadsRoute(t, tabletTypes, sourceReplicaTab)
tt, err := topoproto.ParseTabletTypes(tabletTypes)
require.NoError(t, err)
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
require.NotNil(t, sourceReplicaTab)
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), sourceReplicaTab)
}
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
require.NotNil(t, sourceRdonlyTab)
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), sourceRdonlyTab)
}
}

func validateReadsRouteToTarget(t *testing.T, tabletTypes string) {
if targetReplicaTab1 != nil {
validateReadsRoute(t, tabletTypes, targetReplicaTab1)
tt, err := topoproto.ParseTabletTypes(tabletTypes)
require.NoError(t, err)
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
require.NotNil(t, targetReplicaTab1)
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), targetReplicaTab1)
}
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
require.NotNil(t, targetRdonlyTab1)
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), targetRdonlyTab1)
}
}

Expand Down Expand Up @@ -411,6 +423,13 @@ func getCurrentStatus(t *testing.T) string {
// but CI currently fails on creating multiple clusters even after the previous ones are torn down

func TestBasicV2Workflows(t *testing.T) {
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultReplicas = 1
defaultRdonly = 1
extraVTTabletArgs = []string{
parallelInsertWorkers,
Expand Down Expand Up @@ -664,7 +683,7 @@ func testMoveTablesV2Workflow(t *testing.T) {
// If it's not then we'll get an error as the table doesn't exist in the vschema
createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431")
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

// Verify that we've properly ignored any internal operational tables
Expand Down Expand Up @@ -725,6 +744,12 @@ func testPartialSwitches(t *testing.T) {
tstWorkflowSwitchReads(t, "", "")
checkStates(t, nextState, nextState) // idempotency

tstWorkflowReverseReads(t, "replica,rdonly", "")
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

tstWorkflowSwitchReads(t, "", "")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)

tstWorkflowSwitchWrites(t)
currentState = nextState
nextState = wrangler.WorkflowStateAllSwitched
Expand Down Expand Up @@ -771,12 +796,12 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReads(t, "", "")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowSwitchWrites(t)
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateAllSwitched)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)

// this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces
Expand All @@ -787,42 +812,45 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseReads(t, "", "")
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToTarget(t)

tstWorkflowReverseWrites(t)
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchWrites(t)
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateWritesSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToTarget(t)

waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseWrites(t)
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReads(t, "", "")
validateReadsRouteToTarget(t, "replica")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowReverseReads(t, "", "")
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowSwitchReadsAndWrites(t)
validateReadsRouteToTarget(t, "replica")
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)
waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseReadsAndWrites(t)
validateReadsRoute(t, "rdonly", sourceRdonlyTab)
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

// trying to complete an unswitched workflow should error
Expand All @@ -835,8 +863,7 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, "customer", "customer_name")
waitForLowLag(t, "customer", "enterprise_customer")
tstWorkflowSwitchReadsAndWrites(t)
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)

err = tstWorkflowComplete(t)
Expand Down Expand Up @@ -899,7 +926,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {

zone1 := vc.Cells["zone1"]

vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)

verifyClusterHealth(t, vc)
insertInitialData(t)
Expand All @@ -912,7 +939,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {
func setupMinimalCustomerKeyspace(t *testing.T) map[string]*cluster.VttabletProcess {
tablets := make(map[string]*cluster.VttabletProcess)
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, "customer", "-80,80-",
customerVSchema, customerSchema, 0, 0, 200, nil); err != nil {
customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil {
t.Fatal(err)
}
defaultCell := vc.Cells[vc.CellNames[0]]
Expand Down Expand Up @@ -1048,6 +1075,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) {
targetTab2 = custKs.Shards["80-c0"].Tablets["zone1-600"].Vttablet
targetTab1 = custKs.Shards["40-80"].Tablets["zone1-500"].Vttablet
targetReplicaTab1 = custKs.Shards["-40"].Tablets["zone1-401"].Vttablet
targetRdonlyTab1 = custKs.Shards["-40"].Tablets["zone1-402"].Vttablet

sourceTab = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet
sourceReplicaTab = custKs.Shards["-80"].Tablets["zone1-201"].Vttablet
Expand All @@ -1059,3 +1087,34 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) {
"--sql", sql, keyspace)
require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err))
}

// validateTableRoutingRule checks the current routing rules for the rule whose
// source is "fromKeyspace.table" (with an "@tabletType" suffix for any tablet
// type other than primary) and requires that it routes to toKeyspace.
//
// If no rule with that source exists at all the check passes; see the comment
// on the found flag below for why.
func validateTableRoutingRule(t *testing.T, table, tabletType, fromKeyspace, toKeyspace string) {
	tabletType = strings.ToLower(strings.TrimSpace(tabletType))
	rules := getRoutingRules(t)

	// The source side of the rule we are looking for. Primary (or unspecified)
	// tablet types use the bare "keyspace.table" form.
	wantFrom := fmt.Sprintf("%s.%s", fromKeyspace, table)
	if tabletType != "" && tabletType != "primary" {
		wantFrom = fmt.Sprintf("%s@%s", wantFrom, tabletType)
	}

	// Start out satisfied: when a workflow is created with --no-routing-rules
	// the routing rules can legitimately be empty. The flag is only cleared
	// once a rule with the expected source is seen, and set again if that rule
	// routes to the expected keyspace.
	found := true
	for _, rule := range rules.GetRules() {
		if rule.FromTable != wantFrom {
			continue
		}
		// A rule for our source exists, so now it must point at toKeyspace.
		found = false
		require.NotEmpty(t, rule.ToTables)
		// Each ToTables entry has the form "routedKeyspace.table".
		gotKeyspace, gotTable, ok := strings.Cut(rule.ToTables[0], ".")
		require.True(t, ok)
		require.Equal(t, table, gotTable)
		if gotKeyspace == toKeyspace {
			// Source, table and destination keyspace all line up; stop searching.
			found = true
			break
		}
	}
	require.Truef(t, found, "routing rule for %s.%s from %s to %s not found", fromKeyspace, table, tabletType, toKeyspace)
}
Loading

0 comments on commit 6c6499c

Please sign in to comment.