storage: fix duplicate file seq #11987

Closed · wants to merge 6 commits
5 changes: 5 additions & 0 deletions cdc/model/sink.go
@@ -1160,6 +1160,11 @@ func (t *SingleTableTxn) GetPhysicalTableID() int64 {
return t.PhysicalTableID
}

+// GetLogicalTableID returns the logical table id of the table in the transaction
+func (t *SingleTableTxn) GetLogicalTableID() int64 {
+return t.TableInfo.TableName.TableID
+}

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error {
if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent {
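Background for this change: in TiDB, each partition of a partitioned table has its own physical table ID, while the logical ID belongs to the parent table. A hypothetical mini-model of the distinction (illustrative names only, not types from this codebase):

```go
package main

import "fmt"

// partitionedTable is a hypothetical stand-in for the two kinds of IDs a
// partitioned table carries; it is not a type from the tiflow codebase.
type partitionedTable struct {
	logicalID  int64   // ID of the parent table, e.g. t1
	partitions []int64 // one physical ID per partition p0, p1, ...
}

func main() {
	t1 := partitionedTable{logicalID: 100, partitions: []int64{101, 102, 103}}
	for i, pid := range t1.partitions {
		// Rows are stored per partition (physical ID), but every partition
		// logically belongs to the same parent table (logical ID).
		fmt.Printf("p%d: physical=%d logical=%d\n", i, pid, t1.logicalID)
	}
}
```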
12 changes: 10 additions & 2 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -74,6 +74,7 @@ type eventFragment struct {
type DMLSink struct {
changefeedID model.ChangeFeedID
scheme string
+config *cloudstorage.Config
outputRawChangeEvent bool
// last sequence number
lastSeqNum uint64
@@ -147,6 +148,7 @@ func NewDMLSink(ctx context.Context,
s := &DMLSink{
changefeedID: changefeedID,
scheme: strings.ToLower(sinkURI.Scheme),
+config: cfg,
outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
@@ -249,11 +251,17 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
continue
}

+tableID := txn.Event.GetPhysicalTableID()
+// If EnablePartitionSeparator is false, all partition data will be written to the same directory,
+// so we need to use the logical table ID to manage the file index across partitions.
+if !s.config.EnablePartitionSeparator && txn.Event.TableInfo.IsPartitionTable() {
+tableID = txn.Event.GetLogicalTableID()
+}
tbl := cloudstorage.VersionedTableName{
-TableNameWithPhysicTableID: model.TableName{
+TableNameWithTableID: model.TableName{
Schema: txn.Event.TableInfo.GetSchemaName(),
Table: txn.Event.TableInfo.GetTableName(),
-TableID: txn.Event.GetPhysicalTableID(),
+TableID: tableID,
IsPartition: txn.Event.TableInfo.IsPartitionTable(),
},
TableInfoVersion: txn.Event.TableInfoVersion,
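This hunk is the core of the fix. The sketch below (standalone and simplified; the real index is keyed by a VersionedTableName, and the IDs here are made up) shows the failure mode: with enable-partition-separator = false, all partitions of a table share one directory, but the per-table file sequence used to be keyed by physical table ID, so two partitions could both emit CDC000001.json.

```go
package main

import "fmt"

// seq mimics the per-table file index kept by the sink (simplified: the
// real key is a VersionedTableName, not a bare int64).
var seq = map[int64]uint64{}

// nextFile allocates the next CDC%06d.json name for the given directory
// and table-ID key.
func nextFile(dir string, key int64) string {
	seq[key]++
	return fmt.Sprintf("%s/CDC%06d.json", dir, seq[key])
}

func main() {
	// Before the fix: partitions p0 (physical ID 101) and p1 (physical ID 102)
	// share one directory but keep separate counters.
	fmt.Println(nextFile("test/t1/5", 101)) // test/t1/5/CDC000001.json
	fmt.Println(nextFile("test/t1/5", 102)) // test/t1/5/CDC000001.json <- duplicate name

	// After the fix: both partitions use the logical table ID (100), so the
	// shared directory gets a single monotonic sequence.
	fmt.Println(nextFile("test/t1/5", 100)) // test/t1/5/CDC000001.json
	fmt.Println(nextFile("test/t1/5", 100)) // test/t1/5/CDC000002.json
}
```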
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/cloudstorage/defragmenter.go
@@ -90,7 +90,7 @@ func (d *defragmenter) writeMsgsConsecutive(
}

func (d *defragmenter) dispatchFragToDMLWorker(frag eventFragment) {
-tableName := frag.versionedTable.TableNameWithPhysicTableID
+tableName := frag.versionedTable.TableNameWithTableID
d.hasher.Reset()
d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table))
workerID := d.hasher.Sum32() % uint32(len(d.outputChs))
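For context on this hunk: the defragmenter routes each fragment to a fixed worker by hashing the schema and table name, so the field rename does not change dispatch behavior. A rough sketch of that dispatch rule, using fnv as a stand-in for tiflow's hasher (an assumption; the real hasher differs):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerFor sketches hash-based dispatch: the same schema/table pair always
// maps to the same worker, keeping one table's files on one worker.
func workerFor(schema, table string, workerCount int) uint32 {
	h := fnv.New32a()
	h.Write([]byte(schema))
	h.Write([]byte(table))
	return h.Sum32() % uint32(workerCount)
}

func main() {
	fmt.Println(workerFor("test", "table1", 16)) // stable worker ID for test.table1
	fmt.Println(workerFor("test", "table1", 16)) // same ID on every call
}
```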
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/cloudstorage/defragmenter_test.go
@@ -83,7 +83,7 @@ func TestDeframenter(t *testing.T) {
encoder := encoderBuilder.Build()
frag := eventFragment{
versionedTable: cloudstorage.VersionedTableName{
-TableNameWithPhysicTableID: model.TableName{
+TableNameWithTableID: model.TableName{
Schema: "test",
Table: "table1",
TableID: 100,
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
@@ -233,8 +233,8 @@ func (d *dmlWorker) flushMessages(ctx context.Context) error {
log.Debug("write file to storage success", zap.Int("workerID", d.id),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID),
zap.String("schema", table.TableNameWithPhysicTableID.Schema),
zap.String("table", table.TableNameWithPhysicTableID.Table),
zap.String("schema", table.TableNameWithTableID.Schema),
zap.String("table", table.TableNameWithTableID.Table),
zap.String("path", dataFilePath),
)
}
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
@@ -86,8 +86,8 @@ func TestDMLWorkerRun(t *testing.T) {
frag := eventFragment{
seqNumber: uint64(i),
versionedTable: cloudstorage.VersionedTableName{
-TableNameWithPhysicTableID: table1,
-TableInfoVersion: 99,
+TableNameWithTableID: table1,
+TableInfoVersion: 99,
},
event: &dmlsink.TxnCallbackableEvent{
Event: &model.SingleTableTxn{
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go
@@ -78,7 +78,7 @@ func TestEncodeEvents(t *testing.T) {

err := encodingWorker.encodeEvents(eventFragment{
versionedTable: cloudstorage.VersionedTableName{
-TableNameWithPhysicTableID: model.TableName{
+TableNameWithTableID: model.TableName{
Schema: "test",
Table: "table1",
TableID: 100,
@@ -157,7 +157,7 @@ func TestEncodingWorkerRun(t *testing.T) {
for i := 0; i < 3; i++ {
frag := eventFragment{
versionedTable: cloudstorage.VersionedTableName{
-TableNameWithPhysicTableID: table,
+TableNameWithTableID: table,
},
seqNumber: uint64(i + 1),
event: &dmlsink.TxnCallbackableEvent{
15 changes: 9 additions & 6 deletions pkg/sink/cloudstorage/path.go
@@ -120,12 +120,15 @@ type indexWithDate struct {
currDate, prevDate string
}

-// VersionedTableName is used to wrap TableNameWithPhysicTableID with a version.
+// VersionedTableName is used to wrap TableNameWithTableID with a version.
type VersionedTableName struct {
// Because we need to generate different file paths for different
// tables, we need to use the physical table ID instead of the
// logical table ID.(Especially when the table is a partitioned table).
-TableNameWithPhysicTableID model.TableName
+// For a partition table:
+// 1. when enable-partition-separator is false, we need to use the logical table ID to manage the file index.
+// 2. when enable-partition-separator is true, we need to use the physical table ID to manage the file index.
+TableNameWithTableID model.TableName
// TableInfoVersion is consistent with the version of TableInfo recorded in
// schema storage. It can either be finished ts of a DDL event,
// or be the checkpoint ts when processor is restarted.
@@ -312,12 +315,12 @@ func (f *FilePathGenerator) GenerateDataFilePath(
func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date string) string {
var elems []string

-elems = append(elems, tbl.TableNameWithPhysicTableID.Schema)
-elems = append(elems, tbl.TableNameWithPhysicTableID.Table)
+elems = append(elems, tbl.TableNameWithTableID.Schema)
+elems = append(elems, tbl.TableNameWithTableID.Table)
elems = append(elems, fmt.Sprintf("%d", f.versionMap[tbl]))

-if f.config.EnablePartitionSeparator && tbl.TableNameWithPhysicTableID.IsPartition {
-elems = append(elems, fmt.Sprintf("%d", tbl.TableNameWithPhysicTableID.TableID))
+if f.config.EnablePartitionSeparator && tbl.TableNameWithTableID.IsPartition {
+elems = append(elems, fmt.Sprintf("%d", tbl.TableNameWithTableID.TableID))
}

if len(date) != 0 {
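A minimal sketch of the resulting directory layouts, assuming a simplified generateDataDirPath (the real function also consults a version map and a date separator):

```go
package main

import (
	"fmt"
	"strings"
)

// dataDir sketches how the data directory is composed. With the partition
// separator enabled, each physical partition gets its own subdirectory and
// therefore its own file sequence; with it disabled, the path is shared,
// which is why the file index must then be keyed by the logical table ID.
func dataDir(schema, table string, version, tableID int64,
	isPartition, partitionSeparator bool, date string) string {
	elems := []string{schema, table, fmt.Sprintf("%d", version)}
	if partitionSeparator && isPartition {
		elems = append(elems, fmt.Sprintf("%d", tableID))
	}
	if date != "" {
		elems = append(elems, date)
	}
	return strings.Join(elems, "/")
}

func main() {
	fmt.Println(dataDir("test", "table1", 5, 100, true, true, ""))  // test/table1/5/100
	fmt.Println(dataDir("test", "table1", 5, 100, true, false, "")) // test/table1/5
}
```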
48 changes: 44 additions & 4 deletions pkg/sink/cloudstorage/path_test.go
@@ -62,7 +62,7 @@ func TestGenerateDataFilePath(t *testing.T) {
defer cancel()

table := VersionedTableName{
-TableNameWithPhysicTableID: model.TableName{
+TableNameWithTableID: model.TableName{
Schema: "test",
Table: "table1",
},
@@ -156,6 +156,46 @@
require.Equal(t, "test/table1/5/2023-01-01/CDC000002.json", path)
}

+func TestGenerateDataFilePathWithPartitionTable(t *testing.T) {
+t.Parallel()
+
+ctx, cancel := context.WithCancel(context.TODO())
+defer cancel()
+
+table := VersionedTableName{
+TableNameWithTableID: model.TableName{
+Schema: "test",
+Table: "table1",
+TableID: 100,
+IsPartition: true,
+},
+TableInfoVersion: 5,
+}
+
+// EnablePartitionSeparator is false
+dir := t.TempDir()
+f := testFilePathGenerator(ctx, t, dir)
+f.config.EnablePartitionSeparator = false
+f.versionMap[table] = table.TableInfoVersion
+path, err := f.GenerateDataFilePath(ctx, table, "")
+require.NoError(t, err)
+require.Equal(t, "test/table1/5/CDC000001.json", path)
+path, err = f.GenerateDataFilePath(ctx, table, "")
+require.NoError(t, err)
+require.Equal(t, "test/table1/5/CDC000002.json", path)
+
+// EnablePartitionSeparator is true
+f = testFilePathGenerator(ctx, t, dir)
+f.config.EnablePartitionSeparator = true
+f.versionMap[table] = table.TableInfoVersion
+path, err = f.GenerateDataFilePath(ctx, table, "")
+require.NoError(t, err)
+require.Equal(t, "test/table1/5/100/CDC000001.json", path)
+path, err = f.GenerateDataFilePath(ctx, table, "")
+require.NoError(t, err)
+require.Equal(t, "test/table1/5/100/CDC000002.json", path)
+}

func TestFetchIndexFromFileName(t *testing.T) {
t.Parallel()

@@ -218,7 +258,7 @@ func TestGenerateDataFilePathWithIndexFile(t *testing.T) {

mockClock.Set(time.Date(2023, 3, 9, 23, 59, 59, 0, time.UTC))
table := VersionedTableName{
-TableNameWithPhysicTableID: model.TableName{
+TableNameWithTableID: model.TableName{
Schema: "test",
Table: "table1",
},
@@ -316,8 +356,8 @@ func TestCheckOrWriteSchema(t *testing.T) {
}

table := VersionedTableName{
-TableNameWithPhysicTableID: tableInfo.TableName,
-TableInfoVersion: tableInfo.Version,
+TableNameWithTableID: tableInfo.TableName,
+TableInfoVersion: tableInfo.Version,
}

err := f.CheckOrWriteSchema(ctx, table, tableInfo)
@@ -4,7 +4,7 @@ protocol = "canal-json"
terminator = "\n"
# Directory date separator. Optional values are `none`, `year`, `month`, `day`. The default value is none.
date-separator = 'day'
-# Enable partition separator. The default value is false.
+# Enable partition separator. The default value is true.
enable-partition-separator = true

[sink.csv]
@@ -7,7 +7,7 @@ export-fix-sql = true
check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/partition_table/sync_diff/output"
output-dir = "/tmp/tidb_cdc_test/canal_json_storage_partition_table/sync_diff/output"

source-instances = ["mysql1"]

@@ -27,7 +27,7 @@ ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to downstream t2*/
insert into t1 values (25),(29); /*these values will be replicated to downstream t1.p3*/

-/* exchange partition ccase 2: source table and target table in different database */
+/* exchange partition case 2: source table and target table in different database */
create database `partition_table2`;
create table partition_table2.t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE partition_table2.t2;
@@ -0,0 +1,8 @@
[sink]
protocol = "canal-json"
# Line terminator. An empty value means "\r\n" (CRLF) is the line terminator. The default value is empty.
terminator = "\n"
# Directory date separator. Optional values are `none`, `year`, `month`, `day`. The default value is none.
date-separator = 'none'
# Disable partition separator. The default value is true.
enable-partition-separator = false
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/canal_json_storage_partition_table2/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["partition_table.?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
@@ -0,0 +1,12 @@
drop database if exists `partition_table`;
create database `partition_table`;
use `partition_table`;

create table t1 (a int primary key) PARTITION BY RANGE ( a ) ( PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11),PARTITION p2 VALUES LESS THAN (21));
-- partition p0
insert into t1 values (1),(2),(3);
-- partition p1
insert into t1 values (7),(8),(9);
-- partition p2
insert into t1 values (11),(12),(20);

@@ -0,0 +1,10 @@
use `partition_table`;

-- partition p0
insert into t1 values (4),(5);
-- partition p1
insert into t1 values (10);
-- partition p2
insert into t1 values (13),(14),(15),(16),(17);

create table finish_mark (a int primary key);
48 changes: 48 additions & 0 deletions tests/integration_tests/canal_json_storage_partition_table2/run.sh
@@ -0,0 +1,48 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
if [ "$SINK_TYPE" != "storage" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR

run_sql "set @@global.tidb_enable_exchange_partition=on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --logsuffix cdc0
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8301" --logsuffix cdc1
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8302" --logsuffix cdc2

SINK_URI="file://$WORK_DIR/storage_test?flush-interval=1s&enable-tidb-extension=true"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# wait for data to be flushed downstream
sleep 6
run_sql_file $CUR/data/prepare2.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml ""
sleep 8

# sync_diff can't check non-existent tables, so first verify that the expected tables are created downstream
check_table_exists partition_table.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists partition_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
@@ -4,7 +4,7 @@ protocol = "csv"
terminator = "\n"
# Directory date separator. Optional values are `none`, `year`, `month`, `day`. The default value is none.
date-separator = 'day'
-# Enable partition separator. The default value is false.
+# Enable partition separator. The default value is true.
enable-partition-separator = true

[sink.csv]