VReplication: Fixes for generated column handling #17107

Merged · merged 6 commits on Nov 1, 2024
Changes from 4 commits
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
@@ -30,6 +30,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
@@ -40,8 +42,6 @@ import (
vttablet "vitess.io/vitess/go/vt/vttablet/common"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"

"github.com/stretchr/testify/require"
)

var (
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/config_test.go
@@ -48,8 +48,8 @@ var (
customerTypes = []string{"'individual'", "'soho'", "'enterprise'"}
initialProductSchema = fmt.Sprintf(`
create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid), key(date1,date2)) CHARSET=utf8mb4;
create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum(%s), sport set('football','cricket','baseball'),
ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, industryCategory varchar(100) generated always as (json_extract(meta, '$.industry')) virtual,
typ enum(%s), sport set('football','cricket','baseball'), ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), blb blob, primary key(cid,typ), key(name)) CHARSET=utf8mb4;
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table merchant(mname varchar(128), category varchar(128), primary key(mname), key(category)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/unsharded_init_data.sql
@@ -1,7 +1,7 @@
insert into customer(cid, name, typ, sport, meta) values(1, 'Jøhn "❤️" Rizzolo',1,'football,baseball','{}');
insert into customer(cid, name, typ, sport, meta) values(1, 'Jøhn "❤️" Rizzolo',1,'football,baseball','{"industry":"IT SaaS","company":"PlanetScale"}');
insert into customer(cid, name, typ, sport, meta) values(2, 'Paül','soho','cricket',convert(x'7b7d' using utf8mb4));
-- We use a high cid value here to test the target sequence initialization.
insert into customer(cid, name, typ, sport, blb) values(999999, 'ringo','enterprise','','blob data');
insert into customer(cid, name, typ, sport, blb, meta) values(999999, 'ringo','enterprise','','blob data', '{"industry":"Music"}');
insert into merchant(mname, category) values('Monoprice', 'eléctronics');
insert into merchant(mname, category) values('newegg', 'elec†ronics');
insert into product(pid, description) values(1, 'keyböard ⌨️');
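The schema change above adds a virtual generated column, `industryCategory`, computed from `meta`, and the seed data now populates `meta` so the column has something to derive from. For context, a minimal standalone sketch of how MySQL treats such a column (the DSN, driver choice, and table name here are illustrative assumptions, not part of this PR):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // assumed driver for this sketch
)

func main() {
	// Illustrative DSN; adjust for your environment.
	db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:3306)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Trimmed-down version of the customer table with the virtual column.
	if _, err := db.Exec(`create table if not exists customer_demo(
		cid int primary key,
		meta json,
		industryCategory varchar(100) generated always as (json_extract(meta, '$.industry')) virtual)`); err != nil {
		log.Fatal(err)
	}

	// Works: the generated column is omitted; MySQL derives it from meta.
	if _, err := db.Exec(`insert into customer_demo(cid, meta) values (1, '{"industry":"IT SaaS"}')`); err != nil {
		log.Fatal(err)
	}

	// Fails: an explicit (non-DEFAULT) value for a generated column is rejected,
	// which is why the replication queries built in this PR must leave it out.
	if _, err := db.Exec(`insert into customer_demo(cid, meta, industryCategory) values (2, '{}', 'x')`); err != nil {
		log.Printf("expected error: %v", err)
	}
}
```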
14 changes: 9 additions & 5 deletions go/test/endtoend/vreplication/vreplication_test.go
@@ -587,12 +587,16 @@ func testVStreamCellFlag(t *testing.T) {
}
}

// TestCellAliasVreplicationWorkflow tests replication from a cell with an alias to test the tablet picker's alias functionality
// We also reuse the setup of this test to validate that the "vstream * from" vtgate query functionality is functional
// TestCellAliasVreplicationWorkflow tests replication from a cell with an alias to test
// the tablet picker's alias functionality.
// We also reuse the setup of this test to validate that the "vstream * from" vtgate
// query functionality is functional.
func TestCellAliasVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
defer mainClusterConfig.enableGTIDCompression()
defer setAllVTTabletExperimentalFlags()
resetCompression := mainClusterConfig.enableGTIDCompression()
defer resetCompression()
resetExperimentalFlags := setAllVTTabletExperimentalFlags()
defer resetExperimentalFlags()
Comment on lines +596 to +599

@mattlord (Contributor, Author) commented on Oct 30, 2024:
This is unrelated to the PR issues, but I noticed that it was incorrect as I walked through this test.
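
To make the defer fix above concrete, here is a minimal sketch (hypothetical names) of the two patterns: deferring the setup call itself postpones the setup until the function returns and discards its reset function, whereas calling it immediately and deferring the returned function applies the setting now and restores it at teardown.

```go
package main

import "fmt"

// enableSetting stands in for helpers like enableGTIDCompression: it changes
// some global state and returns a function that restores the previous value.
func enableSetting() func() {
	fmt.Println("setting enabled")
	return func() { fmt.Println("setting restored") }
}

func buggy() {
	// Bug: this defers the call to enableSetting itself, so the setting is only
	// enabled when buggy() returns, and the returned reset func is never called.
	defer enableSetting()
	fmt.Println("test body runs without the setting")
}

func fixed() {
	// Fix: enable now, defer the returned reset so it runs at teardown.
	reset := enableSetting()
	defer reset()
	fmt.Println("test body runs with the setting")
}

func main() {
	buggy()
	fixed()
}
```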

vc = NewVitessCluster(t, &clusterOptions{cells: cells})
defer vc.TearDown()

@@ -718,12 +722,12 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
// Confirm that the 0 scale decimal field, dec80, is replicated correctly
dec80Replicated := false
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set dec80 = 0")
execVtgateQuery(t, vtgateConn, sourceKs, "update customer set blb = \"new blob data\" where cid=3")
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
dec80Replicated := false
for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} {
// Query the tablet's mysqld directly as the targets will have denied table entries.
dbc, err := tablet.TabletConn(targetKs, true)
23 changes: 12 additions & 11 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
@@ -111,19 +111,18 @@ func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Res
}
for _, field := range fields {
colName := sqlparser.NewIdentifierCI(field.Name)
isGenerated := false
generated := false
// We have to loop over the columns in the plan as the columns between the
// source and target are not always 1 to 1.
for _, colInfo := range tpb.colInfos {
if !strings.EqualFold(colInfo.Name, field.Name) {
continue
}
if colInfo.IsGenerated {
isGenerated = true
generated = true
}
break
}
if isGenerated {
continue
}
cexpr := &colExpr{
colName: colName,
colType: field.Type,
@@ -133,6 +132,7 @@ func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Res
references: map[string]bool{
field.Name: true,
},
isGenerated: generated,
}
tpb.colExprs = append(tpb.colExprs, cexpr)
}
@@ -608,12 +608,13 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
return v1.ToString() == v2.ToString()
}

// AppendFromRow behaves like Append but takes a querypb.Row directly, assuming that
// the fields in the row are in the same order as the placeholders in this query. The fields might include generated
// columns which are dropped, by checking against skipFields, before binding the variables
// note: there can be more fields than bind locations since extra columns might be requested from the source if not all
// primary keys columns are present in the target table, for example. Also some values in the row may not correspond for
// values from the database on the source: sum/count for aggregation queries, for example
// AppendFromRow behaves like Append but takes a querypb.Row directly, assuming that the
// fields in the row are in the same order as the placeholders in this query. The fields
// might include generated columns, which are dropped before binding the variables.
// Note: there can be more fields than bind locations since extra columns might be
// requested from the source if, for example, not all primary key columns are present
// in the target table. Also, some values in the row may not correspond to values from
// the database on the source: sum/count for aggregation queries, for example.
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
bindLocations := tp.BulkInsertValues.BindLocations()
if len(tp.Fields) < len(bindLocations) {
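As a toy illustration of the comment on appendFromRow (the field names and skip set below are illustrative, not the actual TablePlan data): the row can carry more values than the insert query has placeholders, so the walk pairs values with bind locations in order and drops the fields that have no placeholder, such as columns that are generated on the target.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Fields as reported by the source row event, in order.
	fields := []string{"cid", "name", "meta", "industryCategory"}
	values := []string{"999999", "ringo", `{"industry":"Music"}`, "Music"}
	// Columns with no placeholder in the target insert (generated on the target).
	skip := map[string]bool{"industrycategory": true}

	bound := make([]string, 0, len(values))
	for i, f := range fields {
		if skip[strings.ToLower(f)] {
			continue // no bind location for this field
		}
		bound = append(bound, values[i])
	}
	// The remaining values line up, in order, with the query's bind locations.
	fmt.Println(bound)
}
```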
33 changes: 11 additions & 22 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
@@ -80,10 +80,11 @@ type colExpr struct {
// references contains all the column names referenced in the expression.
references map[string]bool

isGrouped bool
isPK bool
dataType string
columnType string
isGrouped bool
isPK bool
isGenerated bool
dataType string
columnType string
}

// operation is the opcode for the colExpr.
@@ -360,7 +361,7 @@ func (tpb *tablePlanBuilder) generate() *TablePlan {
fieldsToSkip := make(map[string]bool)
for _, colInfo := range tpb.colInfos {
if colInfo.IsGenerated {
fieldsToSkip[colInfo.Name] = true
fieldsToSkip[strings.ToLower(colInfo.Name)] = true
}
}
return &TablePlan{
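The one-line change above is the core case-sensitivity fix: the skip set is now keyed by lower-cased column names, so a lower-cased lookup against field names reported by MySQL (whose column names are case-insensitive) cannot miss on case alone. A small standalone sketch of the failure mode this guards against (names are illustrative):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// MySQL column names are case-insensitive, so the field name reported in a
	// row event may not match the spelling used in the table definition.
	fieldName := "INDUSTRYCATEGORY"

	// Before: keyed by the original spelling -- the lookup below misses.
	before := map[string]bool{"industryCategory": true}
	// After: keys are normalized to lower case, matching a lower-cased lookup.
	after := map[string]bool{strings.ToLower("industryCategory"): true}

	fmt.Println("before:", before[strings.ToLower(fieldName)]) // false: column is not skipped
	fmt.Println("after: ", after[strings.ToLower(fieldName)])  // true: generated column is skipped
}
```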
@@ -694,7 +695,7 @@ func (tpb *tablePlanBuilder) generateInsertPart(buf *sqlparser.TrackedBuffer) *s
}
separator := ""
for _, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
buf.Myprintf("%s%v", separator, cexpr.colName)
@@ -708,7 +709,7 @@ func (tpb *tablePlanBuilder) generateValuesPart(buf *sqlparser.TrackedBuffer, bv
bvf.mode = bvAfter
separator := "("
for _, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
buf.Myprintf("%s", separator)
@@ -745,7 +746,7 @@ func (tpb *tablePlanBuilder) generateSelectPart(buf *sqlparser.TrackedBuffer, bv
buf.WriteString(" select ")
separator := ""
for _, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
buf.Myprintf("%s", separator)
@@ -781,7 +782,7 @@ func (tpb *tablePlanBuilder) generateOnDupPart(buf *sqlparser.TrackedBuffer) *sq
if cexpr.isGrouped || cexpr.isPK {
continue
}
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
buf.Myprintf("%s%v=", separator, cexpr.colName)
@@ -812,10 +813,7 @@ func (tpb *tablePlanBuilder) generateUpdateStatement() *sqlparser.ParsedQuery {
if cexpr.isPK {
tpb.pkIndices[i] = true
}
if cexpr.isGrouped || cexpr.isPK {
continue
}
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGrouped || cexpr.isPK || cexpr.isGenerated {
continue
}
buf.Myprintf("%s%v=", separator, cexpr.colName)
@@ -961,15 +959,6 @@ func (tpb *tablePlanBuilder) generatePKConstraint(buf *sqlparser.TrackedBuffer,
buf.WriteString(")")
}

func (tpb *tablePlanBuilder) isColumnGenerated(col sqlparser.IdentifierCI) bool {
for _, colInfo := range tpb.colInfos {
if col.EqualString(colInfo.Name) && colInfo.IsGenerated {
return true
}
}
return false
}

// bindvarFormatter is a dual mode formatter. Its behavior
// can be changed dynamically changed to generate bind vars
// for the 'before' row or 'after' row by setting its mode
17 changes: 4 additions & 13 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_partial.go
@@ -50,10 +50,7 @@ func (tpb *tablePlanBuilder) generatePartialValuesPart(buf *sqlparser.TrackedBuf
bvf.mode = bvAfter
separator := "("
for ind, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
continue
}
if !isBitSet(dataColumns.Cols, ind) {
if cexpr.isGenerated || !isBitSet(dataColumns.Cols, ind) {
continue
}
buf.Myprintf("%s", separator)
@@ -84,7 +81,7 @@ func (tpb *tablePlanBuilder) generatePartialInsertPart(buf *sqlparser.TrackedBuf
buf.Myprintf("insert into %v(", tpb.name)
separator := ""
for ind, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
if !isBitSet(dataColumns.Cols, ind) {
@@ -102,7 +99,7 @@ func (tpb *tablePlanBuilder) generatePartialSelectPart(buf *sqlparser.TrackedBuf
buf.WriteString(" select ")
separator := ""
for ind, cexpr := range tpb.colExprs {
if tpb.isColumnGenerated(cexpr.colName) {
if cexpr.isGenerated {
continue
}
if !isBitSet(dataColumns.Cols, ind) {
@@ -141,17 +138,11 @@ func (tpb *tablePlanBuilder) createPartialUpdateQuery(dataColumns *binlogdatapb.
buf.Myprintf("update %v set ", tpb.name)
separator := ""
for i, cexpr := range tpb.colExprs {
if cexpr.isPK {
continue
}
if tpb.isColumnGenerated(cexpr.colName) {
continue
}
if int64(i) >= dataColumns.Count {
log.Errorf("Ran out of columns trying to generate query for %s", tpb.name.CompliantName())
return nil
}
if !isBitSet(dataColumns.Cols, i) {
if cexpr.isPK || cexpr.isGenerated || !isBitSet(dataColumns.Cols, i) {
continue
}
buf.Myprintf("%s%v=", separator, cexpr.colName)
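createPartialUpdateQuery now folds the PK, generated-column, and row-image checks into one condition. A rough standalone sketch of that filtering (the bitmap helper below is illustrative and may not match vitess's isBitSet bit ordering; the struct is a stand-in for colExpr, not the real planner type):

```go
package main

import "fmt"

type col struct {
	name        string
	isPK        bool
	isGenerated bool
}

// present reports whether column i is included in the partial row image.
// Illustrative LSB-first bitmap; the real isBitSet may order bits differently.
func present(bitmap []byte, i int) bool {
	return bitmap[i/8]&(1<<(uint(i)%8)) != 0
}

func main() {
	cols := []col{
		{name: "cid", isPK: true},
		{name: "name"},
		{name: "meta"},
		{name: "industryCategory", isGenerated: true},
	}
	bitmap := []byte{0b00000111} // only cid, name, meta present in this row event

	sets := []string{}
	for i, c := range cols {
		if c.isPK || c.isGenerated || !present(bitmap, i) {
			continue // same consolidated skip as the new code above
		}
		sets = append(sets, c.name+"=?")
	}
	fmt.Println("update customer set", sets) // [name=? meta=?]
}
```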