Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental: Add missing tables to globally routed list in schema tracker only if they are not already present in a VSchema #17371

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ Flags:
--log_rotate_max_size uint size in bytes at which logs are rotated (glog.MaxSize) (default 1887436800)
--logtostderr log to standard error instead of files
--manifest-external-decompressor string command with arguments to store in the backup manifest when compressing a backup with an external compression engine.
--mark_unique_unsharded_tables_as_global Mark unique unsharded tables as global tables (default true)
--max-stack-size int configure the maximum stack size in bytes (default 67108864)
--max_concurrent_online_ddl int Maximum number of online DDL changes that may run concurrently (default 256)
--max_memory_rows int Maximum number of rows that will be held in memory for intermediate results as well as the final result. (default 300000)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ Flags:
--log_queries_to_file string Enable query logging to the specified file
--log_rotate_max_size uint size in bytes at which logs are rotated (glog.MaxSize) (default 1887436800)
--logtostderr log to standard error instead of files
--mark_unique_unsharded_tables_as_global Mark unique unsharded tables as global tables (default true)
--max-stack-size int configure the maximum stack size in bytes (default 67108864)
--max_memory_rows int Maximum number of rows that will be held in memory for intermediate results as well as the final result. (default 300000)
--max_payload_size int The threshold for query payloads in bytes. A payload greater than this threshold will result in a failure to handle the query.
Expand Down
319 changes: 319 additions & 0 deletions go/test/endtoend/vreplication/global_routing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication

import (
"bytes"
"fmt"
"strings"
"testing"
"text/template"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type tgrTestConfig struct {
ksU1, ksU2, ksS1 string
ksU1Tables, ksU2Tables, ksS1Tables []string
}

var grTestConfig tgrTestConfig = tgrTestConfig{
ksU1: "unsharded1",
ksU2: "unsharded2",
ksS1: "sharded1",
ksU1Tables: []string{"t1", "t2", "t3"},
ksU2Tables: []string{"t2", "t4", "t5"},
ksS1Tables: []string{"t2", "t4", "t6"},
}

type grTestExpectations struct {
postKsU1, postKsU2, postKsS1 func(t *testing.T)
}

type grTestCase struct {
markAsGlobal bool
unshardedHasVSchema bool
}

type grHelpers struct {
t *testing.T
}

func (h *grHelpers) getSchema(tables []string) string {
var createSQL string
for _, table := range tables {
createSQL += "CREATE TABLE " + table + " (id int primary key, val varchar(32)) ENGINE=InnoDB;\n"
}
return createSQL
}

func (h *grHelpers) getShardedVSchema(tables []string) string {
const vSchemaTmpl = `{
"sharded": true,
"vindexes": {
"reverse_bits": {
"type": "reverse_bits"
}
},
"tables": {
{{- range $i, $table := .Tables}}
{{- if gt $i 0}},{{end}}
"{{ $table }}": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
}
{{- end}}
}
}
`
type VSchemaData struct {
Tables []string
}
tmpl, err := template.New("vschema").Parse(vSchemaTmpl)
require.NoError(h.t, err)
var buf bytes.Buffer
err = tmpl.Execute(&buf, VSchemaData{tables})
require.NoError(h.t, err)
return buf.String()
}

func (h *grHelpers) insertData(t *testing.T, keyspace string, table string, id int, val string) {
vtgateConn, cancel := getVTGateConn()
defer cancel()
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.%s(id, val) values(%d, '%s')",
keyspace, table, id, val), 1, false)
require.NoError(t, err)
}

func (h *grHelpers) isGlobal(t *testing.T, tables []string, expectedVal string) bool {
vtgateConn, cancel := getVTGateConn()
defer cancel()
var err error
asExpected := true
for _, table := range tables {
for _, target := range []string{"", "@primary", "@replica"} {
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", target), 1, false)
require.NoError(t, err)
rs, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select * from %s", table), 1, false)
require.NoError(t, err)
gotVal := rs.Rows[0][1].ToString()
if gotVal != expectedVal {
asExpected = false
}
}
}
return asExpected
}

func (h *grHelpers) isNotGlobal(t *testing.T, tables []string) bool {
vtgateConn, cancel := getVTGateConn()
defer cancel()
var err error
asExpected := true
for _, table := range tables {
for _, target := range []string{"", "@primary", "@replica"} {
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", target), 1, false)
require.NoError(t, err)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select * from %s", table), 1, false)
log.Infof("Got error %v, for table %s.%s", err, table, target)
if err == nil || !strings.Contains(err.Error(), fmt.Sprintf("table %s not found", table)) {
asExpected = false
}
}
}
return asExpected
}

func (h *grHelpers) isAmbiguous(t *testing.T, tables []string) bool {
vtgateConn, cancel := getVTGateConn()
defer cancel()
var err error
asExpected := true
for _, table := range tables {
for _, target := range []string{"", "@primary", "@replica"} {
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", target), 1, false)
require.NoError(t, err)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select * from %s", table), 1, false)
if err == nil || !strings.Contains(err.Error(), "ambiguous") {
asExpected = false
}
}
}
return asExpected
}

func (h *grHelpers) getExpectations() *map[grTestCase]*grTestExpectations {
var exp = make(map[grTestCase]*grTestExpectations)
exp[grTestCase{unshardedHasVSchema: false, markAsGlobal: false}] = &grTestExpectations{
postKsU1: func(t *testing.T) {
require.True(t, h.isGlobal(t, []string{"t1", "t2", "t3"}, grTestConfig.ksU1))
},
postKsU2: func(t *testing.T) {
require.True(t, h.isNotGlobal(t, []string{"t1", "t2", "t3"}))
require.True(t, h.isNotGlobal(t, []string{"t4", "t5"}))
},
postKsS1: func(t *testing.T) {
require.True(t, h.isGlobal(t, []string{"t2", "t4"}, grTestConfig.ksS1))
require.True(t, h.isNotGlobal(t, []string{"t1", "t3"}))
require.True(t, h.isNotGlobal(t, []string{"t5"}))
require.True(t, h.isGlobal(t, []string{"t6"}, grTestConfig.ksS1))
},
}
exp[grTestCase{unshardedHasVSchema: false, markAsGlobal: true}] = &grTestExpectations{
postKsU1: func(t *testing.T) {
require.True(t, h.isGlobal(t, []string{"t1", "t2", "t3"}, grTestConfig.ksU1))
},
postKsU2: func(t *testing.T) {
require.True(t, h.isGlobal(t, []string{"t1", "t3"}, grTestConfig.ksU1))
require.True(t, h.isGlobal(t, []string{"t4", "t5"}, grTestConfig.ksU2))
require.True(t, h.isAmbiguous(t, []string{"t2"}))
},
postKsS1: func(t *testing.T) {
require.True(t, h.isGlobal(t, []string{"t2", "t4"}, grTestConfig.ksS1))
require.True(t, h.isGlobal(t, []string{"t1", "t3"}, grTestConfig.ksU1))
require.True(t, h.isGlobal(t, []string{"t5"}, grTestConfig.ksU2))
require.True(t, h.isGlobal(t, []string{"t6"}, grTestConfig.ksS1))
},
}
exp[grTestCase{unshardedHasVSchema: true, markAsGlobal: false}] = &grTestExpectations{
postKsU1: func(t *testing.T) {
require.True(t, h.isGlobal(t, []string{"t1", "t2", "t3"}, grTestConfig.ksU1))
},
postKsU2: func(t *testing.T) {
require.True(t, h.isGlobal(t, []string{"t1", "t3"}, grTestConfig.ksU1))
require.True(t, h.isGlobal(t, []string{"t4", "t5"}, grTestConfig.ksU2))
require.True(t, h.isAmbiguous(t, []string{"t2"}))
},
postKsS1: func(t *testing.T) {
require.True(t, h.isAmbiguous(t, []string{"t2", "t4"}))
require.True(t, h.isGlobal(t, []string{"t1", "t3"}, grTestConfig.ksU1))
require.True(t, h.isGlobal(t, []string{"t5"}, grTestConfig.ksU2))
},
}
exp[grTestCase{unshardedHasVSchema: true, markAsGlobal: true}] =
exp[grTestCase{unshardedHasVSchema: true, markAsGlobal: false}]
return &exp

}

func (h *grHelpers) getUnshardedVschema(unshardedHasVSchema bool, tables []string) string {
if !unshardedHasVSchema {
return ""
}
vschema := `{"tables": {`
for i, table := range tables {
if i != 0 {
vschema += `,`
}
vschema += fmt.Sprintf(`"%s": {}`, table)
}
vschema += `}}`
return vschema
}

func (h *grHelpers) rebuildGraphs(t *testing.T, keyspaces []string) {
var err error
for _, ks := range keyspaces {
err = vc.VtctldClient.ExecuteCommand("RebuildKeyspaceGraph", ks)
require.NoError(t, err)
}
require.NoError(t, err)
err = vc.VtctldClient.ExecuteCommand("RebuildVSchemaGraph")
require.NoError(t, err)

}

func TestGlobalRouting(t *testing.T) {
h := grHelpers{t}
exp := *h.getExpectations()
testCases := []grTestCase{
{unshardedHasVSchema: false, markAsGlobal: true},
{unshardedHasVSchema: false, markAsGlobal: false},
{unshardedHasVSchema: true, markAsGlobal: true},
{unshardedHasVSchema: true, markAsGlobal: false},
}
for _, tc := range testCases {
funcs := exp[tc]
require.NotNil(t, funcs)
testGlobalRouting(t, tc.markAsGlobal, tc.unshardedHasVSchema, funcs)
}
}

func testGlobalRouting(t *testing.T, markAsGlobal, unshardedHasVSchema bool, funcs *grTestExpectations) {
h := grHelpers{t: t}
setSidecarDBName("_vt")
vttablet.InitVReplicationConfigDefaults()
extraVTGateArgs = append(extraVTGateArgs, fmt.Sprintf("--mark_unique_unsharded_tables_as_global=%t", markAsGlobal))

vc = NewVitessCluster(t, nil)
defer vc.TearDown()
zone1 := vc.Cells["zone1"]
config := grTestConfig
vc.AddKeyspace(t, []*Cell{zone1}, config.ksU1, "0", h.getUnshardedVschema(unshardedHasVSchema, config.ksU1Tables),
h.getSchema(config.ksU1Tables), 1, 0, 100, nil)
verifyClusterHealth(t, vc)
for _, table := range config.ksU1Tables {
h.insertData(t, config.ksU1, table, 1, config.ksU1)
vtgateConn, cancel := getVTGateConn()
waitForRowCount(t, vtgateConn, config.ksU1+"@replica", table, 1)
log.Infof("waitForRowCount succeeded for %s, %s", config.ksU1+"@replica", table)
cancel()
}
keyspaces := []string{config.ksU1}
h.rebuildGraphs(t, keyspaces)
// FIXME: figure out how to ensure vtgate has processed the updated vschema
time.Sleep(5 * time.Second)
funcs.postKsU1(t)

vc.AddKeyspace(t, []*Cell{zone1}, config.ksU2, "0", h.getUnshardedVschema(unshardedHasVSchema, config.ksU2Tables),
h.getSchema(config.ksU2Tables), 1, 0, 200, nil)
verifyClusterHealth(t, vc)
for _, table := range config.ksU2Tables {
h.insertData(t, config.ksU2, table, 1, config.ksU2)
vtgateConn, cancel := getVTGateConn()
waitForRowCount(t, vtgateConn, config.ksU2+"@replica", table, 1)
log.Infof("waitForRowCount succeeded for %s, %s", config.ksU2+"@replica", table)
cancel()
}
keyspaces = append(keyspaces, config.ksU2)
h.rebuildGraphs(t, keyspaces)
time.Sleep(5 * time.Second)
funcs.postKsU2(t)

vc.AddKeyspace(t, []*Cell{zone1}, config.ksS1, "-80,80-", h.getShardedVSchema(config.ksS1Tables), h.getSchema(config.ksS1Tables),
1, 0, 300, nil)
verifyClusterHealth(t, vc)
for _, table := range config.ksS1Tables {
h.insertData(t, config.ksS1, table, 1, config.ksS1)
vtgateConn, cancel := getVTGateConn()
waitForRowCount(t, vtgateConn, config.ksS1+"@replica", table, 1)
log.Infof("waitForRowCount succeeded for %s, %s", config.ksS1+"@replica", table)
cancel()
}
keyspaces = append(keyspaces, config.ksS1)
h.rebuildGraphs(t, keyspaces)
time.Sleep(5 * time.Second)
funcs.postKsS1(t)
}
50 changes: 50 additions & 0 deletions go/test/endtoend/vreplication/r
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
cleanup() {
rm -rf /Users/rohit/vtdataroot/*
killall vtctldclient vtctld vttablet vtgate vtorc mysqlctl etcd
ps | grep /vtdataroot | awk '{print $1}' | xargs kill -9
ps x | grep mysql | grep -v grep | awk '{print $1}' | xargs kill -9


rm -rf ~/vtdataroot/*
mkdir -p ~/vtdataroot
mkdir -p ~/vtdataroot/tmp
mkdir -p ~/vtdataroot/ext
mkdir -p ~/vtdataroot/ext/tmp
}

declare -a tests=("TestMaterializeVtctld")
declare -a tests=("TestMaterializeView")
declare -a tests=("TestMultiTenantSimple")
declare -a tests=("TestReferenceTableMaterialize")
declare -a tests=("WorkflowDuplicateKeyBackoff")
declare -a tests=("BasicVreplicationWorkflow")
declare -a tests=("CellAlias")
declare -a tests=("TestVSchemaChangesUnderLoad")
declare -a tests=("TestMoveTablesBuffering")
declare -a tests=("MigrateSharded")
declare -a tests=("CopyParallel")
declare -a tests=("TestWorkflowDuplicateKeyBackoff2")
declare -a tests=("TestMoveTablesBasic")
declare -a tests=("TestVtctldclientCLI")

declare -a tests=("TestBasicVreplicationWorkflow")
declare -a tests=("TestLookupVindex")
declare -a tests=("TestGlobalRouting")


export VREPLICATION_E2E_DEBUG=
export CI=true
for test in ${tests[@]}; do
clear
echo "================ Starting $test =============="
echo
cleanup
go test -timeout 20m -failfast -v --alsologtostderr -run $test
RET=$?
echo "================ Done $test ================"
echo
say "$test done"
exit $RET
done


Loading
Loading