From 573b25cba8e09cc8adfc40c1a4cb67251f90ded5 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 8 Jun 2023 14:50:30 +0800 Subject: [PATCH] [Feature][Core] Upgrade flink source translation --- .../pom.xml | 41 ++++--- .../resources/examples/fake_to_console.conf | 4 +- .../pom.xml | 7 ++ .../flink/source/FlinkRowCollector.java | 52 +++++++++ .../translation/flink/source/FlinkSource.java | 104 ++++++++++++++++++ .../flink/source/FlinkSourceEnumerator.java | 87 +++++++++++++++ .../flink/source/FlinkSourceReader.java | 101 +++++++++++++++++ .../source/FlinkSourceReaderContext.java | 65 +++++++++++ .../FlinkSourceSplitEnumeratorContext.java | 75 +++++++++++++ .../flink/source/SourceEventWrapper.java | 33 ++++++ .../flink/source/SplitWrapper.java | 39 +++++++ .../flink/source/SplitWrapperSerializer.java | 68 ++++++++++++ 12 files changed, 656 insertions(+), 20 deletions(-) create mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java create mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java create mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java create mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java create mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java create mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java create mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SourceEventWrapper.java create mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapper.java create mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapperSerializer.java diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml index d7029f9cba2a..8e073cbc04d6 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml @@ -44,7 +44,7 @@ org.apache.seatunnel - seatunnel-flink-13-starter + seatunnel-flink-15-starter ${project.version} @@ -72,39 +72,46 @@ org.apache.flink flink-java - ${flink.1.13.6.version} + ${flink.1.15.3.version} ${flink.scope} + org.apache.flink - flink-table-planner_${scala.binary.version} - ${flink.1.13.6.version} - ${flink.scope} + flink-streaming-java + ${flink.1.15.3.version} + org.apache.flink - flink-table-planner-blink_${scala.binary.version} - ${flink.1.13.6.version} - ${flink.scope} + flink-table-api-java + ${flink.1.15.3.version} + org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.1.13.6.version} - ${flink.scope} + flink-table-api-java-bridge + ${flink.1.15.3.version} + org.apache.flink - flink-clients_${scala.binary.version} - ${flink.1.13.6.version} - ${flink.scope} + flink-table-planner-loader + 1.15.3 + org.apache.flink - flink-runtime-web_${scala.binary.version} - ${flink.1.13.6.version} - ${flink.scope} + flink-table-runtime + 1.15.3 + + + org.apache.flink + flink-clients + 1.15.3 + + com.squareup.okhttp3 mockwebserver diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf index 0f927351fb63..b8d3fca1c06e 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf @@ -20,8 +20,8 @@ env { # You can set flink configuration here + job.mode = "BATCH" execution.parallelism = 2 - job.mode = "STREAMING" #execution.checkpoint.interval = 10000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } @@ -29,7 +29,6 @@ env { source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { - parallelism = 2 result_table_name = "fake" row.num = 16 schema = { @@ -52,7 +51,6 @@ transform { sink { Console { - parallelism = 3 } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml index de0f3702bbf4..bcf9d0c60f60 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml @@ -56,6 +56,13 @@ provided + + org.apache.flink + flink-connector-base + ${flink.1.15.3.version} + provided + + diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java new file mode 100644 index 000000000000..a3d57008513d --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.source; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.types.Row; + +public class FlinkRowCollector implements Collector { + + private final ReaderOutput readerOutput; + + private final FlinkRowConverter rowSerialization; + + public FlinkRowCollector(ReaderOutput readerOutput, SeaTunnelRowType seaTunnelRowType) { + this.readerOutput = readerOutput; + this.rowSerialization = new FlinkRowConverter(seaTunnelRowType); + } + + @Override + public void collect(SeaTunnelRow record) { + try { + readerOutput.collect(rowSerialization.convert(record)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public Object getCheckpointLock() { + return this; + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java new file mode 100644 index 000000000000..fa28850a01bc --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.source; + +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer; +import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.types.Row; + +import java.io.Serializable; + +public class FlinkSource + implements Source, EnumStateT>, ResultTypeQueryable { + + private final SeaTunnelSource source; + + public FlinkSource(SeaTunnelSource source) { + this.source = source; + } + + @Override + public Boundedness getBoundedness() { + org.apache.seatunnel.api.source.Boundedness boundedness = source.getBoundedness(); + return boundedness == org.apache.seatunnel.api.source.Boundedness.BOUNDED + ? Boundedness.BOUNDED + : Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SourceReader> createReader(SourceReaderContext readerContext) + throws Exception { + org.apache.seatunnel.api.source.SourceReader.Context context = + new FlinkSourceReaderContext(readerContext, source); + org.apache.seatunnel.api.source.SourceReader reader = + source.createReader(context); + return new FlinkSourceReader<>(reader, (SeaTunnelRowType) source.getProducedType()); + } + + @Override + public SplitEnumerator, EnumStateT> createEnumerator( + SplitEnumeratorContext> enumContext) throws Exception { + SourceSplitEnumerator.Context context = + new FlinkSourceSplitEnumeratorContext<>(enumContext); + SourceSplitEnumerator enumerator = source.createEnumerator(context); + return new FlinkSourceEnumerator<>(enumerator, enumContext); + } + + @Override + public SplitEnumerator, EnumStateT> restoreEnumerator( + SplitEnumeratorContext> enumContext, EnumStateT checkpoint) + throws Exception { + FlinkSourceSplitEnumeratorContext context = + new FlinkSourceSplitEnumeratorContext<>(enumContext); + SourceSplitEnumerator enumerator = + source.restoreEnumerator(context, checkpoint); + return new FlinkSourceEnumerator<>(enumerator, enumContext); + } + + @Override + public SimpleVersionedSerializer> getSplitSerializer() { + return new SplitWrapperSerializer<>(source.getSplitSerializer()); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + Serializer enumeratorStateSerializer = source.getEnumeratorStateSerializer(); + return new FlinkSimpleVersionedSerializer<>(enumeratorStateSerializer); + } + + @Override + public TypeInformation getProducedType() { + return (TypeInformation) TypeConverterUtils.convert(source.getProducedType()); + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java new file mode 100644 index 000000000000..cb66a6858c93 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.source; + +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +public class FlinkSourceEnumerator + implements SplitEnumerator, EnumStateT> { + + private final SourceSplitEnumerator sourceSplitEnumerator; + + private final SplitEnumeratorContext> enumeratorContext; + + private boolean isRun = false; + + public FlinkSourceEnumerator( + SourceSplitEnumerator enumerator, + SplitEnumeratorContext> enumContext) { + this.sourceSplitEnumerator = enumerator; + this.enumeratorContext = enumContext; + } + + @Override + public void start() { + sourceSplitEnumerator.open(); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + sourceSplitEnumerator.handleSplitRequest(subtaskId); + } + + @Override + public void addSplitsBack(List> splits, int subtaskId) { + sourceSplitEnumerator.addSplitsBack( + splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()), + subtaskId); + } + + @Override + public void addReader(int subtaskId) { + sourceSplitEnumerator.registerReader(subtaskId); + if (!isRun) { + try { + sourceSplitEnumerator.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + isRun = true; + } + } + + @Override + public EnumStateT snapshotState(long checkpointId) throws Exception { + return sourceSplitEnumerator.snapshotState(checkpointId); + } + + @Override + public void close() throws IOException { + sourceSplitEnumerator.close(); + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java new file mode 100644 index 000000000000..6bcd4a767dbb --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.source; + +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class FlinkSourceReader + implements SourceReader> { + + private final org.apache.seatunnel.api.source.SourceReader sourceReader; + + private final SeaTunnelRowType seaTunnelRowType; + + private FlinkRowCollector flinkRowCollector = null; + + private InputStatus inputStatus = InputStatus.MORE_AVAILABLE; + + public FlinkSourceReader( + org.apache.seatunnel.api.source.SourceReader sourceReader, + SeaTunnelRowType seaTunnelRowType) { + this.sourceReader = sourceReader; + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public void start() { + try { + sourceReader.open(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + if (Objects.isNull(flinkRowCollector)) { + flinkRowCollector = new FlinkRowCollector(output, seaTunnelRowType); + } + sourceReader.pollNext(flinkRowCollector); + return inputStatus; + } + + @Override + public List> snapshotState(long checkpointId) { + try { + List splitTS = sourceReader.snapshotState(checkpointId); + return splitTS.stream().map(SplitWrapper::new).collect(Collectors.toList()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public CompletableFuture isAvailable() { + return CompletableFuture.completedFuture(null); + } + + @Override + public void addSplits(List> splits) { + sourceReader.addSplits( + splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList())); + } + + @Override + public void notifyNoMoreSplits() { + sourceReader.handleNoMoreSplits(); + inputStatus = InputStatus.END_OF_INPUT; + } + + @Override + public void close() throws Exception { + sourceReader.close(); + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java new file mode 100644 index 000000000000..bbd3c10d017e --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.source; + +import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceEvent; +import org.apache.seatunnel.api.source.SourceReader; + +import org.apache.flink.api.connector.source.SourceReaderContext; + +public class FlinkSourceReaderContext implements SourceReader.Context { + + private final SourceReaderContext readerContext; + + private final SeaTunnelSource source; + + public FlinkSourceReaderContext(SourceReaderContext readerContext, SeaTunnelSource source) { + this.readerContext = readerContext; + this.source = source; + } + + @Override + public int getIndexOfSubtask() { + return readerContext.getIndexOfSubtask(); + } + + @Override + public org.apache.seatunnel.api.source.Boundedness getBoundedness() { + return source.getBoundedness(); + } + + @Override + public void signalNoMoreElement() {} + + @Override + public void sendSplitRequest() { + readerContext.sendSplitRequest(); + } + + @Override + public void sendSourceEventToEnumerator(SourceEvent sourceEvent) { + readerContext.sendSourceEventToCoordinator(new SourceEventWrapper(sourceEvent)); + } + + @Override + public MetricsContext getMetricsContext() { + return null; + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java new file mode 100644 index 000000000000..b7b9df137a09 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.source; + +import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.source.SourceEvent; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; + +import org.apache.flink.api.connector.source.SplitEnumeratorContext; + +import java.util.List; +import java.util.Set; + +public class FlinkSourceSplitEnumeratorContext + implements SourceSplitEnumerator.Context { + + private final SplitEnumeratorContext> enumContext; + + public FlinkSourceSplitEnumeratorContext( + SplitEnumeratorContext> enumContext) { + this.enumContext = enumContext; + } + + @Override + public int currentParallelism() { + return enumContext.currentParallelism(); + } + + @Override + public Set registeredReaders() { + return enumContext.registeredReaders().keySet(); + } + + @Override + public void assignSplit(int subtaskId, List splits) { + splits.forEach( + split -> { + enumContext.assignSplit(new SplitWrapper<>(split), subtaskId); + }); + } + + @Override + public void signalNoMoreSplits(int subtask) { + int parallelism = enumContext.currentParallelism(); + for (int i = 0; i < parallelism; i++) { + enumContext.signalNoMoreSplits(subtask); + } + } + + @Override + public void sendEventToSourceReader(int subtaskId, SourceEvent event) { + enumContext.sendEventToSourceReader(subtaskId, new SourceEventWrapper(event)); + } + + @Override + public MetricsContext getMetricsContext() { + return null; + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SourceEventWrapper.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SourceEventWrapper.java new file mode 100644 index 000000000000..8625c7e9be76 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SourceEventWrapper.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.source; + +import org.apache.flink.api.connector.source.SourceEvent; + +public class SourceEventWrapper implements SourceEvent { + + private final org.apache.seatunnel.api.source.SourceEvent sourceEvent; + + public SourceEventWrapper(org.apache.seatunnel.api.source.SourceEvent sourceEvent) { + this.sourceEvent = sourceEvent; + } + + public org.apache.seatunnel.api.source.SourceEvent getSourceEvent() { + return sourceEvent; + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapper.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapper.java new file mode 100644 index 000000000000..991037d082f6 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapper.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.source; + +import org.apache.flink.api.connector.source.SourceSplit; + +public class SplitWrapper + implements SourceSplit { + + private final T sourceSplit; + + public SplitWrapper(T sourceSplit) { + this.sourceSplit = sourceSplit; + } + + public T getSourceSplit() { + return sourceSplit; + } + + @Override + public String splitId() { + return sourceSplit.splitId(); + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapperSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapperSerializer.java new file mode 100644 index 000000000000..f04c9d360550 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapperSerializer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.source; + +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.source.SourceSplit; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class SplitWrapperSerializer + implements SimpleVersionedSerializer> { + + private final Serializer serializer; + + public SplitWrapperSerializer(Serializer serializer) { + this.serializer = serializer; + } + + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(SplitWrapper obj) throws IOException { + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos)) { + byte[] serialize = serializer.serialize(obj.getSourceSplit()); + out.writeInt(serialize.length); + out.write(serialize); + out.flush(); + return baos.toByteArray(); + } + } + + @Override + public SplitWrapper deserialize(int version, byte[] serialized) throws IOException { + try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + final DataInputStream in = new DataInputStream(bais)) { + final int size = in.readInt(); + final byte[] stateBytes = new byte[size]; + in.read(stateBytes); + SplitT split = serializer.deserialize(stateBytes); + return new SplitWrapper<>(split); + } + } +}