Skip to content

Commit

Permalink
VReplication: Relax restrictions on Cancel and ReverseTraffic when writes not involved (#17128)
Browse files Browse the repository at this point in the history

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Nov 6, 2024
1 parent a575982 commit e87457e
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 55 deletions.
12 changes: 12 additions & 0 deletions go/test/endtoend/vreplication/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,18 @@ func TestMultiTenantSimple(t *testing.T) {
require.Zero(t, rowCount)
})

t.Run("cancel after switching reads", func(t *testing.T) {
// First let's test canceling the workflow after only switching reads
// to ensure that it properly cleans up all of the state.
createFunc()
mt.SwitchReads()
confirmOnlyReadsSwitched(t)
mt.Cancel()
confirmNoRoutingRules(t)
rowCount := getRowCount(t, vtgateConn, fmt.Sprintf("%s.%s", targetKeyspace, "t1"))
require.Zero(t, rowCount)
})

// Create again and run it to completion.
createFunc()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,22 @@ func testMoveTablesFlags3(t *testing.T, sourceKeyspace, targetKeyspace string, t
// Confirm that the source tables were renamed.
require.True(t, checkTablesExist(t, "zone1-100", []string{"_customer2_old"}))
require.False(t, checkTablesExist(t, "zone1-100", []string{"customer2"}))

// Confirm that we can cancel a workflow after ONLY switching read traffic.
mt = createMoveTables(t, sourceKeyspace, targetKeyspace, workflowName, "customer", createFlags, nil, nil)
mt.Start() // Need to start because we set stop-after-copy to true.
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
for _, tab := range targetTabs {
catchup(t, tab, workflowName, "MoveTables")
}
mt.SwitchReads()
wf := mt.(iWorkflow)
validateReadsRouteToTarget(t, "replica")
validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateTableRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
mt.Cancel()
confirmNoRoutingRules(t)
}

// Create two workflows in order to confirm that listing all workflows works.
Expand Down Expand Up @@ -450,6 +466,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards
"--all-cells", "--format=json",
"--config-overrides", mapToCSV(overrides),
}

rs := newReshard(vc, &reshardWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
Expand All @@ -460,7 +477,6 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards
targetShards: targetShards,
createFlags: createFlags,
}, workflowFlavorVtctld)

ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName)
wf := rs.(iWorkflow)
rs.Create()
Expand Down Expand Up @@ -769,8 +785,10 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules {
}

// confirmNoRoutingRules asserts, via require.Zero, that the routing rules
// returned by getRoutingRules and the keyspace routing rules returned by
// getKeyspaceRoutingRules are both empty.
func confirmNoRoutingRules(t *testing.T) {
// NOTE(review): this text is a rendered diff; the next two statements appear to
// be the pre-change form of the rrRes check that follows them — confirm against
// the actual file before relying on both being present.
routingRulesResponse := getRoutingRules(t)
require.Zero(t, len(routingRulesResponse.Rules))
rrRes := getRoutingRules(t)
require.Zero(t, len(rrRes.Rules))
// Keyspace routing rules are fetched separately and must be empty as well.
krrRes := getKeyspaceRoutingRules(t, vc)
require.Zero(t, len(krrRes.Rules))
}

func confirmRoutingRulesExist(t *testing.T) {
Expand Down
76 changes: 46 additions & 30 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ var (
// ErrMultipleTargetKeyspaces occurs when a workflow somehow has multiple
// target keyspaces across different shard primaries. This should be
// impossible.
ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow")
ErrWorkflowNotFullySwitched = errors.New("cannot complete workflow because you have not yet switched all read and write traffic")
ErrWorkflowPartiallySwitched = errors.New("cannot cancel workflow because you have already switched some or all read and write traffic")
ErrMultipleTargetKeyspaces = errors.New("multiple target keyspaces for a single workflow")
ErrWorkflowCompleteNotFullySwitched = errors.New("cannot complete workflow because you have not yet switched all read and write traffic")
ErrWorkflowDeleteWritesSwitched = errors.New("cannot delete workflow because you have already switched write traffic")
)

// Server provides an API to work with Vitess workflows, like vreplication
Expand Down Expand Up @@ -1736,7 +1736,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
}

if !state.WritesSwitched || len(state.ReplicaCellsNotSwitched) > 0 || len(state.RdonlyCellsNotSwitched) > 0 {
return nil, ErrWorkflowNotFullySwitched
return nil, ErrWorkflowCompleteNotFullySwitched
}
var renameTable TableRemovalType
if req.RenameTables {
Expand Down Expand Up @@ -2111,10 +2111,12 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
}

if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
// Return an error if the workflow traffic is partially switched.
if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 {
return nil, ErrWorkflowPartiallySwitched
// Return an error if the workflow's write traffic has already been switched.
if state.WritesSwitched {
return nil, ErrWorkflowDeleteWritesSwitched
}
// If only reads have been switched, then we can delete the
// workflow and its related artifacts.
}

// Lock the workflow for deletion.
Expand Down Expand Up @@ -2158,22 +2160,21 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
ts.workflowType)
}
// We need to delete the rows that the target tables would have for the tenant.
// We don't cleanup other related artifacts since they are not tied to the tenant.
if !req.GetKeepData() {
if err := s.deleteTenantData(ctx, ts, req.DeleteBatchSize); err != nil {
return nil, vterrors.Wrapf(err, "failed to fully delete all migrated data for tenant %s, please retry the operation",
ts.options.TenantId)
}
}
} else {
// Cleanup related data and artifacts. There are none for a LookupVindex workflow.
if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
if _, err := s.dropTargets(ctx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace())
}
return nil, err
}

// Cleanup related data and artifacts. There are none for a LookupVindex workflow.
if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
if _, err := s.dropTargets(ctx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace())
}
return nil, err
}
}

Expand Down Expand Up @@ -2697,8 +2698,10 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData,
if !keepData {
switch ts.MigrationType() {
case binlogdatapb.MigrationType_TABLES:
if err := sw.removeTargetTables(ctx); err != nil {
return nil, err
if !ts.IsMultiTenantMigration() {
if err := sw.removeTargetTables(ctx); err != nil {
return nil, err
}
}
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -3239,12 +3242,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic")
}

if direction == DirectionBackward && ts.IsMultiTenantMigration() {
// In a multi-tenant migration, multiple migrations would be writing to the same
// table, so we can't stop writes like we do with MoveTables, using denied tables,
// since it would block all other migrations as well as traffic for tenants which
// have already been migrated.
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations")
if ts.IsMultiTenantMigration() {
// Multi-tenant migrations use keyspace routing rules, so we need to update the state
// using them.
err = updateKeyspaceRoutingState(ctx, ts.TopoServer(), ts.sourceKeyspace, ts.targetKeyspace, startState)
if err != nil {
return nil, vterrors.Wrap(err, "failed to update multi-tenant workflow state using keyspace routing rules")
}
}

// We need this to know when there isn't a (non-FROZEN) reverse workflow to use.
Expand All @@ -3255,6 +3259,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
(direction == DirectionBackward && !startState.WritesSwitched)

if direction == DirectionBackward && !onlySwitchingReads {
if ts.IsMultiTenantMigration() {
// In a multi-tenant migration, multiple migrations would be writing to the same
// table, so we can't stop writes like we do with MoveTables, using denied tables,
// since it would block all other migrations as well as traffic for tenants which
// have already been migrated.
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse write traffic for multi-tenant migrations")
}
// This means that the main workflow is FROZEN and the reverse workflow
// exists. So we update the starting state so that we're using the reverse
// workflow and we can move forward with a normal traffic switch forward
Expand Down Expand Up @@ -3336,6 +3347,15 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
resp.StartState = startState.String()
s.Logger().Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState)
_, currentState, err := s.getWorkflowState(ctx, ts.targetKeyspace, ts.workflow)
if ts.IsMultiTenantMigration() {
// Multi-tenant migrations use keyspace routing rules, so we need to update the state
// using them.
sourceKs, targetKs := ts.sourceKeyspace, ts.targetKeyspace
if TrafficSwitchDirection(req.Direction) == DirectionBackward {
sourceKs, targetKs = targetKs, sourceKs
}
err = updateKeyspaceRoutingState(ctx, ts.TopoServer(), sourceKs, targetKs, currentState)
}
if err != nil {
resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err)
} else {
Expand Down Expand Up @@ -3387,11 +3407,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
// shard level traffic switching is all or nothing
trafficSwitchingIsAllOrNothing = true
case ts.MigrationType() == binlogdatapb.MigrationType_TABLES && ts.IsMultiTenantMigration():
if direction == DirectionBackward {
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"requesting reversal of read traffic for multi-tenant migrations is not supported"))
}
// For multi-tenant migrations, we only support switching traffic to all cells at once
// For multi-tenant migrations, we only support switching traffic to all cells at once.
allCells, err := ts.TopoServer().GetCellInfoNames(ctx)
if err != nil {
return nil, err
Expand All @@ -3415,7 +3431,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
}
if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 {
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
"requesting reversal of read traffic for RDONLYs but RDONLY reads have not been switched"))
}
}

Expand Down
Loading

0 comments on commit e87457e

Please sign in to comment.