Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[no-release-notes] go: binlogreplication: tests: Parallelize the test suite, slightly improve process management. #8805

Merged
merged 2 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,014 changes: 507 additions & 507 deletions go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@ import (
// TestBinlogReplicationForAllTypes tests that operations (inserts, updates, and deletes) on all SQL
// data types can be successfully replicated.
func TestBinlogReplicationForAllTypes(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)

// Set the session's timezone to UTC, to avoid TIMESTAMP test values changing
// when they are converted to UTC for storage.
primaryDatabase.MustExec("SET @@time_zone = '+0:00';")
h.primaryDatabase.MustExec("SET @@time_zone = '+0:00';")

// Create the test table
tableName := "alltypes"
createTableStatement := generateCreateTableStatement(tableName)
primaryDatabase.MustExec(createTableStatement)
h.primaryDatabase.MustExec(createTableStatement)

// Make inserts on the primary – small, large, and null values
primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 0))
primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 1))
primaryDatabase.MustExec(generateInsertNullValuesStatement(tableName))
h.primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 0))
h.primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 1))
h.primaryDatabase.MustExec(generateInsertNullValuesStatement(tableName))

// Verify inserts on replica
waitForReplicaToCatchUp(t)
rows, err := replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
h.waitForReplicaToCatchUp()
rows, err := h.replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "1", row["pk"])
Expand All @@ -62,14 +62,14 @@ func TestBinlogReplicationForAllTypes(t *testing.T) {
require.NoError(t, rows.Close())

// Make updates on the primary
primaryDatabase.MustExec(generateUpdateToNullValuesStatement(tableName, 1))
primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 2, 0))
primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 3, 1))
h.primaryDatabase.MustExec(generateUpdateToNullValuesStatement(tableName, 1))
h.primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 2, 0))
h.primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 3, 1))

// Verify updates on the replica
waitForReplicaToCatchUp(t)
replicaDatabase.MustExec("use db01;")
rows, err = replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
h.waitForReplicaToCatchUp()
h.replicaDatabase.MustExec("use db01;")
rows, err = h.replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "1", row["pk"])
Expand All @@ -84,13 +84,13 @@ func TestBinlogReplicationForAllTypes(t *testing.T) {
require.NoError(t, rows.Close())

// Make deletes on the primary
primaryDatabase.MustExec("delete from alltypes where pk=1;")
primaryDatabase.MustExec("delete from alltypes where pk=2;")
primaryDatabase.MustExec("delete from alltypes where pk=3;")
h.primaryDatabase.MustExec("delete from alltypes where pk=1;")
h.primaryDatabase.MustExec("delete from alltypes where pk=2;")
h.primaryDatabase.MustExec("delete from alltypes where pk=3;")

// Verify deletes on the replica
waitForReplicaToCatchUp(t)
rows, err = replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
h.waitForReplicaToCatchUp()
rows, err = h.replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
require.NoError(t, err)
require.False(t, rows.Next())
require.NoError(t, rows.Close())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,37 @@ import (
// TestBinlogReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)

// Ignore replication events for db01.t2. Also tests that the first filter setting is overwritten by
// the second and that db and that db and table names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t1);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(DB01.T2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t1);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(DB01.T2);")

// Assert that status shows replication filters
status := showReplicaStatus(t)
status := h.showReplicaStatus()
require.Equal(t, "db01.t2", status["Replicate_Ignore_Table"])
require.Equal(t, "", status["Replicate_Do_Table"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
h.primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
h.primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
h.primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
h.primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()

// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
rows, err := h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
Expand All @@ -63,7 +63,7 @@ func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
require.NoError(t, rows.Close())

// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
rows, err = h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
Expand All @@ -75,37 +75,37 @@ func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
// TestBinlogReplicationFilters_doTablesOnly tests that the doTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)

// Do replication events for db01.t1. Also tests that the first filter setting is overwritten by
// the second and that db and that db and table names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t2);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(DB01.T1);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(DB01.T1);")

// Assert that status shows replication filters
status := showReplicaStatus(t)
status := h.showReplicaStatus()
require.Equal(t, "db01.t1", status["Replicate_Do_Table"])
require.Equal(t, "", status["Replicate_Ignore_Table"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
h.primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
h.primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
h.primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
h.primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()

// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
rows, err := h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
Expand All @@ -114,7 +114,7 @@ func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
require.NoError(t, rows.Close())

// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
rows, err = h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
Expand All @@ -126,38 +126,38 @@ func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
// TestBinlogReplicationFilters_doTablesAndIgnoreTables tests that the doTables and ignoreTables
// replication filtering options are correctly applied and honored when used together.
func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)

// Do replication events for db01.t1, and db01.t2
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t1, db01.t2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t1, db01.t2);")
// Ignore replication events for db01.t2
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t2);")

// Assert that replica status shows replication filters
status := showReplicaStatus(t)
status := h.showReplicaStatus()
require.True(t, status["Replicate_Do_Table"] == "db01.t1,db01.t2" ||
status["Replicate_Do_Table"] == "db01.t2,db01.t1")
require.Equal(t, "db01.t2", status["Replicate_Ignore_Table"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
h.primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
h.primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
h.primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
h.primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()

// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
rows, err := h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
Expand All @@ -166,7 +166,7 @@ func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {
require.NoError(t, rows.Close())

// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
rows, err = h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
Expand All @@ -177,15 +177,15 @@ func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {

// TestBinlogReplicationFilters_errorCases test returned errors for various error cases.
func TestBinlogReplicationFilters_errorCases(t *testing.T) {
defer teardown(t)
startSqlServers(t)
h := newHarness(t)
h.startSqlServers()

// All tables must be qualified with a database
_, err := replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(t1);")
_, err := h.replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(t1);")
require.Error(t, err)
require.ErrorContains(t, err, "no database specified for table")

_, err = replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(t1);")
_, err = h.replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(t1);")
require.Error(t, err)
require.ErrorContains(t, err, "no database specified for table")
}
Loading
Loading