Skip to content

Commit

Permalink
VReplication: Fix workflow filtering in GetWorkflows RPC (#15524)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Mar 21, 2024
1 parent 943b07c commit c830723
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vreplication

import (
"encoding/json"
"fmt"
"slices"
"strings"
Expand All @@ -27,6 +28,7 @@ import (
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/test/endtoend/cluster"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -59,6 +61,9 @@ func TestVtctldclientCLI(t *testing.T) {
workflowName := "wf1"
targetTabs := setupMinimalCustomerKeyspace(t)

t.Run("WorkflowList", func(t *testing.T) {
testWorkflowList(t, sourceKeyspaceName, targetKeyspaceName)
})
t.Run("MoveTablesCreateFlags1", func(t *testing.T) {
testMoveTablesFlags1(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs)
})
Expand Down Expand Up @@ -175,6 +180,32 @@ func testMoveTablesFlags3(t *testing.T, sourceKeyspace, targetKeyspace string, t
require.False(t, checkTablesExist(t, "zone1-100", []string{"customer2"}))
}

// Create two workflows in order to confirm that listing all workflows works.
func testWorkflowList(t *testing.T, sourceKeyspace, targetKeyspace string) {
createFlags := []string{"--auto-start=false", "--tablet-types",
"primary,rdonly", "--tablet-types-in-preference-order=true", "--all-cells",
}
wfNames := []string{"list1", "list2"}
tables := []string{"customer", "customer2"}
for i := range wfNames {
mt := createMoveTables(t, sourceKeyspace, targetKeyspace, wfNames[i], tables[i], createFlags, nil, nil)
defer mt.Cancel()
}
slices.Sort(wfNames)

workflowNames := workflowList(targetKeyspace)
slices.Sort(workflowNames)
require.EqualValues(t, wfNames, workflowNames)

workflows := getWorkflows(targetKeyspace)
workflowNames = make([]string, len(workflows.Workflows))
for i := range workflows.Workflows {
workflowNames[i] = workflows.Workflows[i].Name
}
slices.Sort(workflowNames)
require.EqualValues(t, wfNames, workflowNames)
}

func createMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName, tables string,
createFlags, completeFlags, switchFlags []string) iMoveTables {
mt := newMoveTables(vc, &moveTablesWorkflow{
Expand Down Expand Up @@ -322,6 +353,24 @@ func getWorkflow(targetKeyspace, workflow string) *vtctldatapb.GetWorkflowsRespo
return workflowResponse.CloneVT()
}

func getWorkflows(targetKeyspace string) *vtctldatapb.GetWorkflowsResponse {
getWorkflowsOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("GetWorkflows", targetKeyspace, "--show-all", "--compact", "--include-logs=false")
require.NoError(vc.t, err)
var getWorkflowsResponse vtctldatapb.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(getWorkflowsOutput), &getWorkflowsResponse)
require.NoError(vc.t, err)
return getWorkflowsResponse.CloneVT()
}

func workflowList(targetKeyspace string) []string {
workflowListOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKeyspace, "list")
require.NoError(vc.t, err)
var workflowList []string
err = json.Unmarshal([]byte(workflowListOutput), &workflowList)
require.NoError(vc.t, err)
return workflowList
}

func checkTablesExist(t *testing.T, tabletAlias string, tables []string) bool {
tablesResponse, err := vc.VtctldClient.ExecuteCommandWithOutput("GetSchema", tabletAlias, "--tables", strings.Join(tables, ","), "--table-names-only")
require.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,9 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
span.Annotate("include_logs", req.IncludeLogs)
span.Annotate("shards", req.Shards)

readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{
IncludeWorkflows: []string{req.Workflow},
readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{}
if req.Workflow != "" {
readReq.IncludeWorkflows = []string{req.Workflow}
}
if req.ActiveOnly {
readReq.ExcludeStates = []binlogdatapb.VReplicationWorkflowState{binlogdatapb.VReplicationWorkflowState_Stopped}
Expand Down

0 comments on commit c830723

Please sign in to comment.