Skip to content

Commit

Permalink
[e2e] vtctld init tablet and some output-based commands (#15297)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Mason <[email protected]>
  • Loading branch information
Andrew Mason authored Mar 6, 2024
1 parent 6db8860 commit c75b784
Show file tree
Hide file tree
Showing 15 changed files with 227 additions and 149 deletions.
4 changes: 2 additions & 2 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func firstBackupTest(t *testing.T, tabletType string) {
mysqlctl.CompressionEngineName = "lz4"
defer func() { mysqlctl.CompressionEngineName = "pgzip" }()
// now bring up the other replica, letting it restore from backup.
err = localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shardName)
err = localCluster.InitTablet(replica2, keyspaceName, shardName)
require.Nil(t, err)
restore(t, replica2, "replica", "SERVING")
// Replica2 takes time to serve. Sleeping for 5 sec.
Expand Down Expand Up @@ -266,7 +266,7 @@ func removeBackups(t *testing.T) {
func initTablets(t *testing.T, startTablet bool, initShardPrimary bool) {
// Initialize tablets
for _, tablet := range []cluster.Vttablet{*primary, *replica1} {
err := localCluster.VtctlclientProcess.InitTablet(&tablet, cell, keyspaceName, hostname, shardName)
err := localCluster.InitTablet(&tablet, keyspaceName, shardName)
require.Nil(t, err)

if startTablet {
Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
replica2 = shard.Vttablets[2]
replica3 = shard.Vttablets[3]

if err := localCluster.VtctlclientProcess.InitTablet(primary, cell, keyspaceName, hostname, shard.Name); err != nil {
if err := localCluster.InitTablet(primary, keyspaceName, shard.Name); err != nil {
return 1, err
}
if err := localCluster.VtctlclientProcess.InitTablet(replica1, cell, keyspaceName, hostname, shard.Name); err != nil {
if err := localCluster.InitTablet(replica1, keyspaceName, shard.Name); err != nil {
return 1, err
}
if err := localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shard.Name); err != nil {
if err := localCluster.InitTablet(replica2, keyspaceName, shard.Name); err != nil {
return 1, err
}
vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", localCluster.VtctldProcess.GrpcPort, localCluster.TmpDirectory)
Expand Down Expand Up @@ -449,7 +449,7 @@ func primaryBackup(t *testing.T) {
}()
verifyInitialReplication(t)

output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("Backup", primary.Alias)
output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", primary.Alias)
require.Error(t, err)
assert.Contains(t, output, "type PRIMARY cannot take backup. if you really need to do this, rerun the backup command with --allow_primary")

Expand Down Expand Up @@ -746,7 +746,7 @@ func restartPrimaryAndReplica(t *testing.T) {
proc.Wait()
}
for _, tablet := range []*cluster.Vttablet{primary, replica1} {
err := localCluster.VtctlclientProcess.InitTablet(tablet, cell, keyspaceName, hostname, shardName)
err := localCluster.InitTablet(tablet, keyspaceName, shardName)
require.Nil(t, err)
err = tablet.VttabletProcess.Setup()
require.Nil(t, err)
Expand Down
64 changes: 41 additions & 23 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/syscallutil"
Expand Down Expand Up @@ -319,6 +318,44 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
return nil
}

// InitTablet initializes a tablet record in the topo server. It does not start the tablet process.
func (cluster *LocalProcessCluster) InitTablet(tablet *Vttablet, keyspace string, shard string) error {
tabletpb := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: tablet.Cell,
Uid: uint32(tablet.TabletUID),
},
Hostname: cluster.Hostname,
Type: topodatapb.TabletType_REPLICA,
PortMap: map[string]int32{
"vt": int32(tablet.HTTPPort),
},
Keyspace: keyspace,
Shard: shard,
}

switch tablet.Type {
case "rdonly":
tabletpb.Type = topodatapb.TabletType_RDONLY
case "primary":
tabletpb.Type = topodatapb.TabletType_PRIMARY
}

if tablet.MySQLPort > 0 {
tabletpb.PortMap["mysql"] = int32(tablet.MySQLPort)
}

if tablet.GrpcPort > 0 {
tabletpb.PortMap["grpc"] = int32(tablet.GrpcPort)
}

allowPrimaryOverride := false
createShardAndKeyspace := true
allowUpdate := true

return cluster.TopoProcess.Server.InitTablet(context.Background(), tabletpb, allowPrimaryOverride, createShardAndKeyspace, allowUpdate)
}

// StartKeyspace starts required number of shard and the corresponding tablets
// keyspace : struct containing keyspace name, Sqlschema to apply, VSchema to apply
// shardName : list of shard names
Expand Down Expand Up @@ -856,7 +893,7 @@ func (cluster *LocalProcessCluster) ExecOnTablet(ctx context.Context, vttablet *
return nil, err
}

tablet, err := cluster.VtctlclientGetTablet(vttablet)
tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -899,7 +936,7 @@ func (cluster *LocalProcessCluster) ExecOnVTGate(ctx context.Context, addr strin
// returns the responses. It returns an error if the stream ends with fewer than
// `count` responses.
func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vttablet *Vttablet, count int) (responses []*querypb.StreamHealthResponse, err error) {
tablet, err := cluster.VtctlclientGetTablet(vttablet)
tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -934,7 +971,7 @@ func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vtta
// StreamTabletHealthUntil invokes a HealthStream on a local cluster Vttablet and
// returns the responses. It waits until a certain condition is met. The amount of time to wait is an input that it takes.
func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context, vttablet *Vttablet, timeout time.Duration, condition func(shr *querypb.StreamHealthResponse) bool) error {
tablet, err := cluster.VtctlclientGetTablet(vttablet)
tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
if err != nil {
return err
}
Expand Down Expand Up @@ -971,25 +1008,6 @@ func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context,
return err
}

func (cluster *LocalProcessCluster) VtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) {
result, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", "--", tablet.Alias)
if err != nil {
return nil, err
}

var ti topodatapb.Tablet
if err := json2.Unmarshal([]byte(result), &ti); err != nil {
return nil, err
}

return &ti, nil
}

func (cluster *LocalProcessCluster) VtctlclientChangeTabletType(tablet *Vttablet, tabletType topodatapb.TabletType) error {
_, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("ChangeTabletType", "--", tablet.Alias, tabletType.String())
return err
}

// Teardown brings down the cluster by invoking teardown for individual processes
func (cluster *LocalProcessCluster) Teardown() {
PanicHandler(nil)
Expand Down
18 changes: 5 additions & 13 deletions go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

Expand Down Expand Up @@ -360,7 +359,11 @@ func GetPasswordUpdateSQL(localCluster *LocalProcessCluster) string {
// CheckSrvKeyspace confirms that the cell and keyspace contain the expected
// shard mappings.
func CheckSrvKeyspace(t *testing.T, cell string, ksname string, expectedPartition map[topodatapb.TabletType][]string, ci LocalProcessCluster) {
srvKeyspace := GetSrvKeyspace(t, cell, ksname, ci)
srvKeyspaces, err := ci.VtctldClientProcess.GetSrvKeyspaces(ksname, cell)
require.NoError(t, err)

srvKeyspace := srvKeyspaces[cell]
require.NotNil(t, srvKeyspace, "srvKeyspace is nil for %s", cell)

currentPartition := map[topodatapb.TabletType][]string{}

Expand All @@ -374,17 +377,6 @@ func CheckSrvKeyspace(t *testing.T, cell string, ksname string, expectedPartitio
assert.True(t, reflect.DeepEqual(currentPartition, expectedPartition))
}

// GetSrvKeyspace returns the SrvKeyspace structure for the cell and keyspace.
func GetSrvKeyspace(t *testing.T, cell string, ksname string, ci LocalProcessCluster) *topodatapb.SrvKeyspace {
output, err := ci.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvKeyspace", cell, ksname)
require.Nil(t, err)
var srvKeyspace topodatapb.SrvKeyspace

err = json2.Unmarshal([]byte(output), &srvKeyspace)
require.Nil(t, err)
return &srvKeyspace
}

// ExecuteOnTablet executes a query on the specified vttablet.
// It should always be called with a primary tablet for a keyspace/shard.
func ExecuteOnTablet(t *testing.T, query string, vttablet Vttablet, ks string, expectFail bool) {
Expand Down
34 changes: 31 additions & 3 deletions go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (

"vitess.io/vitess/go/vt/log"
vtopo "vitess.io/vitess/go/vt/topo"

// Register topo server implementations
_ "vitess.io/vitess/go/vt/topo/consultopo"
_ "vitess.io/vitess/go/vt/topo/etcd2topo"
_ "vitess.io/vitess/go/vt/topo/zk2topo"
)

// TopoProcess is a generic handle for a running Topo service .
Expand All @@ -51,6 +56,7 @@ type TopoProcess struct {
PeerURL string
ZKPorts string
Client interface{}
Server *vtopo.Server

proc *exec.Cmd
exit chan error
Expand All @@ -60,15 +66,22 @@ type TopoProcess struct {
func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) (err error) {
switch topoFlavor {
case "zk2":
return topo.SetupZookeeper(cluster)
err = topo.SetupZookeeper(cluster)
case "consul":
return topo.SetupConsul(cluster)
err = topo.SetupConsul(cluster)
default:
// Override any inherited ETCDCTL_API env value to
// ensure that we use the v3 API and storage.
os.Setenv("ETCDCTL_API", "3")
return topo.SetupEtcd()
err = topo.SetupEtcd()
}

if err != nil {
return
}

topo.Server, err = vtopo.OpenServer(topoFlavor, net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port)), TopoGlobalRoot(topoFlavor))
return
}

// SetupEtcd spawns a new etcd service and initializes it with the defaults.
Expand Down Expand Up @@ -289,6 +302,11 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {

// TearDown shutdowns the running topo service.
func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool, topoFlavor string) error {
if topo.Server != nil {
topo.Server.Close()
topo.Server = nil
}

if topo.Client != nil {
switch cli := topo.Client.(type) {
case *clientv3.Client:
Expand Down Expand Up @@ -437,3 +455,13 @@ func TopoProcessInstance(port int, peerPort int, hostname string, flavor string,
topo.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort)
return topo
}

// TopoGlobalRoot returns the global root for the given topo flavor.
func TopoGlobalRoot(flavor string) string {
switch flavor {
case "consul":
return "global"
default:
return "/vitess/global"
}
}
4 changes: 1 addition & 3 deletions go/test/endtoend/cluster/vtctl_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {

// Default values for etcd2 topo server.
topoImplementation := "etcd2"
topoGlobalRoot := "/vitess/global"
topoRootPath := "/"

// Checking and resetting the parameters for required topo server.
Expand All @@ -127,7 +126,6 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {
topoImplementation = "zk2"
case "consul":
topoImplementation = "consul"
topoGlobalRoot = "global"
// For consul we do not need "/" in the path
topoRootPath = ""
}
Expand All @@ -142,7 +140,7 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {
Binary: "vtctl",
TopoImplementation: topoImplementation,
TopoGlobalAddress: fmt.Sprintf("%s:%d", hostname, topoPort),
TopoGlobalRoot: topoGlobalRoot,
TopoGlobalRoot: TopoGlobalRoot(*topoFlavor),
TopoServerAddress: fmt.Sprintf("%s:%d", hostname, topoPort),
TopoRootPath: topoRootPath,
VtctlMajorVersion: version,
Expand Down
22 changes: 22 additions & 0 deletions go/test/endtoend/cluster/vtctldclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,28 @@ func (vtctldclient *VtctldClientProcess) ApplyVSchema(keyspace string, json stri
)
}

// ChangeTabletType changes the type of the given tablet.
func (vtctldclient *VtctldClientProcess) ChangeTabletType(tablet *Vttablet, tabletType topodatapb.TabletType) error {
return vtctldclient.ExecuteCommand(
"ChangeTabletType",
tablet.Alias,
tabletType.String(),
)
}

// GetShardReplication returns a mapping of cell to shard replication for the given keyspace and shard.
func (vtctldclient *VtctldClientProcess) GetShardReplication(keyspace string, shard string, cells ...string) (map[string]*topodatapb.ShardReplication, error) {
args := append([]string{"GetShardReplication", keyspace + "/" + shard}, cells...)
out, err := vtctldclient.ExecuteCommandWithOutput(args...)
if err != nil {
return nil, err
}

var resp vtctldatapb.GetShardReplicationResponse
err = json2.Unmarshal([]byte(out), &resp)
return resp.ShardReplicationByCell, err
}

// GetSrvKeyspaces returns a mapping of cell to srv keyspace for the given keyspace.
func (vtctldclient *VtctldClientProcess) GetSrvKeyspaces(keyspace string, cells ...string) (ksMap map[string]*topodatapb.SrvKeyspace, err error) {
args := append([]string{"GetSrvKeyspaces", keyspace}, cells...)
Expand Down
16 changes: 9 additions & 7 deletions go/test/endtoend/clustertest/vtctld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func testTopoDataAPI(t *testing.T, url string) {

func testListAllTablets(t *testing.T) {
// first w/o any filters, aside from cell
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets", clusterInstance.Cell)
result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetTablets", "--cell", clusterInstance.Cell)
require.NoError(t, err)

tablets := getAllTablets()
Expand All @@ -102,10 +102,12 @@ func testListAllTablets(t *testing.T) {

// now filtering with the first keyspace and tablet type of primary, in
// addition to the cell
result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(
"ListAllTablets", "--", "--keyspace", clusterInstance.Keyspaces[0].Name,
"--tablet_type", "primary",
clusterInstance.Cell)
result, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"GetTablets",
"--keyspace", clusterInstance.Keyspaces[0].Name,
"--tablet-type", "primary",
"--cell", clusterInstance.Cell,
)
require.NoError(t, err)

// We should only return a single primary tablet per shard in the first keyspace
Expand Down Expand Up @@ -164,7 +166,7 @@ func testExecuteAsDba(t *testing.T) {
}
for _, tcase := range tcases {
t.Run(tcase.query, func(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
if tcase.expectErr {
assert.Error(t, err)
} else {
Expand All @@ -176,7 +178,7 @@ func testExecuteAsDba(t *testing.T) {
}

func testExecuteAsApp(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsApp", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsApp", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
require.NoError(t, err)
assert.Equal(t, result, oneTableOutput)
}
Expand Down
Loading

0 comments on commit c75b784

Please sign in to comment.