diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 44c35d0acea..3795b6f52d5 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -331,8 +331,11 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro func assertQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) { t.Helper() + rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules") + require.NoError(t, err) count0, body0, count1, body1 := executeOnTablet(t, conn, tablet, ksName, query, matchQuery) - assert.Equalf(t, count0+1, count1, "query %q did not execute in target;\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\n", query, matchQuery, body0, body1) + require.Equalf(t, count0+1, count1, "query %q did not execute on destination %s (%s-%d);\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n", + query, ksName, tablet.Cell, tablet.TabletUID, matchQuery, body0, body1, rr) } func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) { diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index f456c32bfd5..eed96768fc5 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -12,7 +12,14 @@ import ( ) func TestMoveTablesBuffering(t *testing.T) { - defaultRdonly = 1 + ogReplicas := defaultReplicas + ogRdOnly := defaultRdonly + defer func() { + defaultReplicas = ogReplicas + defaultRdonly = ogRdOnly + }() + defaultRdonly = 0 + defaultReplicas = 0 vc = setupMinimalCluster(t) defer vc.TearDown() diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 4c6dea61912..28ffc762ecd 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -22,6 +22,7 @@ import ( "fmt" "math/rand/v2" "net" + "slices" "strconv" "strings" "sync" @@ -35,9 +36,11 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -169,9 +172,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, args = append(args, "--tablet-types", tabletTypes) } args = append(args, "--action_timeout=10m") // At this point something is up so fail the test - if debugMode { - t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " ")) - } + t.Logf("Executing workflow command: vtctldclient %s", strings.Join(args, " ")) output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...) lastOutput = output if err != nil { @@ -334,33 +335,44 @@ func tstWorkflowCancel(t *testing.T) error { return tstWorkflowAction(t, workflowActionCancel, "", "") } -func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.VttabletProcess) { +func validateReadsRoute(t *testing.T, tabletType string, tablet *cluster.VttabletProcess) { if tablet == nil { return } - if tabletTypes == "" { - tabletTypes = "replica,rdonly" - } vtgateConn, closeConn := getVTGateConn() defer closeConn() - for _, tt := range []string{"replica", "rdonly"} { - destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt) - if strings.Contains(tabletTypes, tt) { - readQuery := "select cid from customer limit 10" - assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1") - } - } + // We do NOT want to target a shard as that goes around the routing rules and + // defeats the purpose here. We are using a query w/o a WHERE clause so for + // sharded keyspaces it should hit all shards as a SCATTER query. So all we + // care about is the keyspace and tablet type. + destination := fmt.Sprintf("%s@%s", tablet.Keyspace, strings.ToLower(tabletType)) + readQuery := "select cid from customer limit 50" + assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1") } func validateReadsRouteToSource(t *testing.T, tabletTypes string) { - if sourceReplicaTab != nil { - validateReadsRoute(t, tabletTypes, sourceReplicaTab) + tt, err := topoproto.ParseTabletTypes(tabletTypes) + require.NoError(t, err) + if slices.Contains(tt, topodatapb.TabletType_REPLICA) { + require.NotNil(t, sourceReplicaTab) + validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), sourceReplicaTab) + } + if slices.Contains(tt, topodatapb.TabletType_RDONLY) { + require.NotNil(t, sourceRdonlyTab) + validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), sourceRdonlyTab) } } func validateReadsRouteToTarget(t *testing.T, tabletTypes string) { - if targetReplicaTab1 != nil { - validateReadsRoute(t, tabletTypes, targetReplicaTab1) + tt, err := topoproto.ParseTabletTypes(tabletTypes) + require.NoError(t, err) + if slices.Contains(tt, topodatapb.TabletType_REPLICA) { + require.NotNil(t, targetReplicaTab1) + validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), targetReplicaTab1) + } + if slices.Contains(tt, topodatapb.TabletType_RDONLY) { + require.NotNil(t, targetRdonlyTab1) + validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), targetRdonlyTab1) } } @@ -411,6 +423,13 @@ func getCurrentStatus(t *testing.T) string { // but CI currently fails on creating multiple clusters even after the previous ones are torn down func TestBasicV2Workflows(t *testing.T) { + ogReplicas := defaultReplicas + ogRdOnly := defaultRdonly + defer func() { + defaultReplicas = ogReplicas + defaultRdonly = ogRdOnly + }() + defaultReplicas = 1 defaultRdonly = 1 extraVTTabletArgs = []string{ parallelInsertWorkers, @@ -664,7 +683,7 @@ func testMoveTablesV2Workflow(t *testing.T) { // If it's not then we'll get an error as the table doesn't exist in the vschema createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) // Verify that we've properly ignored any internal operational tables @@ -725,6 +744,12 @@ func testPartialSwitches(t *testing.T) { tstWorkflowSwitchReads(t, "", "") checkStates(t, nextState, nextState) // idempotency + tstWorkflowReverseReads(t, "replica,rdonly", "") + checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + + tstWorkflowSwitchReads(t, "", "") + checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) + tstWorkflowSwitchWrites(t) currentState = nextState nextState = wrangler.WorkflowStateAllSwitched @@ -771,12 +796,12 @@ func testRestOfWorkflow(t *testing.T) { waitForLowLag(t, "customer", "wf1") tstWorkflowSwitchReads(t, "", "") checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) - validateReadsRouteToTarget(t, "replica") + validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToSource(t) tstWorkflowSwitchWrites(t) checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateAllSwitched) - validateReadsRouteToTarget(t, "replica") + validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToTarget(t) // this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces @@ -787,42 +812,45 @@ func testRestOfWorkflow(t *testing.T) { waitForLowLag(t, keyspace, "wf1_reverse") tstWorkflowReverseReads(t, "", "") checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched) - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToTarget(t) tstWorkflowReverseWrites(t) checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) waitForLowLag(t, "customer", "wf1") tstWorkflowSwitchWrites(t) checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateWritesSwitched) - validateReadsRouteToSource(t, "replica") + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToTarget(t) waitForLowLag(t, keyspace, "wf1_reverse") tstWorkflowReverseWrites(t) - validateReadsRouteToSource(t, "replica") + checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) waitForLowLag(t, "customer", "wf1") tstWorkflowSwitchReads(t, "", "") - validateReadsRouteToTarget(t, "replica") + checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) + validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToSource(t) tstWorkflowReverseReads(t, "", "") - validateReadsRouteToSource(t, "replica") + checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) tstWorkflowSwitchReadsAndWrites(t) - validateReadsRouteToTarget(t, "replica") - validateReadsRoute(t, "rdonly", targetRdonlyTab1) + checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) + validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToTarget(t) waitForLowLag(t, keyspace, "wf1_reverse") tstWorkflowReverseReadsAndWrites(t) - validateReadsRoute(t, "rdonly", sourceRdonlyTab) - validateReadsRouteToSource(t, "replica") + checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) + validateReadsRouteToSource(t, "replica,rdonly") validateWritesRouteToSource(t) // trying to complete an unswitched workflow should error @@ -835,8 +863,7 @@ func testRestOfWorkflow(t *testing.T) { waitForLowLag(t, "customer", "customer_name") waitForLowLag(t, "customer", "enterprise_customer") tstWorkflowSwitchReadsAndWrites(t) - validateReadsRoute(t, "rdonly", targetRdonlyTab1) - validateReadsRouteToTarget(t, "replica") + validateReadsRouteToTarget(t, "replica,rdonly") validateWritesRouteToTarget(t) err = tstWorkflowComplete(t) @@ -899,7 +926,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster { zone1 := vc.Cells["zone1"] - vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, 0, 0, 100, nil) + vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil) verifyClusterHealth(t, vc) insertInitialData(t) @@ -912,7 +939,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster { func setupMinimalCustomerKeyspace(t *testing.T) map[string]*cluster.VttabletProcess { tablets := make(map[string]*cluster.VttabletProcess) if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, "customer", "-80,80-", - customerVSchema, customerSchema, 0, 0, 200, nil); err != nil { + customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil { t.Fatal(err) } defaultCell := vc.Cells[vc.CellNames[0]] @@ -1048,6 +1075,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) { targetTab2 = custKs.Shards["80-c0"].Tablets["zone1-600"].Vttablet targetTab1 = custKs.Shards["40-80"].Tablets["zone1-500"].Vttablet targetReplicaTab1 = custKs.Shards["-40"].Tablets["zone1-401"].Vttablet + targetRdonlyTab1 = custKs.Shards["-40"].Tablets["zone1-402"].Vttablet sourceTab = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet sourceReplicaTab = custKs.Shards["-80"].Tablets["zone1-201"].Vttablet @@ -1059,3 +1087,34 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) { "--sql", sql, keyspace) require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err)) } + +func validateTableRoutingRule(t *testing.T, table, tabletType, fromKeyspace, toKeyspace string) { + tabletType = strings.ToLower(strings.TrimSpace(tabletType)) + rr := getRoutingRules(t) + // We set matched = true by default because it is possible, if --no-routing-rules is set while creating + // a workflow, that the routing rules are empty when the workflow starts. + // We set it to false below when the rule is found, but before matching the routed keyspace. + matched := true + for _, r := range rr.GetRules() { + fromRule := fmt.Sprintf("%s.%s", fromKeyspace, table) + if tabletType != "" && tabletType != "primary" { + fromRule = fmt.Sprintf("%s@%s", fromRule, tabletType) + } + if r.FromTable == fromRule { + // We found the rule, so we can set matched to false here and check for the routed keyspace below. + matched = false + require.NotEmpty(t, r.ToTables) + toTable := r.ToTables[0] + // The ToTables value is of the form "routedKeyspace.table". + routedKeyspace, routedTable, ok := strings.Cut(toTable, ".") + require.True(t, ok) + require.Equal(t, table, routedTable) + if routedKeyspace == toKeyspace { + // We found the rule, the table and keyspace matches, so our search is done. + matched = true + break + } + } + } + require.Truef(t, matched, "routing rule for %s.%s from %s to %s not found", fromKeyspace, table, tabletType, toKeyspace) +} diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index 1b26a4b05ba..1dc07a90abc 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -31,6 +31,7 @@ import ( "vitess.io/vitess/go/json2" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -64,7 +65,19 @@ func TestVtctldclientCLI(t *testing.T) { targetKeyspaceName := "customer" var mt iMoveTables workflowName := "wf1" + + sourceReplicaTab = vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-101"].Vttablet + require.NotNil(t, sourceReplicaTab) + sourceTab = vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-100"].Vttablet + require.NotNil(t, sourceTab) + targetTabs := setupMinimalCustomerKeyspace(t) + targetTab1 = targetTabs["-80"] + require.NotNil(t, targetTab1) + targetTab2 = targetTabs["80-"] + require.NotNil(t, targetTab2) + targetReplicaTab1 = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-80"].Tablets["zone1-201"].Vttablet + require.NotNil(t, targetReplicaTab1) t.Run("RoutingRulesApply", func(t *testing.T) { testRoutingRulesApplyCommands(t) @@ -95,6 +108,19 @@ func TestVtctldclientCLI(t *testing.T) { "-40": targetKeyspace.Shards["-40"].Tablets["zone1-400"].Vttablet, "40-80": targetKeyspace.Shards["40-80"].Tablets["zone1-500"].Vttablet, } + + sourceReplicaTab = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-80"].Tablets["zone1-201"].Vttablet + require.NotNil(t, sourceReplicaTab) + sourceTab = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-80"].Tablets["zone1-200"].Vttablet + require.NotNil(t, sourceTab) + + targetTab1 = tablets["-40"] + require.NotNil(t, targetTab1) + targetTab2 = tablets["40-80"] + require.NotNil(t, targetTab2) + targetReplicaTab1 = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-40"].Tablets["zone1-401"].Vttablet + require.NotNil(t, targetReplicaTab1) + splitShard(t, targetKeyspaceName, reshardWorkflowName, sourceShard, newShards, tablets) }) } @@ -144,6 +170,7 @@ func getMoveTablesShowResponse(mt *iMoveTables) *vtctldatapb.GetWorkflowsRespons // Validates some of the flags created from the previous test. func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetKeyspace, workflowName string, targetTabs map[string]*cluster.VttabletProcess) { ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName) + wf := (*mt).(iWorkflow) (*mt).Start() // Need to start because we set auto-start to false. waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String()) confirmNoRoutingRules(t) @@ -163,7 +190,85 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK for _, tab := range targetTabs { catchup(t, tab, workflowName, "MoveTables") } + + (*mt).SwitchReads() + validateReadsRouteToTarget(t, "replica") + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) + + (*mt).ReverseReads() + validateReadsRouteToSource(t, "replica") + validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + + (*mt).SwitchReadsAndWrites() + validateReadsRouteToTarget(t, "replica") + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateWritesRouteToTarget(t) + validateTableRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) + + (*mt).ReverseReadsAndWrites() + validateReadsRouteToSource(t, "replica") + validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateWritesRouteToSource(t) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) + + (*mt).SwitchReadsAndWrites() + validateReadsRouteToTarget(t, "replica") + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateWritesRouteToTarget(t) + validateTableRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) + + (*mt).ReverseReads() + validateReadsRouteToSource(t, "replica") + validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateWritesRouteToTarget(t) + validateTableRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched) + + (*mt).ReverseWrites() + validateReadsRouteToSource(t, "replica") + validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateWritesRouteToSource(t) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) + + (*mt).SwitchReadsAndWrites() + validateReadsRouteToTarget(t, "replica") + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateWritesRouteToTarget(t) + validateTableRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) + + (*mt).ReverseWrites() + validateReadsRouteToTarget(t, "replica") + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateWritesRouteToSource(t) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched) + + (*mt).ReverseReads() + validateReadsRouteToSource(t, "replica") + validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs) + validateWritesRouteToSource(t) + validateTableRoutingRule(t, "customer", "", targetKs, sourceKs) + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + + // Confirm that everything is still in sync after our switch fest. + vdiff(t, targetKeyspace, workflowName, "zone1", false, true, nil) + (*mt).SwitchReadsAndWrites() + validateReadsRouteToTarget(t, "replica") + validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs) + validateWritesRouteToTarget(t) + validateTableRoutingRule(t, "customer", "", sourceKs, targetKs) + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) + (*mt).Complete() confirmRoutingRulesExist(t) // Confirm that --keep-data was honored. @@ -354,6 +459,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards }, workflowFlavorVtctld) ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName) + wf := rs.(iWorkflow) rs.Create() validateReshardResponse(rs) validateOverrides(t, targetTabs, overrides) @@ -381,17 +487,123 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards } vdiff(t, keyspace, workflowName, "zone1", false, true, nil) + shardReadsRouteToSource := func() { + require.True(t, getShardRoute(t, keyspace, "-80", "replica")) + } + + shardReadsRouteToTarget := func() { + require.True(t, getShardRoute(t, keyspace, "-40", "replica")) + } + + shardWritesRouteToSource := func() { + require.True(t, getShardRoute(t, keyspace, "-80", "primary")) + } + + shardWritesRouteToTarget := func() { + require.True(t, getShardRoute(t, keyspace, "-40", "primary")) + } + rs.SwitchReadsAndWrites() waitForLowLag(t, keyspace, workflowName+"_reverse") vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil) + shardReadsRouteToTarget() + shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.ReverseReadsAndWrites() waitForLowLag(t, keyspace, workflowName) vdiff(t, keyspace, workflowName, "zone1", false, true, nil) + shardReadsRouteToSource() + shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) + + rs.SwitchReads() + shardReadsRouteToTarget() + shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched) + + rs.ReverseReads() + shardReadsRouteToSource() + shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + + rs.SwitchReadsAndWrites() + shardReadsRouteToTarget() + shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) + + rs.ReverseReadsAndWrites() + shardReadsRouteToSource() + shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) + + rs.SwitchReadsAndWrites() + shardReadsRouteToTarget() + shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) + + rs.ReverseReads() + shardReadsRouteToSource() + shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched) + + rs.ReverseWrites() + shardReadsRouteToSource() + shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched) + + rs.SwitchReadsAndWrites() + shardReadsRouteToTarget() + shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) + + rs.ReverseWrites() + shardReadsRouteToTarget() + shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched) + + rs.ReverseReads() + shardReadsRouteToSource() + shardWritesRouteToSource() + confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) + + // Confirm that everything is still in sync after our switch fest. + vdiff(t, keyspace, workflowName, "zone1", false, true, nil) + rs.SwitchReadsAndWrites() + shardReadsRouteToTarget() + shardWritesRouteToTarget() + confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) + rs.Complete() } +func getSrvKeyspace(t *testing.T, keyspace string) *topodatapb.SrvKeyspace { + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetSrvKeyspaces", keyspace, "zone1") + require.NoError(t, err) + var srvKeyspaces map[string]*topodatapb.SrvKeyspace + err = json2.Unmarshal([]byte(output), &srvKeyspaces) + require.NoError(t, err) + require.Equal(t, 1, len(srvKeyspaces)) + return srvKeyspaces["zone1"] +} + +func getShardRoute(t *testing.T, keyspace, shard string, tabletType string) bool { + srvKeyspace := getSrvKeyspace(t, keyspace) + for _, partition := range srvKeyspace.Partitions { + tt, err := topoproto.ParseTabletType(tabletType) + require.NoError(t, err) + if partition.ServedType == tt { + for _, shardReference := range partition.ShardReferences { + if shardReference.Name == shard { + return true + } + } + } + } + return false +} + func getReshardShowResponse(rs *iReshard) *vtctldatapb.GetWorkflowsResponse { (*rs).Show() reshardOutput := (*rs).GetLastOutput() @@ -731,3 +943,8 @@ func testOneRoutingRulesCommand(t *testing.T, typ string, rules string, validate }) } } + +func confirmStates(t *testing.T, workflow *iWorkflow, startState, endState string) { + require.Contains(t, (*workflow).GetLastOutput(), fmt.Sprintf("Start State: %s", startState)) + require.Contains(t, (*workflow).GetLastOutput(), fmt.Sprintf("Current State: %s", endState)) +} diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 96c54b89fe8..ab9a8eb9dfb 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -19,6 +19,7 @@ package vreplication import ( "math/rand/v2" "strconv" + "strings" "github.com/stretchr/testify/require" @@ -33,12 +34,15 @@ type iWorkflow interface { SwitchReads() SwitchWrites() SwitchReadsAndWrites() + ReverseReads() + ReverseWrites() ReverseReadsAndWrites() Cancel() Complete() Flavor() string GetLastOutput() string Start() + Status() Stop() } @@ -146,6 +150,11 @@ func (vmt *VtctlMoveTables) Show() { panic("implement me") } +func (vmt *VtctlMoveTables) Status() { + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables + vmt.exec("Status") +} + func (vmt *VtctlMoveTables) exec(action string) { options := &workflowExecOptions{ deferSecondaryKeys: false, @@ -156,13 +165,26 @@ func (vmt *VtctlMoveTables) exec(action string) { require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) SwitchReads() { - // TODO implement me - panic("implement me") + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionSwitchTraffic, "replica,rdonly", "", "", defaultWorkflowExecOptions) + require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) SwitchWrites() { - // TODO implement me - panic("implement me") + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionSwitchTraffic, "primary", "", "", defaultWorkflowExecOptions) + require.NoError(vmt.vc.t, err) +} +func (vmt *VtctlMoveTables) ReverseReads() { + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionReverseTraffic, "replica,rdonly", "", "", defaultWorkflowExecOptions) + require.NoError(vmt.vc.t, err) +} + +func (vmt *VtctlMoveTables) ReverseWrites() { + err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, + vmt.tables, workflowActionReverseTraffic, "primary", "", "", defaultWorkflowExecOptions) + require.NoError(vmt.vc.t, err) } func (vmt *VtctlMoveTables) Cancel() { @@ -203,9 +225,9 @@ func (v VtctldMoveTables) exec(args ...string) { args2 := []string{"MoveTables", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace} args2 = append(args2, args...) var err error - if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil { - require.FailNowf(v.vc.t, "failed MoveTables action", "%v: %s", err, v.lastOutput) - } + v.vc.t.Logf("Executing workflow command: vtctldclient %s", strings.Join(args2, " ")) + v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...) + require.NoError(v.vc.t, err, "failed MoveTables action, error: %v: output: %s", err, v.lastOutput) } func (v VtctldMoveTables) Create() { @@ -247,6 +269,10 @@ func (v VtctldMoveTables) Show() { v.exec(args...) } +func (v VtctldMoveTables) Status() { + v.exec("Status") +} + func (v VtctldMoveTables) SwitchReads() { args := []string{"SwitchTraffic", "--tablet-types=rdonly,replica"} args = append(args, v.switchFlags...) @@ -259,6 +285,18 @@ func (v VtctldMoveTables) SwitchWrites() { v.exec(args...) } +func (v VtctldMoveTables) ReverseReads() { + args := []string{"ReverseTraffic", "--tablet-types=rdonly,replica"} + args = append(args, v.switchFlags...) + v.exec(args...) +} + +func (v VtctldMoveTables) ReverseWrites() { + args := []string{"ReverseTraffic", "--tablet-types=primary"} + args = append(args, v.switchFlags...) + v.exec(args...) +} + func (v VtctldMoveTables) Cancel() { v.exec("Cancel") } @@ -323,6 +361,16 @@ type VtctlReshard struct { *reshardWorkflow } +func (vrs *VtctlReshard) ReverseReads() { + //TODO implement me + panic("implement me") +} + +func (vrs *VtctlReshard) ReverseWrites() { + //TODO implement me + panic("implement me") +} + func (vrs *VtctlReshard) Flavor() string { return "vtctl" } @@ -341,6 +389,11 @@ func (vrs *VtctlReshard) MirrorTraffic() { panic("implement me") } +func (vrs *VtctlReshard) Status() { + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard + vrs.exec("Status") +} + func (vrs *VtctlReshard) SwitchReadsAndWrites() { vrs.exec(workflowActionSwitchTraffic) } @@ -409,9 +462,9 @@ func (v VtctldReshard) exec(args ...string) { args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace} args2 = append(args2, args...) var err error - if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil { - v.vc.t.Fatalf("failed to create Reshard workflow: %v: %s", err, v.lastOutput) - } + v.vc.t.Logf("Executing command: vtctldclient %s", strings.Join(args2, " ")) + v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...) + require.NoError(v.vc.t, err, "failed Reshard action, error: %v: output: %s", err, v.lastOutput) } func (v VtctldReshard) Create() { @@ -448,14 +501,36 @@ func (v VtctldReshard) Show() { v.exec("Show") } +func (v *VtctldReshard) Status() { + v.exec("Status") +} + func (v VtctldReshard) SwitchReads() { - // TODO implement me - panic("implement me") + args := []string{"SwitchTraffic"} + args = append(args, v.switchFlags...) + args = append(args, "--tablet-types=rdonly,replica") + v.exec(args...) } func (v VtctldReshard) SwitchWrites() { - // TODO implement me - panic("implement me") + args := []string{"SwitchTraffic"} + args = append(args, v.switchFlags...) + args = append(args, "--tablet-types=primary") + v.exec(args...) +} + +func (v VtctldReshard) ReverseReads() { + args := []string{"ReverseTraffic"} + args = append(args, v.switchFlags...) + args = append(args, "--tablet-types=rdonly,replica") + v.exec(args...) +} + +func (v VtctldReshard) ReverseWrites() { + args := []string{"ReverseTraffic"} + args = append(args, v.switchFlags...) + args = append(args, "--tablet-types=primary") + v.exec(args...) } func (v VtctldReshard) Cancel() { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 80c8569978c..a9242016f40 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3154,12 +3154,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor span.Annotate("tablet-types", req.TabletTypes) span.Annotate("direction", req.Direction) span.Annotate("enable-reverse-replication", req.EnableReverseReplication) + span.Annotate("shards", req.Shards) span.Annotate("force", req.Force) var ( - dryRunResults []string - rdDryRunResults, wrDryRunResults *[]string - hasReplica, hasRdonly, hasPrimary bool + dryRunResults []string + rdDryRunResults, wrDryRunResults *[]string + switchReplica, switchRdonly, switchPrimary bool ) timeout, set, err := protoutil.DurationFromProto(req.GetTimeout()) if err != nil { @@ -3175,6 +3176,19 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if timeout.Seconds() < 1 { return nil, vterrors.Wrap(err, "timeout must be at least 1 second") } + maxReplicationLagAllowed, set, err := protoutil.DurationFromProto(req.MaxReplicationLagAllowed) + if err != nil { + err = vterrors.Wrapf(err, "unable to parse MaxReplicationLagAllowed into a valid duration") + return nil, err + } + if !set { + maxReplicationLagAllowed = DefaultTimeout + } + direction := TrafficSwitchDirection(req.Direction) + switchReplica, switchRdonly, switchPrimary, err = parseTabletTypes(req.TabletTypes) + if err != nil { + return nil, err + } ts, startState, err := s.getWorkflowState(ctx, req.Keyspace, req.Workflow) if err != nil { return nil, err @@ -3184,46 +3198,53 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic") } - maxReplicationLagAllowed, set, err := protoutil.DurationFromProto(req.MaxReplicationLagAllowed) - if err != nil { - err = vterrors.Wrapf(err, "unable to parse MaxReplicationLagAllowed into a valid duration") - return nil, err + if direction == DirectionBackward && ts.IsMultiTenantMigration() { + // In a multi-tenant migration, multiple migrations would be writing to the same + // table, so we can't stop writes like we do with MoveTables, using denied tables, + // since it would block all other migrations as well as traffic for tenants which + // have already been migrated. + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations") } - if !set { - maxReplicationLagAllowed = DefaultTimeout - } - direction := TrafficSwitchDirection(req.Direction) - if direction == DirectionBackward { - ts, startState, err = s.getWorkflowState(ctx, startState.SourceKeyspace, ts.reverseWorkflow) + + // We need this to know when there isn't a (non-FROZEN) reverse workflow to use. + onlySwitchingReads := !startState.WritesSwitched && !switchPrimary + + // We need this for idempotency and to avoid unnecessary work and resulting risk. + writesAlreadySwitched := (direction == DirectionForward && startState.WritesSwitched) || + (direction == DirectionBackward && !startState.WritesSwitched) + + if direction == DirectionBackward && !onlySwitchingReads { + // This means that the main workflow is FROZEN and the reverse workflow + // exists. So we update the starting state so that we're using the reverse + // workflow and we can move forward with a normal traffic switch forward + // operation, from the reverse workflow's perspective. + ts, startState, err = s.getWorkflowState(ctx, ts.sourceKeyspace, ts.reverseWorkflow) if err != nil { return nil, err } - if ts.IsMultiTenantMigration() { - // In a multi-tenant migration, multiple migrations would be writing to the same table, so we can't stop writes like - // we do with MoveTables, using denied tables, since it would block all other migrations as well as traffic for - // tenants which have already been migrated. - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations") - } + direction = DirectionForward } ts.force = req.GetForce() - reason, err := s.canSwitch(ctx, ts, startState, direction, int64(maxReplicationLagAllowed.Seconds()), req.GetShards()) - if err != nil { - return nil, err - } - if reason != "" { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s", - startState.Workflow, reason) - } - hasReplica, hasRdonly, hasPrimary, err = parseTabletTypes(req.TabletTypes) - if err != nil { - return nil, err + if writesAlreadySwitched { + s.Logger().Infof("Writes already switched no need to check lag for the %s.%s workflow", + ts.targetKeyspace, ts.workflow) + } else { + reason, err := s.canSwitch(ctx, ts, int64(maxReplicationLagAllowed.Seconds()), req.GetShards()) + if err != nil { + return nil, err + } + if reason != "" { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s", + startState.Workflow, reason) + } } - if hasReplica || hasRdonly { + + if switchReplica || switchRdonly { // If we're going to switch writes immediately after then we don't need to // rebuild the SrvVSchema here as we will do it after switching writes. - if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, !hasPrimary /* rebuildSrvVSchema */, direction); err != nil { + if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, !switchPrimary /* rebuildSrvVSchema */, direction); err != nil { return nil, err } s.Logger().Infof("Switch Reads done for workflow %s.%s", req.Keyspace, req.Workflow) @@ -3231,7 +3252,8 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if rdDryRunResults != nil { dryRunResults = append(dryRunResults, *rdDryRunResults...) } - if hasPrimary { + + if switchPrimary { if _, wrDryRunResults, err = s.switchWrites(ctx, req, ts, timeout, false); err != nil { return nil, err } @@ -3244,8 +3266,10 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if req.DryRun && len(dryRunResults) == 0 { dryRunResults = append(dryRunResults, "No changes required") } + cmd := "SwitchTraffic" - if direction == DirectionBackward { + // We must check the original direction requested. + if TrafficSwitchDirection(req.Direction) == DirectionBackward { cmd = "ReverseTraffic" } s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) @@ -3257,17 +3281,11 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor } else { s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) resp.Summary = fmt.Sprintf("%s was successful for workflow %s.%s", cmd, req.Keyspace, req.Workflow) - // Reload the state after the SwitchTraffic operation - // and return that as a string. - keyspace := req.Keyspace - workflow := req.Workflow - if direction == DirectionBackward { - keyspace = startState.SourceKeyspace - workflow = ts.reverseWorkflow - } + // Reload the state after the SwitchTraffic operation and return that + // as a string. resp.StartState = startState.String() s.Logger().Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState) - _, currentState, err := s.getWorkflowState(ctx, keyspace, workflow) + _, currentState, err := s.getWorkflowState(ctx, ts.targetKeyspace, ts.workflow) if err != nil { resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err) } else { @@ -3275,6 +3293,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor } s.Logger().Infof("%s done for workflow %s.%s, returning response %v", cmd, req.Keyspace, req.Workflow, resp) } + return resp, nil } @@ -3720,14 +3739,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return ts.id, sw.logs(), nil } -func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *State, direction TrafficSwitchDirection, - maxAllowedReplLagSecs int64, shards []string) (reason string, err error) { - if direction == DirectionForward && state.WritesSwitched || - direction == DirectionBackward && !state.WritesSwitched { - s.Logger().Infof("writes already switched no need to check lag") - return "", nil - } - wf, err := s.GetWorkflow(ctx, state.TargetKeyspace, state.Workflow, false, shards) +func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedReplLagSecs int64, shards []string) (reason string, err error) { + wf, err := s.GetWorkflow(ctx, ts.targetKeyspace, ts.workflow, false, shards) if err != nil { return "", err } diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 401ed625be5..70b62f0a505 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -49,6 +49,19 @@ import ( vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) +var ( + allTabletTypes = []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } + + roTabletTypes = []topodatapb.TabletType{ + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } +) + type fakeTMC struct { tmclient.TabletManagerClient vrepQueriesByTablet map[string]map[string]*querypb.QueryResult @@ -971,12 +984,6 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { targetKeyspaceName := "targetks" vrID := 1 - tabletTypes := []topodatapb.TabletType{ - topodatapb.TabletType_PRIMARY, - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_RDONLY, - } - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ tableName: { TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ @@ -1066,7 +1073,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionForward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, }, want: &vtctldatapb.WorkflowSwitchTrafficResponse{ Summary: fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s", targetKeyspaceName, workflowName), @@ -1088,7 +1095,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionBackward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, }, want: &vtctldatapb.WorkflowSwitchTrafficResponse{ Summary: fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", targetKeyspaceName, workflowName), @@ -1096,6 +1103,28 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { CurrentState: "Reads Not Switched. Writes Not Switched", }, }, + { + name: "backward for read-only tablets", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: roTabletTypes, + }, + want: &vtctldatapb.WorkflowSwitchTrafficResponse{ + Summary: fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", targetKeyspaceName, workflowName), + StartState: "All Reads Switched. Writes Not Switched", + CurrentState: "Reads Not Switched. Writes Not Switched", + }, + }, { name: "forward with tablet refresh error", sourceKeyspace: &testKeyspace{ @@ -1110,7 +1139,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionForward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, }, preFunc: func(env *testEnv) { env.tmc.SetRefreshStateError(env.tablets[sourceKeyspaceName][startingSourceTabletUID], errors.New("tablet refresh error")) @@ -1132,7 +1161,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionForward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, Force: true, }, preFunc: func(env *testEnv) { @@ -1172,22 +1201,28 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { } else { env.tmc.reverse.Store(true) // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.updateTableRoutingRules(t, ctx, tabletTypes, []string{tableName}, + env.updateTableRoutingRules(t, ctx, tc.req.TabletTypes, []string{tableName}, tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR) - } - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) - } - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + if !slices.Contains(tc.req.TabletTypes, topodatapb.TabletType_PRIMARY) { + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, journalQR) + } + } else { + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, deleteWFQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createWFQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createJournalQR) + } + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, freezeReverseWFQR) } - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, deleteWFQR) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createWFQR) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createJournalQR) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, freezeReverseWFQR) } if tc.preFunc != nil { tc.preFunc(env) @@ -1203,29 +1238,47 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { // Confirm that we have the expected routing rules. rr, err := env.ts.GetRoutingRules(ctx) require.NoError(t, err) - to := fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) - if tc.req.Direction == int32(DirectionBackward) { - to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) - } for _, rr := range rr.Rules { + _, rrTabletType, found := strings.Cut(rr.FromTable, "@") + if !found { // No @ is primary + rrTabletType = topodatapb.TabletType_PRIMARY.String() + } + tabletType, err := topoproto.ParseTabletType(rrTabletType) + require.NoError(t, err) + + var to string + if slices.Contains(tc.req.TabletTypes, tabletType) { + to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + if tc.req.Direction == int32(DirectionBackward) { + to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + } + } else { + to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + if tc.req.Direction == int32(DirectionBackward) { + to = fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + } + } for _, tt := range rr.ToTables { - require.Equal(t, to, tt) + require.Equal(t, to, tt, "Additional info: tablet type: %s, rr.FromTable: %s, rr.ToTables: %v, to string: %s", + tabletType.String(), rr.FromTable, rr.ToTables, to) } } // Confirm that we have the expected denied tables entries. - for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { - for _, shardName := range keyspace.ShardNames { - si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName) - require.NoError(t, err) - switch { - case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionForward): - require.True(t, hasDeniedTableEntry(si)) - case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionBackward): - require.False(t, hasDeniedTableEntry(si)) - case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionForward): - require.False(t, hasDeniedTableEntry(si)) - case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionBackward): - require.True(t, hasDeniedTableEntry(si)) + if slices.Contains(tc.req.TabletTypes, topodatapb.TabletType_PRIMARY) { + for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { + for _, shardName := range keyspace.ShardNames { + si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName) + require.NoError(t, err) + switch { + case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionForward): + require.True(t, hasDeniedTableEntry(si)) + case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionBackward): + require.False(t, hasDeniedTableEntry(si)) + case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionForward): + require.False(t, hasDeniedTableEntry(si)) + case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionBackward): + require.True(t, hasDeniedTableEntry(si)) + } } } } @@ -1246,11 +1299,6 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { sourceKeyspaceName := "sourceks" targetKeyspaceName := "targetks" vrID := 1 - tabletTypes := []topodatapb.TabletType{ - topodatapb.TabletType_PRIMARY, - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_RDONLY, - } schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ table1Name: { TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ @@ -1303,7 +1351,7 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionForward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, DryRun: true, }, want: []string{ @@ -1344,13 +1392,13 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { Keyspace: targetKeyspaceName, Workflow: workflowName, Direction: int32(DirectionBackward), - TabletTypes: tabletTypes, + TabletTypes: allTabletTypes, DryRun: true, }, want: []string{ fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", targetKeyspaceName, sourceKeyspaceName), - fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, targetKeyspaceName), + fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, sourceKeyspaceName), fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), @@ -1371,6 +1419,32 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), }, }, + { + name: "backward for read-only tablets", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: roTabletTypes, + DryRun: true, + }, + want: []string{ + fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Mirroring 0.00 percent of traffic from keyspace %s to keyspace %s for tablet types [REPLICA,RDONLY]", sourceKeyspaceName, targetKeyspaceName), + fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, sourceKeyspaceName), + fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), + fmt.Sprintf("Serving VSchema will be rebuilt for the %s keyspace", sourceKeyspaceName), + fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName), + }, + }, } for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { @@ -1391,14 +1465,20 @@ func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { } else { env.tmc.reverse.Store(true) // Setup the routing rules as they would be after having previously done SwitchTraffic. - env.updateTableRoutingRules(t, ctx, tabletTypes, tables, + env.updateTableRoutingRules(t, ctx, tc.req.TabletTypes, tables, tc.sourceKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName, tc.targetKeyspace.KeyspaceName) - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) - } - for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + if !slices.Contains(tc.req.TabletTypes, topodatapb.TabletType_PRIMARY) { + for i := 0; i < len(tc.sourceKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, journalQR) + } + } else { + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) + for i := 0; i < len(tc.sourceKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) + } + for i := 0; i < len(tc.sourceKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + } } } got, err := env.ws.WorkflowSwitchTraffic(ctx, tc.req) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 80638cd5973..dd4975f7d43 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -607,9 +607,17 @@ func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { } func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { + ts.Logger().Infof("switchShardReads: workflow: %s, direction: %s, cells: %v, tablet types: %v", + ts.workflow, direction.String(), cells, servedTypes) + + var fromShards, toShards []*topo.ShardInfo + if direction == DirectionForward { + fromShards, toShards = ts.SourceShards(), ts.TargetShards() + } else { + fromShards, toShards = ts.TargetShards(), ts.SourceShards() + } + cellsStr := strings.Join(cells, ",") - ts.Logger().Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) - fromShards, toShards := ts.SourceShards(), ts.TargetShards() if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", ts.TargetKeyspaceName(), cellsStr) @@ -638,7 +646,9 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error { - ts.Logger().Infof("switchTableReads: cells: %s, tablet types: %+v, direction: %s", strings.Join(cells, ","), servedTypes, direction) + ts.Logger().Infof("switchTableReads: workflow: %s, direction: %s, cells: %v, tablet types: %v", + ts.workflow, direction.String(), cells, servedTypes) + rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err @@ -652,13 +662,19 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid tablet type specified when switching reads: %v", servedType) } - tt := strings.ToLower(servedType.String()) for _, table := range ts.Tables() { - toTarget := []string{ts.TargetKeyspaceName() + "." + table} - rules[table+"@"+tt] = toTarget - rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toTarget - rules[ts.SourceKeyspaceName()+"."+table+"@"+tt] = toTarget + if direction == DirectionForward { + toTarget := []string{ts.TargetKeyspaceName() + "." + table} + rules[table+"@"+tt] = toTarget + rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toTarget + rules[ts.SourceKeyspaceName()+"."+table+"@"+tt] = toTarget + } else { + toSource := []string{ts.SourceKeyspaceName() + "." + table} + rules[table+"@"+tt] = toSource + rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toSource + rules[ts.SourceKeyspaceName()+"."+table+"@"+tt] = toSource + } } } if err := topotools.SaveRoutingRules(ctx, ts.TopoServer(), rules); err != nil {