diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 22ae49883dc..d0645a3a578 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -44,7 +44,6 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/schema" @@ -65,7 +64,6 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - "vitess.io/vitess/go/vt/proto/topodata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" @@ -155,18 +153,26 @@ type Server struct { ts *topo.Server tmc tmclient.TabletManagerClient // Limit the number of concurrent background goroutines if needed. - sem *semaphore.Weighted - env *vtenv.Environment + sem *semaphore.Weighted + env *vtenv.Environment + options serverOptions } // NewServer returns a new server instance with the given topo.Server and // TabletManagerClient. -func NewServer(env *vtenv.Environment, ts *topo.Server, tmc tmclient.TabletManagerClient) *Server { - return &Server{ +func NewServer(env *vtenv.Environment, ts *topo.Server, tmc tmclient.TabletManagerClient, opts ...ServerOption) *Server { + s := &Server{ ts: ts, tmc: tmc, env: env, } + for _, o := range opts { + o.apply(&s.options) + } + if s.options.logger == nil { + s.options.logger = logutil.NewConsoleLogger() // Use the default system logger + } + return s } func (s *Server) SQLParser() *sqlparser.Parser { @@ -330,7 +336,7 @@ func (s *Server) GetCellsWithTableReadsSwitched( for _, to := range rule.ToTables { ks, err := getKeyspace(to) if err != nil { - log.Errorf(err.Error()) + s.Logger().Errorf(err.Error()) return nil, nil, err } @@ -851,7 +857,7 @@ ORDER BY } if stream.Id > streamLog.StreamId { - log.Warningf("Found stream log for nonexistent stream: %+v", streamLog) + s.Logger().Warningf("Found stream log for nonexistent stream: %+v", streamLog) // This can happen on manual/failed workflow cleanup so keep going. continue } @@ -941,7 +947,7 @@ ORDER BY func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) { ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName) if err != nil { - log.Errorf("buildTrafficSwitcher failed: %v", err) + s.Logger().Errorf("buildTrafficSwitcher failed: %v", err) return nil, nil, err } @@ -1323,7 +1329,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } sourceTopo = externalTopo - log.Infof("Successfully opened external topo: %+v", externalTopo) + s.Logger().Infof("Successfully opened external topo: %+v", externalTopo) } var vschema *vschemapb.Keyspace @@ -1380,7 +1386,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if len(tables) == 0 { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables to move") } - log.Infof("Found tables to move: %s", strings.Join(tables, ",")) + s.Logger().Infof("Found tables to move: %s", strings.Join(tables, ",")) if !vschema.Sharded { // Save the original in case we need to restore it for a late failure @@ -1536,12 +1542,12 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } if exists { - log.Errorf("Found a previous journal entry for %d", migrationID) + s.Logger().Errorf("Found a previous journal entry for %d", migrationID) msg := fmt.Sprintf("found an entry from a previous run for migration id %d in _vt.resharding_journal on tablets %s, ", migrationID, strings.Join(tablets, ",")) msg += fmt.Sprintf("please review and delete it before proceeding and then start the workflow using: MoveTables --workflow %s --target-keyspace %s start", req.Workflow, req.TargetKeyspace) - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, msg) + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, msg) } } @@ -1599,7 +1605,7 @@ func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb. targetKeyspace := req.TargetKeyspace if req.NoRoutingRules { - log.Warningf("Found --no-routing-rules flag, not creating routing rules for workflow %s.%s", targetKeyspace, req.Workflow) + s.Logger().Warningf("Found --no-routing-rules flag, not creating routing rules for workflow %s.%s", targetKeyspace, req.Workflow) return nil } @@ -1612,7 +1618,7 @@ func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb. } if mz.IsMultiTenantMigration() { - log.Infof("Setting up keyspace routing rules for workflow %s.%s", targetKeyspace, req.Workflow) + s.Logger().Infof("Setting up keyspace routing rules for workflow %s.%s", targetKeyspace, req.Workflow) // Note that you can never point the target keyspace to the source keyspace in a multi-tenant migration // since the target takes write traffic for all tenants! routes := make(map[string]string) @@ -1729,7 +1735,7 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea if err := s.ts.ValidateSrvKeyspace(ctx, keyspace, strings.Join(cells, ",")); err != nil { err2 := vterrors.Wrapf(err, "SrvKeyspace for keyspace %s is corrupt for cell(s) %s", keyspace, cells) - log.Errorf("%v", err2) + s.Logger().Errorf("%v", err2) return nil, err } tabletTypesStr := discovery.BuildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference) @@ -1754,7 +1760,7 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea return nil, vterrors.Wrap(err, "startStreams") } } else { - log.Warningf("Streams will not be started since --auto-start is set to false") + s.Logger().Warningf("Streams will not be started since --auto-start is set to false") } return s.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{ Keyspace: req.Keyspace, @@ -1841,7 +1847,7 @@ func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRe return nil, err } if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running { - log.Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) + s.Logger().Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow) } @@ -1851,7 +1857,7 @@ func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRe return err }) if err != nil { - log.Errorf("Error executing vdiff create action: %v", err) + s.Logger().Errorf("Error executing vdiff create action: %v", err) return nil, err } @@ -1886,7 +1892,7 @@ func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRe return err }) if err != nil { - log.Errorf("Error executing vdiff delete action: %v", err) + s.Logger().Errorf("Error executing vdiff delete action: %v", err) return nil, err } @@ -1919,7 +1925,7 @@ func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRe return err }) if err != nil { - log.Errorf("Error executing vdiff resume action: %v", err) + s.Logger().Errorf("Error executing vdiff resume action: %v", err) return nil, err } @@ -1959,7 +1965,7 @@ func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowReques return err }) if output.err != nil { - log.Errorf("Error executing vdiff show action: %v", output.err) + s.Logger().Errorf("Error executing vdiff show action: %v", output.err) return nil, output.err } return &vtctldatapb.VDiffShowResponse{ @@ -1993,7 +1999,7 @@ func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopReques return err }) if err != nil { - log.Errorf("Error executing vdiff stop action: %v", err) + s.Logger().Errorf("Error executing vdiff stop action: %v", err) return nil, err } @@ -2015,7 +2021,7 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe ts, state, err := s.getWorkflowState(ctx, req.GetKeyspace(), req.GetWorkflow()) if err != nil { - log.Errorf("failed to get VReplication workflow state for %s.%s: %v", req.GetKeyspace(), req.GetWorkflow(), err) + s.Logger().Errorf("failed to get VReplication workflow state for %s.%s: %v", req.GetKeyspace(), req.GetWorkflow(), err) return nil, err } @@ -2509,7 +2515,7 @@ func (s *Server) deleteWorkflowVDiffData(ctx context.Context, tablet *topodatapb Action: string(vdiff.DeleteAction), ActionArg: vdiff.AllActionArg, }); err != nil { - log.Errorf("Error deleting vdiff data for %s.%s workflow: %v", tablet.Keyspace, workflow, err) + s.Logger().Errorf("Error deleting vdiff data for %s.%s workflow: %v", tablet.Keyspace, workflow, err) } } @@ -2529,7 +2535,7 @@ func (s *Server) deleteWorkflowVDiffData(ctx context.Context, tablet *topodatapb func (s *Server) optimizeCopyStateTable(tablet *topodatapb.Tablet) { if s.sem != nil { if !s.sem.TryAcquire(1) { - log.Warningf("Deferring work to optimize the copy_state table on %q due to hitting the maximum concurrent background job limit.", + s.Logger().Warningf("Deferring work to optimize the copy_state table on %q due to hitting the maximum concurrent background job limit.", tablet.Alias.String()) return } @@ -2550,7 +2556,7 @@ func (s *Server) optimizeCopyStateTable(tablet *topodatapb.Tablet) { if IsTableDidNotExistError(err) { return } - log.Warningf("Failed to optimize the copy_state table on %q: %v", tablet.Alias.String(), err) + s.Logger().Warningf("Failed to optimize the copy_state table on %q: %v", tablet.Alias.String(), err) } // This will automatically set the value to 1 or the current max value in the // table, whichever is greater. @@ -2559,7 +2565,7 @@ func (s *Server) optimizeCopyStateTable(tablet *topodatapb.Tablet) { Query: []byte(sqlResetAutoInc), MaxRows: uint64(0), }); err != nil { - log.Warningf("Failed to reset the auto_increment value for the copy_state table on %q: %v", + s.Logger().Warningf("Failed to reset the auto_increment value for the copy_state table on %q: %v", tablet.Alias.String(), err) } }() @@ -2628,14 +2634,14 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, error) { tgtInfo, err := BuildTargets(ctx, s.ts, s.tmc, targetKeyspace, workflowName) if err != nil { - log.Infof("Error building targets: %s", err) + s.Logger().Infof("Error building targets: %s", err) return nil, err } targets, frozen, optCells, optTabletTypes := tgtInfo.Targets, tgtInfo.Frozen, tgtInfo.OptCells, tgtInfo.OptTabletTypes ts := &trafficSwitcher{ ws: s, - logger: logutil.NewConsoleLogger(), + logger: s.Logger(), workflow: workflowName, reverseWorkflow: ReverseWorkflowName(workflowName), id: HashStreams(targetKeyspace, targets), @@ -2649,7 +2655,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf workflowSubType: tgtInfo.WorkflowSubType, options: tgtInfo.Options, } - log.Infof("Migration ID for workflow %s: %d", workflowName, ts.id) + s.Logger().Infof("Migration ID for workflow %s: %d", workflowName, ts.id) sourceTopo := s.ts // Build the sources. @@ -2736,7 +2742,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf return nil, err } if ts.isPartialMigration { - log.Infof("Migration is partial, for shards %+v", sourceShards) + s.Logger().Infof("Migration is partial, for shards %+v", sourceShards) } return ts, nil } @@ -2803,7 +2809,7 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy if !keepData { switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: - log.Infof("Deleting tables") + s.Logger().Infof("Deleting tables") if err := sw.removeSourceTables(ctx, removalType); err != nil { return nil, err } @@ -2815,7 +2821,7 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy } case binlogdatapb.MigrationType_SHARDS: - log.Infof("Removing shards") + s.Logger().Infof("Removing shards") if err := sw.dropSourceShards(ctx); err != nil { return nil, err } @@ -2862,7 +2868,7 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs shardInfo, err := s.ts.GetShard(ctx, keyspace, shard) if err != nil { if topo.IsErrType(err, topo.NoNode) { - log.Warningf("Shard %v/%v did not exist when attempting to remove it", keyspace, shard) + s.Logger().Warningf("Shard %v/%v did not exist when attempting to remove it", keyspace, shard) return nil } return err @@ -2935,11 +2941,11 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard %v/%v still has %v tablets in cell %v; use --recursive or remove them manually", keyspace, shard, len(tabletMap), cell) } - log.Infof("Deleting all tablets in shard %v/%v cell %v", keyspace, shard, cell) + s.Logger().Infof("Deleting all tablets in shard %v/%v cell %v", keyspace, shard, cell) for tabletAlias, tabletInfo := range tabletMap { // We don't care about scrapping or updating the replication graph, // because we're about to delete the entire replication graph. - log.Infof("Deleting tablet %v", tabletAlias) + s.Logger().Infof("Deleting tablet %v", tabletAlias) if err := s.ts.DeleteTablet(ctx, tabletInfo.Alias); err != nil && !topo.IsErrType(err, topo.NoNode) { // We don't want to continue if a DeleteTablet fails for // any good reason (other than missing tablet, in which @@ -2960,7 +2966,7 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs // regardless of its existence. for _, cell := range cells { if err := s.ts.DeleteShardReplication(ctx, cell, keyspace, shard); err != nil && !topo.IsErrType(err, topo.NoNode) { - log.Warningf("Cannot delete ShardReplication in cell %v for %v/%v: %v", cell, keyspace, shard, err) + s.Logger().Warningf("Cannot delete ShardReplication in cell %v for %v/%v: %v", cell, keyspace, shard, err) } } @@ -2991,7 +2997,7 @@ func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.Shard if err := s.tmc.RefreshState(ctx, ti.Tablet); err != nil { rec.RecordError(err) } else { - log.Infof("%v responded", topoproto.TabletAliasString(si.PrimaryAlias)) + s.Logger().Infof("%v responded", topoproto.TabletAliasString(si.PrimaryAlias)) } }(si) } @@ -3036,7 +3042,7 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche return nil, err } } - log.Infof("cancel is %t, keepData %t", cancel, keepData) + s.Logger().Infof("cancel is %t, keepData %t", cancel, keepData) if cancel && !keepData { if err := sw.removeTargetTables(ctx); err != nil { return nil, err @@ -3113,7 +3119,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if rdDryRunResults, err = s.switchReads(ctx, req, ts, startState, !hasPrimary /* rebuildSrvVSchema */, direction); err != nil { return nil, err } - log.Infof("Switch Reads done for workflow %s.%s", req.Keyspace, req.Workflow) + s.Logger().Infof("Switch Reads done for workflow %s.%s", req.Keyspace, req.Workflow) } if rdDryRunResults != nil { dryRunResults = append(dryRunResults, *rdDryRunResults...) @@ -3122,7 +3128,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if _, wrDryRunResults, err = s.switchWrites(ctx, req, ts, timeout, false); err != nil { return nil, err } - log.Infof("Switch Writes done for workflow %s.%s", req.Keyspace, req.Workflow) + s.Logger().Infof("Switch Writes done for workflow %s.%s", req.Keyspace, req.Workflow) } if wrDryRunResults != nil { @@ -3135,13 +3141,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor if direction == DirectionBackward { cmd = "ReverseTraffic" } - log.Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) + s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) resp := &vtctldatapb.WorkflowSwitchTrafficResponse{} if req.DryRun { resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v", cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822)) resp.DryRunResults = dryRunResults } else { - log.Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) + s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) resp.Summary = fmt.Sprintf("%s was successful for workflow %s.%s", cmd, req.Keyspace, req.Workflow) // Reload the state after the SwitchTraffic operation // and return that as a string. @@ -3152,14 +3158,14 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor workflow = ts.reverseWorkflow } resp.StartState = startState.String() - log.Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState) + s.Logger().Infof("Before reloading workflow state after switching traffic: %+v\n", resp.StartState) _, currentState, err := s.getWorkflowState(ctx, keyspace, workflow) if err != nil { resp.CurrentState = fmt.Sprintf("Error reloading workflow state after switching traffic: %v", err) } else { resp.CurrentState = currentState.String() } - log.Infof("%s done for workflow %s.%s, returning response %v", cmd, req.Keyspace, req.Workflow, resp) + s.Logger().Infof("%s done for workflow %s.%s, returning response %v", cmd, req.Keyspace, req.Workflow, resp) } return resp, nil } @@ -3188,7 +3194,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc cellsStr := strings.Join(req.Cells, ",") - log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String()) + s.Logger().Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String()) if !switchReplica && !switchRdonly { return defaultErrorHandler(ts.Logger(), "invalid tablet types", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr)) @@ -3249,7 +3255,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err) } if journalsExist { - log.Infof("Found a previous journal entry for %d", ts.id) + s.Logger().Infof("Found a previous journal entry for %d", ts.id) } var sw iswitcher if req.DryRun { @@ -3346,7 +3352,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // Consistently handle errors by logging and returning them. handleError := func(message string, err error) (int64, *[]string, error) { - werr := vterrors.Wrapf(err, message) + werr := vterrors.Wrap(err, message) ts.Logger().Error(werr) return 0, nil, werr } @@ -3409,7 +3415,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } // Remove mirror rules for the primary tablet type. - if err := sw.mirrorTableTraffic(ctx, []topodata.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil { + if err := sw.mirrorTableTraffic(ctx, []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, 0); err != nil { return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for primary tablet type", ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err) } @@ -3587,7 +3593,7 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat maxAllowedReplLagSecs int64, shards []string) (reason string, err error) { if direction == DirectionForward && state.WritesSwitched || direction == DirectionBackward && !state.WritesSwitched { - log.Infof("writes already switched no need to check lag") + s.Logger().Infof("writes already switched no need to check lag") return "", nil } wf, err := s.GetWorkflow(ctx, state.TargetKeyspace, state.Workflow, false, shards) @@ -3721,7 +3727,7 @@ func (s *Server) CopySchemaShard(ctx context.Context, sourceTabletAlias *topodat defer cancel() _, ok := schematools.ReloadShard(reloadCtx, s.ts, s.tmc, logutil.NewMemoryLogger(), destKeyspace, destShard, destPrimaryPos, nil, true) if !ok { - log.Error(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "CopySchemaShard: failed to reload schema on all replicas")) + s.Logger().Error(vterrors.Errorf(vtrpcpb.Code_INTERNAL, "CopySchemaShard: failed to reload schema on all replicas")) } return err @@ -4224,14 +4230,14 @@ func (s *Server) WorkflowMirrorTraffic(ctx context.Context, req *vtctldatapb.Wor cmd := "MirrorTraffic" resp := &vtctldatapb.WorkflowMirrorTrafficResponse{} - log.Infof("Mirror Traffic done for workflow %s.%s", req.Keyspace, req.Workflow) + s.Logger().Infof("Mirror Traffic done for workflow %s.%s", req.Keyspace, req.Workflow) resp.Summary = fmt.Sprintf("%s was successful for workflow %s.%s", cmd, req.Keyspace, req.Workflow) // Reload the state after the MirrorTraffic operation // and return that as a string. keyspace := req.Keyspace workflow := req.Workflow resp.StartState = startState.String() - log.Infof("Before reloading workflow state after mirror traffic: %+v\n", resp.StartState) + s.Logger().Infof("Before reloading workflow state after mirror traffic: %+v\n", resp.StartState) _, currentState, err := s.getWorkflowState(ctx, keyspace, workflow) if err != nil { resp.CurrentState = fmt.Sprintf("Error reloading workflow state after mirror traffic: %v", err) @@ -4249,7 +4255,7 @@ func (s *Server) mirrorTraffic(ctx context.Context, req *vtctldatapb.WorkflowMir return err } - log.Infof("Mirroring traffic: %s.%s, workflow state: %s", ts.targetKeyspace, ts.workflow, state.String()) + s.Logger().Infof("Mirroring traffic: %s.%s, workflow state: %s", ts.targetKeyspace, ts.workflow, state.String()) sw := &switcher{ts: ts, s: s} @@ -4263,3 +4269,10 @@ func (s *Server) mirrorTraffic(ctx context.Context, req *vtctldatapb.WorkflowMir return nil } + +func (s *Server) Logger() logutil.Logger { + if s.options.logger == nil { + s.options.logger = logutil.NewConsoleLogger() // Use default system logger + } + return s.options.logger +} diff --git a/go/vt/vtctl/workflow/server_options.go b/go/vt/vtctl/workflow/server_options.go new file mode 100644 index 00000000000..ed6fdf284a9 --- /dev/null +++ b/go/vt/vtctl/workflow/server_options.go @@ -0,0 +1,56 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "vitess.io/vitess/go/vt/logutil" +) + +// serverOptions configure a Workflow Server. serverOptions are set by +// the ServerOption values passed to the server functions. +type serverOptions struct { + logger logutil.Logger +} + +// ServerOption configures how we perform the certain operations. +type ServerOption interface { + apply(*serverOptions) +} + +// funcServerOption wraps a function that modifies serverOptions into +// an implementation of the ServerOption interface. +type funcServerOption struct { + f func(*serverOptions) +} + +func (fso *funcServerOption) apply(so *serverOptions) { + fso.f(so) +} + +func newFuncServerOption(f func(*serverOptions)) *funcServerOption { + return &funcServerOption{ + f: f, + } +} + +// WithLogger determines the customer logger to use. If this option +// is not provided then the default system logger will be used. +func WithLogger(l logutil.Logger) ServerOption { + return newFuncServerOption(func(o *serverOptions) { + o.logger = l + }) +} diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index be78b2ae4a9..1653f5001ce 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -32,6 +32,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/utils" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -520,6 +521,7 @@ func TestWorkflowDelete(t *testing.T) { want *vtctldatapb.WorkflowDeleteResponse wantErr string postFunc func(t *testing.T, env *testEnv) + expectedLogs []string }{ { name: "missing table", @@ -562,6 +564,9 @@ func TestWorkflowDelete(t *testing.T) { result: &querypb.QueryResult{}, }, }, + expectedLogs: []string{ // Confirm that the custom logger is working as expected + fmt.Sprintf("Table `%s` did not exist when attempting to remove it", table2Name), + }, want: &vtctldatapb.WorkflowDeleteResponse{ Summary: fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", workflowName, targetKeyspaceName), @@ -685,6 +690,9 @@ func TestWorkflowDelete(t *testing.T) { require.NotNil(t, tc.req) env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) defer env.close() + memlogger := logutil.NewMemoryLogger() + defer memlogger.Clear() + env.ws.options.logger = memlogger env.tmc.schema = schema if tc.expectedSourceQueries != nil { require.NotNil(t, env.tablets[tc.sourceKeyspace.KeyspaceName]) @@ -724,6 +732,16 @@ func TestWorkflowDelete(t *testing.T) { } } } + logs := memlogger.String() + // Confirm that the custom logger was passed on to the trafficSwitcher + // if we didn't expect/want an error as otherwise we may not have made + // it into the trafficSwitcher. + if tc.wantErr == "" { + require.Contains(t, logs, "traffic_switcher.go") + } + for _, expectedLog := range tc.expectedLogs { + require.Contains(t, logs, expectedLog) + } }) } } diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index bcc42d13ce9..044efd0d9cf 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -35,7 +35,6 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" @@ -251,7 +250,7 @@ func (ts *trafficSwitcher) TopoServer() *topo.Server { func (ts *trafficSwitcher) TabletManagerClient() tmclient.TabletManagerClient { return ts.ws.tmc } func (ts *trafficSwitcher) Logger() logutil.Logger { if ts.logger == nil { - ts.logger = logutil.NewConsoleLogger() + ts.logger = logutil.NewConsoleLogger() // Use the default system logger } return ts.logger } @@ -435,7 +434,7 @@ func (ts *trafficSwitcher) deleteShardRoutingRules(ctx context.Context) error { srr, err := topotools.GetShardRoutingRules(ctx, ts.TopoServer()) if err != nil { if topo.IsErrType(err, topo.NoNode) { - log.Warningf("No shard routing rules found when attempting to delete the ones for the %s keyspace", ts.targetKeyspace) + ts.Logger().Warningf("No shard routing rules found when attempting to delete the ones for the %s keyspace", ts.targetKeyspace) return nil } return err @@ -453,7 +452,7 @@ func (ts *trafficSwitcher) deleteKeyspaceRoutingRules(ctx context.Context) error if !ts.IsMultiTenantMigration() { return nil } - log.Infof("deleteKeyspaceRoutingRules: workflow %s.%s", ts.targetKeyspace, ts.workflow) + ts.Logger().Infof("deleteKeyspaceRoutingRules: workflow %s.%s", ts.targetKeyspace, ts.workflow) reason := fmt.Sprintf("Deleting rules for %s", ts.SourceKeyspaceName()) return topotools.UpdateKeyspaceRoutingRules(ctx, ts.TopoServer(), reason, func(ctx context.Context, rules *map[string]string) error { @@ -582,19 +581,19 @@ func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { cellsStr := strings.Join(cells, ",") - log.Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) + ts.Logger().Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) fromShards, toShards := ts.SourceShards(), ts.TargetShards() if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", ts.TargetKeyspaceName(), cellsStr) - log.Errorf("%w", err2) + ts.Logger().Errorf("%w", err2) return err2 } for _, servedType := range servedTypes { - if err := ts.ws.updateShardRecords(ctx, ts.SourceKeyspaceName(), fromShards, cells, servedType, true /* isFrom */, false /* clearSourceShards */, ts.logger); err != nil { + if err := ts.ws.updateShardRecords(ctx, ts.SourceKeyspaceName(), fromShards, cells, servedType, true /* isFrom */, false /* clearSourceShards */, ts.Logger()); err != nil { return err } - if err := ts.ws.updateShardRecords(ctx, ts.SourceKeyspaceName(), toShards, cells, servedType, false, false, ts.logger); err != nil { + if err := ts.ws.updateShardRecords(ctx, ts.SourceKeyspaceName(), toShards, cells, servedType, false, false, ts.Logger()); err != nil { return err } err := ts.TopoServer().MigrateServedType(ctx, ts.SourceKeyspaceName(), toShards, fromShards, servedType, cells) @@ -605,14 +604,14 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", ts.TargetKeyspaceName(), cellsStr) - log.Errorf("%w", err2) + ts.Logger().Errorf("%w", err2) return err2 } return nil } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error { - log.Infof("switchTableReads: cells: %s, tablet types: %+v, direction: %s", strings.Join(cells, ","), servedTypes, direction) + ts.Logger().Infof("switchTableReads: cells: %s, tablet types: %+v, direction: %s", strings.Join(cells, ","), servedTypes, direction) rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err @@ -654,7 +653,7 @@ func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error { } func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows []string) error { - log.Infof("In createJournals for source workflows %+v", sourceWorkflows) + ts.Logger().Infof("In createJournals for source workflows %+v", sourceWorkflows) return ts.ForAllSources(func(source *MigrationSource) error { if source.Journaled { return nil @@ -691,7 +690,7 @@ func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows [ }) } - log.Infof("Creating journal %v", journal) + ts.Logger().Infof("Creating journal %v", journal) ts.Logger().Infof("Creating journal: %v", journal) statement := fmt.Sprintf("insert into _vt.resharding_journal "+ "(id, db_name, val) "+ @@ -772,7 +771,7 @@ func (ts *trafficSwitcher) changeWriteRoute(ctx context.Context) error { func (ts *trafficSwitcher) changeShardRouting(ctx context.Context) error { if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), ""); err != nil { err2 := vterrors.Wrapf(err, "Before changing shard routes, found SrvKeyspace for %s is corrupt", ts.TargetKeyspaceName()) - log.Errorf("%w", err2) + ts.Logger().Errorf("%w", err2) return err2 } err := ts.ForAllSources(func(source *MigrationSource) error { @@ -801,7 +800,7 @@ func (ts *trafficSwitcher) changeShardRouting(ctx context.Context) error { } if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), ""); err != nil { err2 := vterrors.Wrapf(err, "after changing shard routes, found SrvKeyspace for %s is corrupt", ts.TargetKeyspaceName()) - log.Errorf("%w", err2) + ts.Logger().Errorf("%w", err2) return err2 } return nil @@ -921,7 +920,7 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error Filter: filter, }) } - log.Infof("Creating reverse workflow vreplication stream on tablet %s: workflow %s, startPos %s", + ts.Logger().Infof("Creating reverse workflow vreplication stream on tablet %s: workflow %s, startPos %s", source.GetPrimary().GetAlias(), ts.ReverseWorkflowName(), target.Position) _, err = ts.VReplicationExec(ctx, source.GetPrimary().GetAlias(), binlogplayer.CreateVReplicationState(ts.ReverseWorkflowName(), reverseBls, target.Position, @@ -938,7 +937,7 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error updateQuery := ts.getReverseVReplicationUpdateQuery(target.GetPrimary().GetAlias().GetCell(), source.GetPrimary().GetAlias().GetCell(), source.GetPrimary().DbName(), string(optionsJSON)) if updateQuery != "" { - log.Infof("Updating vreplication stream entry on %s with: %s", source.GetPrimary().GetAlias(), updateQuery) + ts.Logger().Infof("Updating vreplication stream entry on %s with: %s", source.GetPrimary().GetAlias(), updateQuery) _, err = ts.VReplicationExec(ctx, source.GetPrimary().GetAlias(), updateQuery) return err } @@ -988,12 +987,12 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati if err := ts.TabletManagerClient().VReplicationWaitForPos(ctx, target.GetPrimary().Tablet, uid, source.Position); err != nil { return err } - log.Infof("After catchup: target keyspace:shard: %v:%v, source position %v, uid %d", + ts.Logger().Infof("After catchup: target keyspace:shard: %v:%v, source position %v, uid %d", ts.TargetKeyspaceName(), target.GetShard().ShardName(), source.Position, uid) ts.Logger().Infof("After catchup: position for keyspace:shard: %v:%v reached, uid %d", ts.TargetKeyspaceName(), target.GetShard().ShardName(), uid) if _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, binlogplayer.StopVReplication(uid, "stopped for cutover")); err != nil { - log.Infof("Error marking stopped for cutover on %s, uid %d", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), uid) + ts.Logger().Infof("Error marking stopped for cutover on %s, uid %d", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), uid) return err } return nil @@ -1017,16 +1016,16 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites) } if err != nil { - log.Warningf("Error: %s", err) + ts.Logger().Warningf("Error: %s", err) return err } return ts.ForAllSources(func(source *MigrationSource) error { var err error source.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, source.GetPrimary().Tablet) - log.Infof("Stopped Source Writes. Position for source %v:%v: %v", + ts.Logger().Infof("Stopped Source Writes. Position for source %v:%v: %v", ts.SourceKeyspaceName(), source.GetShard().ShardName(), source.Position) if err != nil { - log.Warningf("Error: %s", err) + ts.Logger().Warningf("Error: %s", err) } return err }) @@ -1075,7 +1074,7 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error { }) }) if err := egrp.Wait(); err != nil { - log.Warningf("Error in switchDeniedTables: %s", err) + ts.Logger().Warningf("Error in switchDeniedTables: %s", err) return err } @@ -1157,7 +1156,7 @@ func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Cont func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error { err := ts.ForAllTargets(func(target *MigrationTarget) error { - log.Infof("ForAllTargets: %+v", target) + ts.Logger().Infof("ForAllTargets: %+v", target) for _, tableName := range ts.Tables() { primaryDbName, err := sqlescape.EnsureEscaped(target.GetPrimary().DbName()) if err != nil { @@ -1176,7 +1175,7 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error { ReloadSchema: true, DisableForeignKeyChecks: true, }) - log.Infof("Removed target table with result: %+v", res) + ts.Logger().Infof("Removed target table with result: %+v", res) if err != nil { if IsTableDidNotExistError(err) { // The table was already gone, so we can ignore the error. diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index fde7b36da6e..1e317b5c69a 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -545,7 +545,7 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er _ = ts.ForAllSources(func(source *MigrationSource) error { wg.Add(1) if source.GetShard().IsPrimaryServing { - rec.RecordError(fmt.Errorf(fmt.Sprintf("Shard %s is still serving", source.GetShard().ShardName()))) + rec.RecordError(fmt.Errorf("shard %s is still serving", source.GetShard().ShardName())) } wg.Done() return nil @@ -964,7 +964,7 @@ func IsTableDidNotExistError(err error) bool { // defaultErrorHandler provides a way to consistently handle errors by logging and // returning them. func defaultErrorHandler(logger logutil.Logger, message string, err error) (*[]string, error) { - werr := vterrors.Wrapf(err, message) + werr := vterrors.Wrap(err, message) logger.Error(werr) return nil, werr }