From a8d0d4ce77a1ac9116f93381dfe296d22d8f0e8b Mon Sep 17 00:00:00 2001 From: hailin0 Date: Mon, 28 Oct 2024 17:57:27 +0800 Subject: [PATCH] [Hotfix][CDC] Fix occasional database connection leak when read snapshot split (#7918) --- .../IncrementalSourceScanFetcher.java | 16 +++- .../IncrementalSourceStreamFetcher.java | 16 +++- .../seatunnel/cdc/mysql/MysqlCDCIT.java | 85 ++++++++++++++++++- 3 files changed, 108 insertions(+), 9 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java index 7f927af5878..2ed961865e9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -222,12 +222,15 @@ private void checkReadException() { @Override public void close() { try { - if (taskContext != null) { - taskContext.close(); - } + // 1. try close the split task if (snapshotSplitReadTask != null) { - snapshotSplitReadTask.shutdown(); + try { + snapshotSplitReadTask.shutdown(); + } catch (Exception e) { + log.error("Close snapshot split read task error", e); + } } + // 2. close the fetcher thread if (executorService != null) { executorService.shutdown(); if (!executorService.awaitTermination( @@ -240,6 +243,11 @@ public void close() { } } catch (Exception e) { log.error("Close scan fetcher error", e); + } finally { + // 3. close the task context + if (taskContext != null) { + taskContext.close(); + } } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index 16e45376566..17536d9de09 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -187,12 +187,15 @@ private void checkReadException() { @Override public void close() { try { - if (taskContext != null) { - taskContext.close(); - } + // 1. try close the split task if (streamFetchTask != null) { - streamFetchTask.shutdown(); + try { + streamFetchTask.shutdown(); + } catch (Exception e) { + log.error("Close stream split read task error", e); + } } + // 2. close the fetcher thread if (executorService != null) { executorService.shutdown(); if (!executorService.awaitTermination( @@ -205,6 +208,11 @@ public void close() { } } catch (Exception e) { log.error("Close stream fetcher error", e); + } finally { + // 3. close the task context + if (taskContext != null) { + taskContext.close(); + } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java index bf7e8d8fe7c..28919074315 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java @@ -328,6 +328,9 @@ public void testMultiTableWithRestore(TestContainer container) clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1); clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2); + // init + initSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1); + Long jobId = JobIdGenerator.newJobId(); CompletableFuture.supplyAsync( () -> { @@ -341,8 +344,32 @@ public void testMultiTableWithRestore(TestContainer container) } }); + // wait for data written to sink + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertTrue( + query(getSourceQuerySQL(MYSQL_DATABASE2, SOURCE_TABLE_1)) + .size() + > 1)); + + // Restore job with snapshot read phase + Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode()); + CompletableFuture.supplyAsync( + () -> { + try { + container.restoreJob( + "/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf", + String.valueOf(jobId)); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + // insert update delete - upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1); + changeSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1); // stream stage await().atMost(60000, TimeUnit.MILLISECONDS) @@ -521,6 +548,62 @@ private void executeSql(String sql) { } } + private void initSourceTable(String database, String tableName) { + for (int i = 1; i < 100; i++) { + executeSql( + "INSERT INTO " + + database + + "." + + tableName + + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n" + + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n" + + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n" + + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n" + + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n" + + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n" + + "VALUES ( " + + i + + ", 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n" + + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n" + + " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n" + + " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n" + + " 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n" + + " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n" + + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n" + + " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1992 )"); + } + } + + private void changeSourceTable(String database, String tableName) { + for (int i = 100; i < 110; i++) { + executeSql( + "INSERT INTO " + + database + + "." + + tableName + + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n" + + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n" + + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n" + + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n" + + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n" + + " f_tinyint, f_tinyint_unsigned, f_json, f_year )\n" + + "VALUES ( " + + i + + ", 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n" + + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n" + + " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n" + + " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n" + + " 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n" + + " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n" + + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n" + + " 12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1992 )"); + } + + executeSql("DELETE FROM " + database + "." + tableName + " where id > 100"); + + executeSql("UPDATE " + database + "." + tableName + " SET f_bigint = 10000 where id < 10"); + } + private void upsertDeleteSourceTable(String database, String tableName) { executeSql(