Skip to content

Commit

Permalink
[Improve] update clickhouse connector, use factory to create source/sink
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie committed Oct 30, 2024
1 parent bdffbcc commit da7120e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
Expand All @@ -35,9 +36,11 @@ public class ClickhouseSink
implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {

private ReaderOption option;
private CatalogTable catalogTable;

public ClickhouseSink(ReaderOption option) {
public ClickhouseSink(ReaderOption option, CatalogTable catalogTable) {
this.option = option;
this.catalogTable = catalogTable;
}

@Override
Expand All @@ -61,4 +64,9 @@ public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> restoreWriter
public Optional<Serializer<ClickhouseSinkState>> getWriterStateSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.of(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public TableSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInf
.supportUpsert(supportUpsert)
.allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
.build();
return () -> new ClickhouseSink(option);
return () -> new ClickhouseSink(option, catalogTable);
} finally {
proxy.close();
}
Expand Down

0 comments on commit da7120e

Please sign in to comment.