diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
new file mode 100644
index 00000000000..e956b111709
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;
+
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import lombok.extern.slf4j.Slf4j;
+
+import java.math.BigDecimal;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.math.BigDecimal.ROUND_CEILING;
+import static org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
+
+@Slf4j
+public abstract class AbstractJdbcSourceChunkSplitter implements JdbcSourceChunkSplitter {
+
+    private final JdbcSourceConfig sourceConfig;
+    private final JdbcDataSourceDialect dialect;
+
+    public AbstractJdbcSourceChunkSplitter(
+            JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
+        this.sourceConfig = sourceConfig;
+        this.dialect = dialect;
+    }
+
+    @Override
+    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
+        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
+            log.info("Start splitting table {} into chunks...", tableId);
+            long start = System.currentTimeMillis();
+
+            Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
+            final List<ChunkRange> chunks;
+            try {
+                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
+            } catch (SQLException e) {
+                throw new RuntimeException("Failed to split chunks for table " + tableId, e);
+            }
+
+            // convert chunks into splits
+            List<SnapshotSplit> splits = new ArrayList<>();
+            SeaTunnelRowType splitType = getSplitType(splitColumn);
+            for (int i = 0; i < chunks.size(); i++) {
+                ChunkRange chunk = chunks.get(i);
+                SnapshotSplit split =
+                        createSnapshotSplit(
+                                jdbc,
+                                tableId,
+                                i,
+                                splitType,
+                                chunk.getChunkStart(),
+                                chunk.getChunkEnd());
+                splits.add(split);
+            }
+
+            long end = System.currentTimeMillis();
+            log.info(
+                    "Split table {} into {} chunks, time cost: {}ms.",
+                    tableId,
+                    splits.size(),
+                    end - start);
+            return splits;
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format("Generate Splits for table %s error", tableId), e);
+        }
+    }
+
+    private List<ChunkRange> splitTableIntoChunks(
+            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
+        final String splitColumnName = splitColumn.name();
+        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
+        final Object min = minMax[0];
+        final Object max = minMax[1];
+        if (min == null || max == null || min.equals(max)) {
+            // empty table, or only one row, return full table scan as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final int chunkSize = sourceConfig.getSplitSize();
+        final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
+        final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
+
+        if (isEvenlySplitColumn(splitColumn)) {
+            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
+            double distributionFactor =
+                    calculateDistributionFactor(tableId, min, max, approximateRowCnt);
+
+            boolean dataIsEvenlyDistributed =
+                    doubleCompare(distributionFactor, distributionFactorLower) >= 0
+                            && doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
+
+            if (dataIsEvenlyDistributed) {
+                // the minimum dynamic chunk size is at least 1
+                final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
+                return splitEvenlySizedChunks(
+                        tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
+            } else {
+                int shardCount = (int) (approximateRowCnt / chunkSize);
+                int inverseSamplingRate = sourceConfig.getInverseSamplingRate();
+                if (sourceConfig.getSampleShardingThreshold() < shardCount) {
+                    // The sample must contain more rows than there are shards; if the sampling
+                    // rate is too low, there are not enough sampled rows to derive a boundary
+                    // for every shard. Therefore, inverseSamplingRate should not exceed
+                    // chunkSize.
+                    if (inverseSamplingRate > chunkSize) {
+                        log.warn(
+                                "The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
+                                inverseSamplingRate,
+                                chunkSize);
+                        inverseSamplingRate = chunkSize;
+                    }
+                    Object[] sample =
+                            sampleDataFromColumn(
+                                    jdbc, tableId, splitColumnName, inverseSamplingRate);
+                    return efficientShardingThroughSampling(
+                            tableId, sample, approximateRowCnt, shardCount);
+                }
+                return splitUnevenlySizedChunks(
+                        jdbc, tableId, splitColumnName, min, max, chunkSize);
+            }
+        } else {
+            return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
+        }
+    }
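+
+    // Worked example (illustrative figures, not configured defaults): for min = 1,
+    // max = 1_000_000 and approximateRowCnt = 1_000_000 the factor computed below is
+    // (1_000_000 - 1 + 1) / 1_000_000 = 1.0, which sits inside a band such as [0.05, 100.0],
+    // so the table is treated as evenly distributed and stepped in chunks of
+    // distributionFactor * chunkSize. A sparse key space, e.g. max = 1_000_000_000 over the
+    // same row count, yields a factor of 1000.0 and falls through to the sampling or
+    // unevenly-sized chunk paths instead.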
+
+    /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
+    protected List<ChunkRange> splitUnevenlySizedChunks(
+            JdbcConnection jdbc,
+            TableId tableId,
+            String splitColumnName,
+            Object min,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        log.info(
+                "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
+        int count = 0;
+        while (chunkEnd != null && ObjectCompare(chunkEnd, max) <= 0) {
+            // we start from [null, min + chunk_size) and avoid [null, min)
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            // may sleep a while to avoid overloading the source database
+            maySleep(count++, tableId);
+            chunkStart = chunkEnd;
+            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
+
+    protected Object nextChunkEnd(
+            JdbcConnection jdbc,
+            Object previousChunkEnd,
+            TableId tableId,
+            String splitColumnName,
+            Object max,
+            int chunkSize)
+            throws SQLException {
+        // chunk end might be null when max values are removed
+        Object chunkEnd =
+                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
+        if (Objects.equals(previousChunkEnd, chunkEnd)) {
+            // we don't allow equal chunk start and end,
+            // should query the next one larger than chunkEnd
+            chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
+        }
+        if (ObjectCompare(chunkEnd, max) >= 0) {
+            return null;
+        } else {
+            return chunkEnd;
+        }
+    }
+
+    protected List<ChunkRange> efficientShardingThroughSampling(
+            TableId tableId, Object[] sampleData, long approximateRowCnt, int shardCount) {
+        log.info(
+                "Use efficient sharding through sampling optimization for table {}, the approximate row count is {}, the shardCount is {}",
+                tableId,
+                approximateRowCnt,
+                shardCount);
+
+        final List<ChunkRange> splits = new ArrayList<>();
+
+        if (shardCount == 0) {
+            splits.add(ChunkRange.of(null, null));
+            return splits;
+        }
+
+        double approxSamplePerShard = (double) sampleData.length / shardCount;
+
+        if (approxSamplePerShard <= 1) {
+            splits.add(ChunkRange.of(null, sampleData[0]));
+            for (int i = 0; i < sampleData.length - 1; i++) {
+                splits.add(ChunkRange.of(sampleData[i], sampleData[i + 1]));
+            }
+            splits.add(ChunkRange.of(sampleData[sampleData.length - 1], null));
+        } else {
+            // Calculate the shard boundaries
+            for (int i = 0; i < shardCount; i++) {
+                Object chunkStart = i == 0 ? null : sampleData[(int) (i * approxSamplePerShard)];
+                Object chunkEnd =
+                        i < shardCount - 1
+                                ? sampleData[(int) ((i + 1) * approxSamplePerShard)]
+                                : null;
+                splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            }
+        }
+        return splits;
+    }
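+
+    // Worked example: 10_000 sampled values and shardCount = 100 give approxSamplePerShard =
+    // 100.0, so the shard boundaries are sampleData[100], sampleData[200], ..., with the first
+    // chunk open at the lower end and the last chunk open at the upper end. With fewer samples
+    // than shards (approxSamplePerShard <= 1), each adjacent pair of samples becomes its own
+    // chunk instead.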
+
+    /**
+     * Split table into evenly sized chunks based on the numeric min and max value of split column,
+     * and tumble chunks in step size.
+     */
+    protected List<ChunkRange> splitEvenlySizedChunks(
+            TableId tableId,
+            Object min,
+            Object max,
+            long approximateRowCnt,
+            int chunkSize,
+            int dynamicChunkSize) {
+        log.info(
+                "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
+                tableId,
+                approximateRowCnt,
+                chunkSize,
+                dynamicChunkSize);
+        if (approximateRowCnt <= chunkSize) {
+            // there is no more than one chunk, return full table as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
+        while (ObjectCompare(chunkEnd, max) <= 0) {
+            splits.add(ChunkRange.of(chunkStart, chunkEnd));
+            chunkStart = chunkEnd;
+            try {
+                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
+            } catch (ArithmeticException e) {
+                // Stop chunk split to avoid dead loop when number overflows.
+                break;
+            }
+        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
+        return splits;
+    }
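+
+    // Worked example: min = 1, max = 10_000 and dynamicChunkSize = 1_000 produce
+    // [null, 1001), [1001, 2001), ..., [8001, 9001), [9001, null) -- ten chunks, with both
+    // ends left open so keys outside the min/max observed at planning time are still covered.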
+
+    // ------------------------------------------------------------------------------------------
+    /** Returns the distribution factor of the given table. */
+    @SuppressWarnings("MagicNumber")
+    protected double calculateDistributionFactor(
+            TableId tableId, Object min, Object max, long approximateRowCnt) {
+
+        if (!min.getClass().equals(max.getClass())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
+                            min.getClass().getSimpleName(), max.getClass().getSimpleName()));
+        }
+        if (approximateRowCnt == 0) {
+            return Double.MAX_VALUE;
+        }
+        BigDecimal difference = ObjectUtils.minus(max, min);
+        // factor = (max - min + 1) / rowCount
+        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
+        double distributionFactor =
+                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();
+        log.info(
+                "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
+                tableId,
+                distributionFactor,
+                min,
+                max,
+                approximateRowCnt);
+        return distributionFactor;
+    }
+
+    protected SnapshotSplit createSnapshotSplit(
+            JdbcConnection jdbc,
+            TableId tableId,
+            int chunkId,
+            SeaTunnelRowType splitKeyType,
+            Object chunkStart,
+            Object chunkEnd) {
+        // currently, we only support single split column
+        Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
+        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
+        return new SnapshotSplit(
+                splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd);
+    }
+
+    protected Column getSplitColumn(
+            JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId tableId)
+            throws SQLException {
+        Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
+        if (primaryKey.isPresent()) {
+            List<String> pkColumns = primaryKey.get().getColumnNames();
+
+            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+            for (String pkColumn : pkColumns) {
+                Column column = table.columnWithName(pkColumn);
+                if (isEvenlySplitColumn(column)) {
+                    return column;
+                }
+            }
+        }
+
+        List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
+        if (!uniqueKeys.isEmpty()) {
+            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+            for (ConstraintKey uniqueKey : uniqueKeys) {
+                List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
+                        uniqueKey.getColumnNames();
+                for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn : uniqueKeyColumns) {
+                    Column column = table.columnWithName(uniqueKeyColumn.getColumnName());
+                    if (isEvenlySplitColumn(column)) {
+                        return column;
+                    }
+                }
+            }
+        }
+
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Incremental snapshot for tables requires primary key/unique key,"
+                                + " but table %s doesn't have primary key.",
+                        tableId));
+    }
+
+    protected String splitId(TableId tableId, int chunkId) {
+        return tableId.toString() + ":" + chunkId;
+    }
+
+    protected int ObjectCompare(Object obj1, Object obj2) {
+        return ObjectUtils.compare(obj1, obj2);
+    }
+
+    @SuppressWarnings("MagicNumber")
+    private static void maySleep(int count, TableId tableId) {
+        // sleep 100ms every 10 queries to avoid overloading the source database
+        if (count % 10 == 0) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // nothing to do
+            }
+            log.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
index bbad9d04b1e..b271be0d765 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -17,22 +17,16 @@
 package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;
 
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */
 public interface JdbcSourceChunkSplitter extends ChunkSplitter {
@@ -142,6 +136,7 @@ default boolean isEvenlySplitColumn(Column splitColumn) {
             case INT:
             case BIGINT:
             case DECIMAL:
+            case STRING:
                 return true;
             default:
                 return false;
@@ -167,42 +162,4 @@ default SeaTunnelRowType getSplitType(Column splitColumn) {
                 new String[] {splitColumn.name()},
                 new SeaTunnelDataType<?>[] {fromDbzColumn(splitColumn)});
     }
-
-    default Column getSplitColumn(
-            JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId tableId)
-            throws SQLException {
-        Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
-        if (primaryKey.isPresent()) {
-            List<String> pkColumns = primaryKey.get().getColumnNames();
-
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-            for (String pkColumn : pkColumns) {
-                Column column = table.columnWithName(pkColumn);
-                if (isEvenlySplitColumn(column)) {
-                    return column;
-                }
-            }
-        }
-
-        List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
-        if (!uniqueKeys.isEmpty()) {
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-            for (ConstraintKey uniqueKey : uniqueKeys) {
-                List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
-                        uniqueKey.getColumnNames();
-                for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn : uniqueKeyColumns) {
-                    Column column = table.columnWithName(uniqueKeyColumn.getColumnName());
-                    if (isEvenlySplitColumn(column)) {
-                        return column;
-                    }
-                }
-            }
-        }
-
-        throw new UnsupportedOperationException(
-                String.format(
-                        "Incremental snapshot for tables requires primary key/unique key,"
-                                + " but table %s doesn't have primary key.",
-                        tableId));
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java
index 3c5b669a257..0f703f02c1c 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/ObjectUtils.java
@@ -63,6 +63,8 @@ public static BigDecimal minus(Object minuend, Object subtrahend) {
                     ((BigInteger) minuend).subtract((BigInteger) subtrahend).toString());
         } else if (minuend instanceof BigDecimal) {
             return ((BigDecimal) minuend).subtract((BigDecimal) subtrahend);
+        } else if (minuend instanceof String) {
+            return BigDecimal.valueOf(Long.MAX_VALUE);
+        } else {
             throw new UnsupportedOperationException(
                     String.format(
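The ObjectUtils.minus change above dovetails with the new STRING case in isEvenlySplitColumn: a fixed Long.MAX_VALUE difference makes the distribution factor of any string split key astronomically large, so string columns always leave the "evenly distributed" band and take the sampling or unevenly-sized path, which never calls ObjectUtils.plus on a string. A standalone sketch of that arithmetic (the one-million row count is an assumed figure; only ObjectUtils comes from this patch):

import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;

import java.math.BigDecimal;

public class StringFactorSketch {
    public static void main(String[] args) {
        // With the patched minus(), any String pair yields Long.MAX_VALUE.
        BigDecimal difference = ObjectUtils.minus("zzz", "aaa");
        // Same formula as calculateDistributionFactor: (max - min + 1) / rowCount.
        double factor =
                difference
                        .add(BigDecimal.ONE)
                        .divide(new BigDecimal(1_000_000L), 4, BigDecimal.ROUND_CEILING)
                        .doubleValue();
        System.out.println(factor); // ~9.2E12, far outside any plausible upper bound
    }
}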
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index 04671d28f5b..0249889b239 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -21,86 +21,21 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkRange;
-import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.JdbcSourceChunkSplitter;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
-import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
+import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlTypeUtils;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
 import io.debezium.relational.TableId;
 
-import java.math.BigDecimal;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import static java.math.BigDecimal.ROUND_CEILING;
-import static org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */
-public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MySqlChunkSplitter.class);
-
-    private final JdbcSourceConfig sourceConfig;
-    private final JdbcDataSourceDialect dialect;
+public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter {
 
     public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
-        this.sourceConfig = sourceConfig;
-        this.dialect = dialect;
-    }
-
-    @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            LOG.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
-
-            Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new RuntimeException("Failed to split chunks for table " + tableId, e);
-            }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            SeaTunnelRowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
-            }
-
-            long end = System.currentTimeMillis();
-            LOG.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    String.format("Generate Splits for table %s error", tableId), e);
-        }
+        super(sourceConfig, dialect);
     }
 
     @Override
@@ -153,242 +88,4 @@ public String buildSplitScanQuery(
     public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
         return MySqlTypeUtils.convertFromColumn(splitColumn);
     }
-
-    // --------------------------------------------------------------------------------------------
-    // Utilities
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request
-     * many queries and is not efficient.
-     */
-    private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
-        final String splitColumnName = splitColumn.name();
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
-        final Object min = minMax[0];
-        final Object max = minMax[1];
-        if (min == null || max == null || min.equals(max)) {
-            // empty table, or only one row, return full table scan as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final int chunkSize = sourceConfig.getSplitSize();
-        final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
-        final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
-
-        if (isEvenlySplitColumn(splitColumn)) {
-            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
-            double distributionFactor =
-                    calculateDistributionFactor(tableId, min, max, approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    doubleCompare(distributionFactor, distributionFactorLower) >= 0
-                            && doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
-            } else {
-                int shardCount = (int) (approximateRowCnt / chunkSize);
-                if (sourceConfig.getSampleShardingThreshold() < shardCount) {
-                    Object[] sample =
-                            sampleDataFromColumn(
-                                    jdbc,
-                                    tableId,
-                                    splitColumnName,
-                                    sourceConfig.getInverseSamplingRate());
-                    // In order to prevent data loss due to the absence of the minimum value in the
-                    // sampled data, the minimum value is directly added here.
-                    Object[] newSample = new Object[sample.length + 1];
-                    newSample[0] = min;
-                    System.arraycopy(sample, 0, newSample, 1, sample.length);
-                    return efficientShardingThroughSampling(
-                            tableId, newSample, approximateRowCnt, shardCount);
-                }
-                return splitUnevenlySizedChunks(
-                        jdbc, tableId, splitColumnName, min, max, chunkSize);
-            }
-        } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
-        }
-    }
-
-    private List<ChunkRange> efficientShardingThroughSampling(
-            TableId tableId, Object[] sampleData, long approximateRowCnt, int shardCount) {
-        LOG.info(
-                "Use efficient sharding through sampling optimization for table {}, the approximate row count is {}, the shardCount is {}",
-                tableId,
-                approximateRowCnt,
-                shardCount);
-
-        final List<ChunkRange> splits = new ArrayList<>();
-
-        // Calculate the shard boundaries
-        for (int i = 0; i < shardCount; i++) {
-            Object chunkStart = sampleData[(int) ((long) i * sampleData.length / shardCount)];
-            Object chunkEnd =
-                    i < shardCount - 1
-                            ? sampleData[(int) (((long) i + 1) * sampleData.length / shardCount)]
-                            : null;
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-        }
-
-        return splits;
-    }
-
-    /**
-     * Split table into evenly sized chunks based on the numeric min and max value of split column,
-     * and tumble chunks in step size.
-     */
-    private List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId,
-            Object min,
-            Object max,
-            long approximateRowCnt,
-            int chunkSize,
-            int dynamicChunkSize) {
-        LOG.info(
-                "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
-                tableId,
-                approximateRowCnt,
-                chunkSize,
-                dynamicChunkSize);
-        if (approximateRowCnt <= chunkSize) {
-            // there is no more than one chunk, return full table as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
-        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            chunkStart = chunkEnd;
-            try {
-                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
-            } catch (ArithmeticException e) {
-                // Stop chunk split to avoid dead loop when number overflows.
-                break;
-            }
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
-    private List<ChunkRange> splitUnevenlySizedChunks(
-            JdbcConnection jdbc,
-            TableId tableId,
-            String splitColumnName,
-            Object min,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        LOG.info(
-                "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
-        int count = 0;
-        while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
-            // we start from [null, min + chunk_size) and avoid [null, min)
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep a while to avoid DDOS on MySQL server
-            maySleep(count++, tableId);
-            chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    private Object nextChunkEnd(
-            JdbcConnection jdbc,
-            Object previousChunkEnd,
-            TableId tableId,
-            String splitColumnName,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        // chunk end might be null when max values are removed
-        Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
-        if (Objects.equals(previousChunkEnd, chunkEnd)) {
-            // we don't allow equal chunk start and end,
-            // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
-        }
-        if (ObjectUtils.compare(chunkEnd, max) >= 0) {
-            return null;
-        } else {
-            return chunkEnd;
-        }
-    }
-
-    private SnapshotSplit createSnapshotSplit(
-            JdbcConnection jdbc,
-            TableId tableId,
-            int chunkId,
-            SeaTunnelRowType splitKeyType,
-            Object chunkStart,
-            Object chunkEnd) {
-        // currently, we only support single split column
-        Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
-        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
-        return new SnapshotSplit(
-                splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd);
-    }
-
-    // ------------------------------------------------------------------------------------------
-    /** Returns the distribution factor of the given table. */
-    @SuppressWarnings("MagicNumber")
-    private double calculateDistributionFactor(
-            TableId tableId, Object min, Object max, long approximateRowCnt) {
-
-        if (!min.getClass().equals(max.getClass())) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
-                            min.getClass().getSimpleName(), max.getClass().getSimpleName()));
-        }
-        if (approximateRowCnt == 0) {
-            return Double.MAX_VALUE;
-        }
-        BigDecimal difference = ObjectUtils.minus(max, min);
-        // factor = (max - min + 1) / rowCount
-        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
-        double distributionFactor =
-                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();
-        LOG.info(
-                "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
-                tableId,
-                distributionFactor,
-                min,
-                max,
-                approximateRowCnt);
-        return distributionFactor;
-    }
-
-    private static String splitId(TableId tableId, int chunkId) {
-        return tableId.toString() + ":" + chunkId;
-    }
-
-    @SuppressWarnings("MagicNumber")
-    private static void maySleep(int count, TableId tableId) {
-        // every 100 queries to sleep 1s
-        if (count % 10 == 0) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // nothing to do
-            }
-            LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
-        }
-    }
 }
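Between the two dialect diffs it is worth noting how the shared efficientShardingThroughSampling differs from the copy deleted above: the old version prepended the column minimum to the sample and anchored the first chunk at sampleData[0], while the shared version opens the first chunk at null (covering the minimum without the array copy) and guards the degenerate cases. For example, with 5 samples and shardCount = 10, the old index math (i * sampleData.length / shardCount) repeats boundaries and can emit chunks whose start equals their end, whereas the new approxSamplePerShard <= 1 branch emits one chunk per adjacent sample pair.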
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index ac0b8165dbd..7efd53dc3fc 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -21,10 +21,7 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
-import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkRange;
-import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.JdbcSourceChunkSplitter;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
-import org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils;
+import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerUtils;
 
@@ -33,71 +30,14 @@
 import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
-import java.math.BigDecimal;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import static java.math.BigDecimal.ROUND_CEILING;
-import static org.apache.seatunnel.connectors.cdc.base.utils.ObjectUtils.doubleCompare;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */
 @Slf4j
-public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
-
-    private final JdbcSourceConfig sourceConfig;
-    private final JdbcDataSourceDialect dialect;
+public class SqlServerChunkSplitter extends AbstractJdbcSourceChunkSplitter {
 
     public SqlServerChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
-        this.sourceConfig = sourceConfig;
-        this.dialect = dialect;
-    }
-
-    @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            log.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
-
-            Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new RuntimeException("Failed to split chunks for table " + tableId, e);
-            }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            SeaTunnelRowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
-            }
-
-            long end = System.currentTimeMillis();
-            log.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    String.format("Generate Splits for table %s error", tableId), e);
-        }
+        super(sourceConfig, dialect);
     }
 
     @Override
@@ -150,242 +90,4 @@ public String buildSplitScanQuery(
     public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
         return SqlServerTypeUtils.convertFromColumn(splitColumn);
     }
-
-    // --------------------------------------------------------------------------------------------
-    // Utilities
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using
-     * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request
-     * many queries and is not efficient.
-     */
-    private List<ChunkRange> splitTableIntoChunks(
-            JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
-        final String splitColumnName = splitColumn.name();
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
-        final Object min = minMax[0];
-        final Object max = minMax[1];
-        if (min == null || max == null || min.equals(max)) {
-            // empty table, or only one row, return full table scan as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final int chunkSize = sourceConfig.getSplitSize();
-        final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
-        final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
-
-        if (isEvenlySplitColumn(splitColumn)) {
-            long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
-            double distributionFactor =
-                    calculateDistributionFactor(tableId, min, max, approximateRowCnt);
-
-            boolean dataIsEvenlyDistributed =
-                    doubleCompare(distributionFactor, distributionFactorLower) >= 0
-                            && doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
-
-            if (dataIsEvenlyDistributed) {
-                // the minimum dynamic chunk size is at least 1
-                final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
-                return splitEvenlySizedChunks(
-                        tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
-            } else {
-                int shardCount = (int) (approximateRowCnt / chunkSize);
-                if (sourceConfig.getSampleShardingThreshold() < shardCount) {
-                    Object[] sample =
-                            sampleDataFromColumn(
-                                    jdbc,
-                                    tableId,
-                                    splitColumnName,
-                                    sourceConfig.getInverseSamplingRate());
-                    // In order to prevent data loss due to the absence of the minimum value in the
-                    // sampled data, the minimum value is directly added here.
-                    Object[] newSample = new Object[sample.length + 1];
-                    newSample[0] = min;
-                    System.arraycopy(sample, 0, newSample, 1, sample.length);
-                    return efficientShardingThroughSampling(
-                            tableId, newSample, approximateRowCnt, shardCount);
-                }
-                return splitUnevenlySizedChunks(
-                        jdbc, tableId, splitColumnName, min, max, chunkSize);
-            }
-        } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
-        }
-    }
-
-    private List<ChunkRange> efficientShardingThroughSampling(
-            TableId tableId, Object[] sampleData, long approximateRowCnt, int shardCount) {
-        log.info(
-                "Use efficient sharding through sampling optimization for table {}, the approximate row count is {}, the shardCount is {}",
-                tableId,
-                approximateRowCnt,
-                shardCount);
-
-        final List<ChunkRange> splits = new ArrayList<>();
-
-        // Calculate the shard boundaries
-        for (int i = 0; i < shardCount; i++) {
-            Object chunkStart = sampleData[(int) ((long) i * sampleData.length / shardCount)];
-            Object chunkEnd =
-                    i < shardCount - 1
-                            ? sampleData[(int) (((long) i + 1) * sampleData.length / shardCount)]
-                            : null;
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-        }
-
-        return splits;
-    }
-
-    /**
-     * Split table into evenly sized chunks based on the numeric min and max value of split column,
-     * and tumble chunks in step size.
-     */
-    private List<ChunkRange> splitEvenlySizedChunks(
-            TableId tableId,
-            Object min,
-            Object max,
-            long approximateRowCnt,
-            int chunkSize,
-            int dynamicChunkSize) {
-        log.info(
-                "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
-                tableId,
-                approximateRowCnt,
-                chunkSize,
-                dynamicChunkSize);
-        if (approximateRowCnt <= chunkSize) {
-            // there is no more than one chunk, return full table as a chunk
-            return Collections.singletonList(ChunkRange.all());
-        }
-
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
-        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            chunkStart = chunkEnd;
-            try {
-                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
-            } catch (ArithmeticException e) {
-                // Stop chunk split to avoid dead loop when number overflows.
-                break;
-            }
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    /** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
-    private List<ChunkRange> splitUnevenlySizedChunks(
-            JdbcConnection jdbc,
-            TableId tableId,
-            String splitColumnName,
-            Object min,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        log.info(
-                "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
-        final List<ChunkRange> splits = new ArrayList<>();
-        Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
-        int count = 0;
-        while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
-            // we start from [null, min + chunk_size) and avoid [null, min)
-            splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep a while to avoid DDOS on MySQL server
-            maySleep(count++, tableId);
-            chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
-        }
-        // add the ending split
-        splits.add(ChunkRange.of(chunkStart, null));
-        return splits;
-    }
-
-    private Object nextChunkEnd(
-            JdbcConnection jdbc,
-            Object previousChunkEnd,
-            TableId tableId,
-            String splitColumnName,
-            Object max,
-            int chunkSize)
-            throws SQLException {
-        // chunk end might be null when max values are removed
-        Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
-        if (Objects.equals(previousChunkEnd, chunkEnd)) {
-            // we don't allow equal chunk start and end,
-            // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
-        }
-        if (ObjectUtils.compare(chunkEnd, max) >= 0) {
-            return null;
-        } else {
-            return chunkEnd;
-        }
-    }
-
-    private SnapshotSplit createSnapshotSplit(
-            JdbcConnection jdbc,
-            TableId tableId,
-            int chunkId,
-            SeaTunnelRowType splitKeyType,
-            Object chunkStart,
-            Object chunkEnd) {
-        // currently, we only support single split column
-        Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
-        Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
-        return new SnapshotSplit(
-                splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd);
-    }
-
-    // ------------------------------------------------------------------------------------------
-    /** Returns the distribution factor of the given table. */
-    @SuppressWarnings("MagicNumber")
-    private double calculateDistributionFactor(
-            TableId tableId, Object min, Object max, long approximateRowCnt) {
-
-        if (!min.getClass().equals(max.getClass())) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
-                            min.getClass().getSimpleName(), max.getClass().getSimpleName()));
-        }
-        if (approximateRowCnt == 0) {
-            return Double.MAX_VALUE;
-        }
-        BigDecimal difference = ObjectUtils.minus(max, min);
-        // factor = (max - min + 1) / rowCount
-        final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
-        double distributionFactor =
-                subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();
-        log.info(
-                "The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
-                tableId,
-                distributionFactor,
-                min,
-                max,
-                approximateRowCnt);
-        return distributionFactor;
-    }
-
-    private static String splitId(TableId tableId, int chunkId) {
-        return tableId.toString() + ":" + chunkId;
-    }
-
-    @SuppressWarnings("MagicNumber")
-    private static void maySleep(int count, TableId tableId) {
-        // every 100 queries to sleep 1s
-        if (count % 10 == 0) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // nothing to do
-            }
-            log.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
-        }
-    }
 }
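Taken together, the two dialect diffs reduce to the same pattern: each splitter now only supplies its type mapping and dialect-specific SQL. A minimal sketch of a hypothetical new dialect built on the base class (AcmeDbChunkSplitter and the fixed LONG mapping are illustrative assumptions; a real dialect also implements the query helpers declared on JdbcSourceChunkSplitter, such as queryMinMax, queryNextChunkMax and buildSplitScanQuery, with its own SQL, as MySqlUtils and SqlServerUtils do above):

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;

import io.debezium.relational.Column;

// Left abstract so the remaining JdbcSourceChunkSplitter query methods stay unimplemented here.
public abstract class AcmeDbChunkSplitter extends AbstractJdbcSourceChunkSplitter {

    public AcmeDbChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
        super(sourceConfig, dialect);
    }

    // generateSplits, chunk sizing, sampling and split-column selection are all inherited;
    // only the per-database pieces remain.
    @Override
    public SeaTunnelDataType<?> fromDbzColumn(Column splitColumn) {
        // A real dialect would delegate to its TypeUtils; a fixed type keeps the sketch minimal.
        return BasicType.LONG_TYPE;
    }
}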