From da7120e95c2bb81c4fc5750dc9d065e5f5e75ec2 Mon Sep 17 00:00:00 2001 From: jarvis Date: Wed, 30 Oct 2024 11:02:02 +0800 Subject: [PATCH] [Improve] update clickhouse connector, use factory to create source/sink --- .../clickhouse/sink/client/ClickhouseSink.java | 10 +++++++++- .../clickhouse/sink/client/ClickhouseSinkFactory.java | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java index 7459be90081..22f18694e23 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo; @@ -35,9 +36,11 @@ public class ClickhouseSink implements SeaTunnelSink { private ReaderOption option; + private CatalogTable catalogTable; - public ClickhouseSink(ReaderOption option) { + public ClickhouseSink(ReaderOption option, CatalogTable catalogTable) { this.option = option; + this.catalogTable = catalogTable; } @Override @@ -61,4 +64,9 @@ public SinkWriter restoreWriter public Optional> getWriterStateSerializer() { return Optional.of(new DefaultSerializer<>()); } + + @Override + public Optional getWriteCatalogTable() { + return Optional.of(catalogTable); + } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java index 92660addbbc..720efacc321 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java @@ -138,7 +138,7 @@ public TableSink new ClickhouseSink(option); + return () -> new ClickhouseSink(option, catalogTable); } finally { proxy.close(); }