Skip to content

Commit

Permalink
Merge pull request #8805 from dolthub/aaron/binlog-tests-parallel
Browse files Browse the repository at this point in the history
[no-release-notes] go: binlogreplication: tests: Parallelize the test suite, slightly improve process management.
  • Loading branch information
reltuk authored Jan 31, 2025
2 parents 0ef447e + f131fb8 commit ed1af8f
Show file tree
Hide file tree
Showing 9 changed files with 1,177 additions and 1,037 deletions.
1,014 changes: 507 additions & 507 deletions go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@ import (
// TestBinlogReplicationForAllTypes tests that operations (inserts, updates, and deletes) on all SQL
// data types can be successfully replicated.
func TestBinlogReplicationForAllTypes(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)

// Set the session's timezone to UTC, to avoid TIMESTAMP test values changing
// when they are converted to UTC for storage.
primaryDatabase.MustExec("SET @@time_zone = '+0:00';")
h.primaryDatabase.MustExec("SET @@time_zone = '+0:00';")

// Create the test table
tableName := "alltypes"
createTableStatement := generateCreateTableStatement(tableName)
primaryDatabase.MustExec(createTableStatement)
h.primaryDatabase.MustExec(createTableStatement)

// Make inserts on the primary – small, large, and null values
primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 0))
primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 1))
primaryDatabase.MustExec(generateInsertNullValuesStatement(tableName))
h.primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 0))
h.primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 1))
h.primaryDatabase.MustExec(generateInsertNullValuesStatement(tableName))

// Verify inserts on replica
waitForReplicaToCatchUp(t)
rows, err := replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
h.waitForReplicaToCatchUp()
rows, err := h.replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "1", row["pk"])
Expand All @@ -62,14 +62,14 @@ func TestBinlogReplicationForAllTypes(t *testing.T) {
require.NoError(t, rows.Close())

// Make updates on the primary
primaryDatabase.MustExec(generateUpdateToNullValuesStatement(tableName, 1))
primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 2, 0))
primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 3, 1))
h.primaryDatabase.MustExec(generateUpdateToNullValuesStatement(tableName, 1))
h.primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 2, 0))
h.primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 3, 1))

// Verify updates on the replica
waitForReplicaToCatchUp(t)
replicaDatabase.MustExec("use db01;")
rows, err = replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
h.waitForReplicaToCatchUp()
h.replicaDatabase.MustExec("use db01;")
rows, err = h.replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "1", row["pk"])
Expand All @@ -84,13 +84,13 @@ func TestBinlogReplicationForAllTypes(t *testing.T) {
require.NoError(t, rows.Close())

// Make deletes on the primary
primaryDatabase.MustExec("delete from alltypes where pk=1;")
primaryDatabase.MustExec("delete from alltypes where pk=2;")
primaryDatabase.MustExec("delete from alltypes where pk=3;")
h.primaryDatabase.MustExec("delete from alltypes where pk=1;")
h.primaryDatabase.MustExec("delete from alltypes where pk=2;")
h.primaryDatabase.MustExec("delete from alltypes where pk=3;")

// Verify deletes on the replica
waitForReplicaToCatchUp(t)
rows, err = replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
h.waitForReplicaToCatchUp()
rows, err = h.replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;")
require.NoError(t, err)
require.False(t, rows.Next())
require.NoError(t, rows.Close())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,37 @@ import (
// TestBinlogReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)

// Ignore replication events for db01.t2. Also tests that the first filter setting is overwritten by
// the second and that db and that db and table names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t1);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(DB01.T2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t1);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(DB01.T2);")

// Assert that status shows replication filters
status := showReplicaStatus(t)
status := h.showReplicaStatus()
require.Equal(t, "db01.t2", status["Replicate_Ignore_Table"])
require.Equal(t, "", status["Replicate_Do_Table"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
h.primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
h.primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
h.primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
h.primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()

// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
rows, err := h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
Expand All @@ -63,7 +63,7 @@ func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
require.NoError(t, rows.Close())

// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
rows, err = h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
Expand All @@ -75,37 +75,37 @@ func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
// TestBinlogReplicationFilters_doTablesOnly tests that the doTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)

// Do replication events for db01.t1. Also tests that the first filter setting is overwritten by
// the second and that db and that db and table names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t2);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(DB01.T1);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(DB01.T1);")

// Assert that status shows replication filters
status := showReplicaStatus(t)
status := h.showReplicaStatus()
require.Equal(t, "db01.t1", status["Replicate_Do_Table"])
require.Equal(t, "", status["Replicate_Ignore_Table"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
h.primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
h.primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
h.primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
h.primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()

// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
rows, err := h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
Expand All @@ -114,7 +114,7 @@ func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
require.NoError(t, rows.Close())

// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
rows, err = h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
Expand All @@ -126,38 +126,38 @@ func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
// TestBinlogReplicationFilters_doTablesAndIgnoreTables tests that the doTables and ignoreTables
// replication filtering options are correctly applied and honored when used together.
func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)
h := newHarness(t)
h.startSqlServersWithDoltSystemVars(doltReplicaSystemVars)
h.startReplicationAndCreateTestDb(h.mySqlPort)

// Do replication events for db01.t1, and db01.t2
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t1, db01.t2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t1, db01.t2);")
// Ignore replication events for db01.t2
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t2);")
h.replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t2);")

// Assert that replica status shows replication filters
status := showReplicaStatus(t)
status := h.showReplicaStatus()
require.True(t, status["Replicate_Do_Table"] == "db01.t1,db01.t2" ||
status["Replicate_Do_Table"] == "db01.t2,db01.t1")
require.Equal(t, "db01.t2", status["Replicate_Ignore_Table"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
h.primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
h.primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")
h.primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
h.primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
h.primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
h.primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)
h.waitForReplicaToCatchUp()

// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
rows, err := h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
Expand All @@ -166,7 +166,7 @@ func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {
require.NoError(t, rows.Close())

// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
rows, err = h.replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
Expand All @@ -177,15 +177,15 @@ func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {

// TestBinlogReplicationFilters_errorCases test returned errors for various error cases.
func TestBinlogReplicationFilters_errorCases(t *testing.T) {
defer teardown(t)
startSqlServers(t)
h := newHarness(t)
h.startSqlServers()

// All tables must be qualified with a database
_, err := replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(t1);")
_, err := h.replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(t1);")
require.Error(t, err)
require.ErrorContains(t, err, "no database specified for table")

_, err = replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(t1);")
_, err = h.replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(t1);")
require.Error(t, err)
require.ErrorContains(t, err, "no database specified for table")
}
Loading

0 comments on commit ed1af8f

Please sign in to comment.