diff --git a/cdc/model/sink.go b/cdc/model/sink.go
index cc33d2b1008..26e348830aa 100644
--- a/cdc/model/sink.go
+++ b/cdc/model/sink.go
@@ -1160,6 +1160,11 @@ func (t *SingleTableTxn) GetPhysicalTableID() int64 {
 	return t.PhysicalTableID
 }
 
+// GetLogicalTableID returns the logical table ID of the table in the transaction.
+func (t *SingleTableTxn) GetLogicalTableID() int64 {
+	return t.TableInfo.TableName.TableID
+}
+
 // TrySplitAndSortUpdateEvent split update events if unique key is updated
 func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error {
 	if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent {
diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
index d41ae5d785e..8ddeacb78b8 100644
--- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
+++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -74,6 +74,7 @@ type eventFragment struct {
 type DMLSink struct {
 	changefeedID         model.ChangeFeedID
 	scheme               string
+	config               *cloudstorage.Config
 	outputRawChangeEvent bool
 	// last sequence number
 	lastSeqNum uint64
@@ -147,6 +148,7 @@ func NewDMLSink(ctx context.Context,
 	s := &DMLSink{
 		changefeedID:         changefeedID,
 		scheme:               strings.ToLower(sinkURI.Scheme),
+		config:               cfg,
 		outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(),
 		encodingWorkers:      make([]*encodingWorker, defaultEncodingConcurrency),
 		workers:              make([]*dmlWorker, cfg.WorkerCount),
@@ -249,11 +251,17 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
 			continue
 		}
 
+		tableID := txn.Event.GetPhysicalTableID()
+		// If EnablePartitionSeparator is false, all partition data is written to the same
+		// directory, so the logical table ID must be used to manage the file index across partitions.
+		if !s.config.EnablePartitionSeparator && txn.Event.TableInfo.IsPartitionTable() {
+			tableID = txn.Event.GetLogicalTableID()
+		}
 		tbl := cloudstorage.VersionedTableName{
-			TableNameWithPhysicTableID: model.TableName{
+			TableNameWithTableID: model.TableName{
 				Schema:      txn.Event.TableInfo.GetSchemaName(),
 				Table:       txn.Event.TableInfo.GetTableName(),
-				TableID:     txn.Event.GetPhysicalTableID(),
+				TableID:     tableID,
 				IsPartition: txn.Event.TableInfo.IsPartitionTable(),
 			},
 			TableInfoVersion: txn.Event.TableInfoVersion,
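For reference, a minimal sketch of the ID-selection rule this hunk adds to WriteEvents — the standalone function and its name are illustrative, not part of the sink, which inlines this logic:

package main

import "fmt"

// pickTableID mirrors the decision WriteEvents now makes. With the
// partition separator disabled, every partition of a table shares one
// output directory, so the file index must be keyed by the logical table
// ID or the partitions would collide on the same CDC%06d sequence.
func pickTableID(separatorEnabled, isPartition bool, physicalID, logicalID int64) int64 {
	if !separatorEnabled && isPartition {
		return logicalID
	}
	return physicalID
}

func main() {
	fmt.Println(pickTableID(false, true, 101, 100)) // 100: logical ID, shared directory
	fmt.Println(pickTableID(true, true, 101, 100))  // 101: physical ID, per-partition directory
}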
diff --git a/cdc/sink/dmlsink/cloudstorage/defragmenter.go b/cdc/sink/dmlsink/cloudstorage/defragmenter.go
index 82e3e9ca1b2..83461a95217 100644
--- a/cdc/sink/dmlsink/cloudstorage/defragmenter.go
+++ b/cdc/sink/dmlsink/cloudstorage/defragmenter.go
@@ -90,7 +90,7 @@ func (d *defragmenter) writeMsgsConsecutive(
 }
 
 func (d *defragmenter) dispatchFragToDMLWorker(frag eventFragment) {
-	tableName := frag.versionedTable.TableNameWithPhysicTableID
+	tableName := frag.versionedTable.TableNameWithTableID
 	d.hasher.Reset()
 	d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table))
 	workerID := d.hasher.Sum32() % uint32(len(d.outputChs))
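The dispatch rule itself is untouched by the rename: the defragmenter hashes only the schema and table name, never the table ID, so every fragment of one table — including all of its partitions — keeps landing on the same DML worker. A stand-alone sketch of that routing, using stdlib FNV in place of tiflow's internal hasher:

package main

import (
	"fmt"
	"hash/fnv"
)

// workerFor sketches dispatchFragToDMLWorker's routing rule: hash the
// schema and table name and take the result modulo the worker count.
// fnv stands in for the hasher tiflow actually uses; the property that
// matters is that all fragments of one table map to one worker.
func workerFor(schema, table string, workers int) uint32 {
	h := fnv.New32a()
	h.Write([]byte(schema))
	h.Write([]byte(table))
	return h.Sum32() % uint32(workers)
}

func main() {
	// Every partition of test.t1 routes identically, so its files are
	// flushed by a single worker and the file index stays consecutive.
	fmt.Println(workerFor("test", "t1", 8))
}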
diff --git a/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go b/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go
index b6ea7da97be..3df8d8e97aa 100644
--- a/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go
+++ b/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go
@@ -83,7 +83,7 @@ func TestDeframenter(t *testing.T) {
 		encoder := encoderBuilder.Build()
 		frag := eventFragment{
 			versionedTable: cloudstorage.VersionedTableName{
-				TableNameWithPhysicTableID: model.TableName{
+				TableNameWithTableID: model.TableName{
 					Schema:  "test",
 					Table:   "table1",
 					TableID: 100,
diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go
index ddb95675cec..9a399fc836f 100644
--- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go
+++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go
@@ -233,8 +233,8 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error {
 				log.Debug("write file to storage success",
 					zap.Int("workerID", d.id),
 					zap.String("namespace", d.changeFeedID.Namespace),
 					zap.String("changefeed", d.changeFeedID.ID),
-					zap.String("schema", table.TableNameWithPhysicTableID.Schema),
-					zap.String("table", table.TableNameWithPhysicTableID.Table),
+					zap.String("schema", table.TableNameWithTableID.Schema),
+					zap.String("table", table.TableNameWithTableID.Table),
 					zap.String("path", dataFilePath),
 				)
 			}
diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
index 879581f54f0..a1f28caf0f6 100644
--- a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
+++ b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
@@ -86,8 +86,8 @@ func TestDMLWorkerRun(t *testing.T) {
 		frag := eventFragment{
 			seqNumber: uint64(i),
 			versionedTable: cloudstorage.VersionedTableName{
-				TableNameWithPhysicTableID: table1,
-				TableInfoVersion:           99,
+				TableNameWithTableID: table1,
+				TableInfoVersion:     99,
 			},
 			event: &dmlsink.TxnCallbackableEvent{
 				Event: &model.SingleTableTxn{
diff --git a/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go b/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go
index 84e1aa688f8..582147b1489 100644
--- a/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go
+++ b/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go
@@ -78,7 +78,7 @@ func TestEncodeEvents(t *testing.T) {
 
 	err := encodingWorker.encodeEvents(eventFragment{
 		versionedTable: cloudstorage.VersionedTableName{
-			TableNameWithPhysicTableID: model.TableName{
+			TableNameWithTableID: model.TableName{
 				Schema:  "test",
 				Table:   "table1",
 				TableID: 100,
@@ -157,7 +157,7 @@ func TestEncodingWorkerRun(t *testing.T) {
 	for i := 0; i < 3; i++ {
 		frag := eventFragment{
 			versionedTable: cloudstorage.VersionedTableName{
-				TableNameWithPhysicTableID: table,
+				TableNameWithTableID: table,
 			},
 			seqNumber: uint64(i + 1),
 			event: &dmlsink.TxnCallbackableEvent{
diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go
index 714cb4b7cc4..741854a4d21 100644
--- a/pkg/sink/cloudstorage/path.go
+++ b/pkg/sink/cloudstorage/path.go
@@ -120,12 +120,15 @@ type indexWithDate struct {
 	currDate, prevDate string
 }
 
-// VersionedTableName is used to wrap TableNameWithPhysicTableID with a version.
+// VersionedTableName is used to wrap TableNameWithTableID with a version.
 type VersionedTableName struct {
 	// Because we need to generate different file paths for different
 	// tables, we need to use the physical table ID instead of the
 	// logical table ID.(Especially when the table is a partitioned table).
-	TableNameWithPhysicTableID model.TableName
+	// For a partitioned table:
+	// 1. when enable-partition-separator is false, the logical table ID is used to manage the file index;
+	// 2. when enable-partition-separator is true, the physical table ID is used to manage the file index.
+	TableNameWithTableID model.TableName
 	// TableInfoVersion is consistent with the version of TableInfo recorded in
 	// schema storage. It can either be finished ts of a DDL event,
 	// or be the checkpoint ts when processor is restarted.
@@ -312,12 +315,12 @@ func (f *FilePathGenerator) GenerateDataFilePath(
 func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date string) string {
 	var elems []string
 
-	elems = append(elems, tbl.TableNameWithPhysicTableID.Schema)
-	elems = append(elems, tbl.TableNameWithPhysicTableID.Table)
+	elems = append(elems, tbl.TableNameWithTableID.Schema)
+	elems = append(elems, tbl.TableNameWithTableID.Table)
 	elems = append(elems, fmt.Sprintf("%d", f.versionMap[tbl]))
 
-	if f.config.EnablePartitionSeparator && tbl.TableNameWithPhysicTableID.IsPartition {
-		elems = append(elems, fmt.Sprintf("%d", tbl.TableNameWithPhysicTableID.TableID))
+	if f.config.EnablePartitionSeparator && tbl.TableNameWithTableID.IsPartition {
+		elems = append(elems, fmt.Sprintf("%d", tbl.TableNameWithTableID.TableID))
 	}
 
 	if len(date) != 0 {
diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go
index 57c5f2b30f7..ce7e9a2a0d7 100644
--- a/pkg/sink/cloudstorage/path_test.go
+++ b/pkg/sink/cloudstorage/path_test.go
@@ -62,7 +62,7 @@ func TestGenerateDataFilePath(t *testing.T) {
 	defer cancel()
 
 	table := VersionedTableName{
-		TableNameWithPhysicTableID: model.TableName{
+		TableNameWithTableID: model.TableName{
 			Schema: "test",
 			Table:  "table1",
 		},
@@ -156,6 +156,46 @@ func TestGenerateDataFilePath(t *testing.T) {
 	require.Equal(t, "test/table1/5/2023-01-01/CDC000002.json", path)
 }
 
+func TestGenerateDataFilePathWithPartitionTable(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+
+	table := VersionedTableName{
+		TableNameWithTableID: model.TableName{
+			Schema:      "test",
+			Table:       "table1",
+			TableID:     100,
+			IsPartition: true,
+		},
+		TableInfoVersion: 5,
+	}
+
+	// EnablePartitionSeparator is false
+	dir := t.TempDir()
+	f := testFilePathGenerator(ctx, t, dir)
+	f.config.EnablePartitionSeparator = false
+	f.versionMap[table] = table.TableInfoVersion
+	path, err := f.GenerateDataFilePath(ctx, table, "")
+	require.NoError(t, err)
+	require.Equal(t, "test/table1/5/CDC000001.json", path)
+	path, err = f.GenerateDataFilePath(ctx, table, "")
+	require.NoError(t, err)
+	require.Equal(t, "test/table1/5/CDC000002.json", path)
+
+	// EnablePartitionSeparator is true
+	f = testFilePathGenerator(ctx, t, dir)
+	f.config.EnablePartitionSeparator = true
+	f.versionMap[table] = table.TableInfoVersion
+	path, err = f.GenerateDataFilePath(ctx, table, "")
+	require.NoError(t, err)
+	require.Equal(t, "test/table1/5/100/CDC000001.json", path)
+	path, err = f.GenerateDataFilePath(ctx, table, "")
+	require.NoError(t, err)
+	require.Equal(t, "test/table1/5/100/CDC000002.json", path)
+}
+
 func TestFetchIndexFromFileName(t *testing.T) {
 	t.Parallel()
 
@@ -218,7 +258,7 @@ func TestGenerateDataFilePathWithIndexFile(t *testing.T) {
 	mockClock.Set(time.Date(2023, 3, 9, 23, 59, 59, 0, time.UTC))
 
 	table := VersionedTableName{
-		TableNameWithPhysicTableID: model.TableName{
+		TableNameWithTableID: model.TableName{
 			Schema: "test",
 			Table:  "table1",
 		},
@@ -316,8 +356,8 @@ func TestCheckOrWriteSchema(t *testing.T) {
 	}
 
 	table := VersionedTableName{
-		TableNameWithPhysicTableID: tableInfo.TableName,
-		TableInfoVersion:           tableInfo.Version,
+		TableNameWithTableID: tableInfo.TableName,
+		TableInfoVersion:     tableInfo.Version,
 	}
 
 	err := f.CheckOrWriteSchema(ctx, table, tableInfo)
diff --git a/tests/integration_tests/canal_json_storage_partition_table/conf/changefeed.toml b/tests/integration_tests/canal_json_storage_partition_table/conf/changefeed.toml
index a5343cfb3f5..dc61a8cc884 100644
--- a/tests/integration_tests/canal_json_storage_partition_table/conf/changefeed.toml
+++ b/tests/integration_tests/canal_json_storage_partition_table/conf/changefeed.toml
@@ -4,7 +4,7 @@ protocol = "canal-json"
 terminator = "\n"
 # Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none.
 date-separator = 'day'
-# Enable partition separator. The default value is false.
+# Enable partition separator. The default value is true.
 enable-partition-separator = true
 
 [sink.csv]
diff --git a/tests/integration_tests/canal_json_storage_partition_table/conf/diff_config.toml b/tests/integration_tests/canal_json_storage_partition_table/conf/diff_config.toml
index 24e98abed31..26bdd151534 100644
--- a/tests/integration_tests/canal_json_storage_partition_table/conf/diff_config.toml
+++ b/tests/integration_tests/canal_json_storage_partition_table/conf/diff_config.toml
@@ -7,7 +7,7 @@ export-fix-sql = true
 check-struct-only = false
 
 [task]
-    output-dir = "/tmp/tidb_cdc_test/partition_table/sync_diff/output"
+    output-dir = "/tmp/tidb_cdc_test/canal_json_storage_partition_table/sync_diff/output"
 
     source-instances = ["mysql1"]
diff --git a/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql b/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql
index 2454bfaeffc..7d75299b4eb 100644
--- a/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql
+++ b/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql
@@ -27,7 +27,7 @@ ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
 insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
 insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/
 
-/* exchange partition ccase 2: source table and target table in different database */
+/* exchange partition case 2: source table and target table in different database */
 create database `partition_table2`;
 create table partition_table2.t2 (a int primary key);
 ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE partition_table2.t2;
diff --git a/tests/integration_tests/canal_json_storage_partition_table2/conf/changefeed.toml b/tests/integration_tests/canal_json_storage_partition_table2/conf/changefeed.toml
new file mode 100644
index 00000000000..1966d177650
--- /dev/null
+++ b/tests/integration_tests/canal_json_storage_partition_table2/conf/changefeed.toml
@@ -0,0 +1,8 @@
+[sink]
+protocol = "canal-json"
+# Line terminator. An empty value means "\r\n" (CRLF) is the line terminator. The default value is empty.
+terminator = "\n"
+# Directory date separator. Optional values are `none`, `year`, `month`, `day`. The default value is `none`.
+date-separator = 'none'
+# Disable the partition separator. The default value is true.
+enable-partition-separator = false
diff --git a/tests/integration_tests/canal_json_storage_partition_table2/conf/diff_config.toml b/tests/integration_tests/canal_json_storage_partition_table2/conf/diff_config.toml
new file mode 100644
index 00000000000..9507755a8a1
--- /dev/null
+++ b/tests/integration_tests/canal_json_storage_partition_table2/conf/diff_config.toml
@@ -0,0 +1,29 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/tidb_cdc_test/canal_json_storage_partition_table2/sync_diff/output"
+
+    source-instances = ["mysql1"]
+
+    target-instance = "tidb0"
+
+    target-check-tables = ["partition_table.?*"]
+
+[data-sources]
+[data-sources.mysql1]
+    host = "127.0.0.1"
+    port = 4000
+    user = "root"
+    password = ""
+
+[data-sources.tidb0]
+    host = "127.0.0.1"
+    port = 3306
+    user = "root"
+    password = ""
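Taken together with the changefeed config above, this new canal_json_storage_partition_table2 case exercises the regression scenario directly: with enable-partition-separator = false and date-separator = 'none', all partitions of partition_table.t1 must share a single directory and a single file-index sequence (partition_table/t1/&lt;version&gt;/CDC000001.json, CDC000002.json, ...), which only stays collision-free if the sink keys the index by the logical table ID as in the sink.go and path.go changes above.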
diff --git a/tests/integration_tests/canal_json_storage_partition_table2/data/prepare.sql b/tests/integration_tests/canal_json_storage_partition_table2/data/prepare.sql
new file mode 100644
index 00000000000..43a523def52
--- /dev/null
+++ b/tests/integration_tests/canal_json_storage_partition_table2/data/prepare.sql
@@ -0,0 +1,12 @@
+drop database if exists `partition_table`;
+create database `partition_table`;
+use `partition_table`;
+
+create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
+-- partition p0
+insert into t1 values (1),(2),(3);
+-- partition p1
+insert into t1 values (7),(8),(9);
+-- partition p2
+insert into t1 values (11),(12),(20);
+
diff --git a/tests/integration_tests/canal_json_storage_partition_table2/data/prepare2.sql b/tests/integration_tests/canal_json_storage_partition_table2/data/prepare2.sql
new file mode 100644
index 00000000000..f31e65d13a7
--- /dev/null
+++ b/tests/integration_tests/canal_json_storage_partition_table2/data/prepare2.sql
@@ -0,0 +1,10 @@
+use `partition_table`;
+
+-- partition p0
+insert into t1 values (4),(5);
+-- partition p1
+insert into t1 values (10);
+-- partition p2
+insert into t1 values (13),(14),(15),(16),(17);
+
+create table finish_mark (a int primary key);
\ No newline at end of file
diff --git a/tests/integration_tests/canal_json_storage_partition_table2/run.sh b/tests/integration_tests/canal_json_storage_partition_table2/run.sh
new file mode 100644
index 00000000000..a2bfc5efff4
--- /dev/null
+++ b/tests/integration_tests/canal_json_storage_partition_table2/run.sh
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+function run() {
+	if [ "$SINK_TYPE" != "storage" ]; then
+		return
+	fi
+
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+	start_tidb_cluster --workdir $WORK_DIR
+	cd $WORK_DIR
+
+	run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix cdc0
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix cdc1
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8302" --logsuffix cdc2
+
+	SINK_URI="file://$WORK_DIR/storage_test?flush-interval=1s&enable-tidb-extension=true"
+	run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml
+
+	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	# wait for the data to be flushed downstream
+	sleep 6
+	run_sql_file $CUR/data/prepare2.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml ""
+	sleep 8
+
+	# sync_diff can't check tables that don't exist, so first make sure the expected tables are created downstream
+	check_table_exists partition_table.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_table_exists partition_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/csv_storage_partition_table/conf/changefeed.toml b/tests/integration_tests/csv_storage_partition_table/conf/changefeed.toml
index 1a30b45d37c..b889d7404af 100644
--- a/tests/integration_tests/csv_storage_partition_table/conf/changefeed.toml
+++ b/tests/integration_tests/csv_storage_partition_table/conf/changefeed.toml
@@ -4,7 +4,7 @@ protocol = "csv"
 terminator = "\n"
 # Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none.
 date-separator = 'day'
-# Enable partition separator. The default value is false.
+# Enable partition separator. The default value is true.
 enable-partition-separator = true
 
 [sink.csv]