diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index d7029f9cba2a..8e073cbc04d6 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -44,7 +44,7 @@
org.apache.seatunnel
- seatunnel-flink-13-starter
+ seatunnel-flink-15-starter
${project.version}
@@ -72,39 +72,46 @@
org.apache.flink
flink-java
- ${flink.1.13.6.version}
+ ${flink.1.15.3.version}
${flink.scope}
+
org.apache.flink
- flink-table-planner_${scala.binary.version}
- ${flink.1.13.6.version}
- ${flink.scope}
+ flink-streaming-java
+ ${flink.1.15.3.version}
+
org.apache.flink
- flink-table-planner-blink_${scala.binary.version}
- ${flink.1.13.6.version}
- ${flink.scope}
+ flink-table-api-java
+ ${flink.1.15.3.version}
+
org.apache.flink
- flink-streaming-java_${scala.binary.version}
- ${flink.1.13.6.version}
- ${flink.scope}
+ flink-table-api-java-bridge
+ ${flink.1.15.3.version}
+
org.apache.flink
- flink-clients_${scala.binary.version}
- ${flink.1.13.6.version}
- ${flink.scope}
+ flink-table-planner-loader
+ 1.15.3
+
org.apache.flink
- flink-runtime-web_${scala.binary.version}
- ${flink.1.13.6.version}
- ${flink.scope}
+ flink-table-runtime
+ 1.15.3
+
+
+ org.apache.flink
+ flink-clients
+ 1.15.3
+
+
com.squareup.okhttp3
mockwebserver
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index 0f927351fb63..b8d3fca1c06e 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -20,8 +20,8 @@
env {
# You can set flink configuration here
+ job.mode = "BATCH"
execution.parallelism = 2
- job.mode = "STREAMING"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
@@ -29,7 +29,6 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
- parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
@@ -52,7 +51,6 @@ transform {
sink {
Console {
- parallelism = 3
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml
index de0f3702bbf4..bcf9d0c60f60 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml
@@ -56,6 +56,13 @@
provided
+
+ org.apache.flink
+ flink-connector-base
+ ${flink.1.15.3.version}
+ provided
+
+
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
new file mode 100644
index 000000000000..a3d57008513d
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.types.Row;
+
+public class FlinkRowCollector implements Collector {
+
+ private final ReaderOutput readerOutput;
+
+ private final FlinkRowConverter rowSerialization;
+
+ public FlinkRowCollector(ReaderOutput readerOutput, SeaTunnelRowType seaTunnelRowType) {
+ this.readerOutput = readerOutput;
+ this.rowSerialization = new FlinkRowConverter(seaTunnelRowType);
+ }
+
+ @Override
+ public void collect(SeaTunnelRow record) {
+ try {
+ readerOutput.collect(rowSerialization.convert(record));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return this;
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
new file mode 100644
index 000000000000..fa28850a01bc
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+
+public class FlinkSource
+ implements Source, EnumStateT>, ResultTypeQueryable {
+
+ private final SeaTunnelSource source;
+
+ public FlinkSource(SeaTunnelSource source) {
+ this.source = source;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ org.apache.seatunnel.api.source.Boundedness boundedness = source.getBoundedness();
+ return boundedness == org.apache.seatunnel.api.source.Boundedness.BOUNDED
+ ? Boundedness.BOUNDED
+ : Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public SourceReader> createReader(SourceReaderContext readerContext)
+ throws Exception {
+ org.apache.seatunnel.api.source.SourceReader.Context context =
+ new FlinkSourceReaderContext(readerContext, source);
+ org.apache.seatunnel.api.source.SourceReader reader =
+ source.createReader(context);
+ return new FlinkSourceReader<>(reader, (SeaTunnelRowType) source.getProducedType());
+ }
+
+ @Override
+ public SplitEnumerator, EnumStateT> createEnumerator(
+ SplitEnumeratorContext> enumContext) throws Exception {
+ SourceSplitEnumerator.Context context =
+ new FlinkSourceSplitEnumeratorContext<>(enumContext);
+ SourceSplitEnumerator enumerator = source.createEnumerator(context);
+ return new FlinkSourceEnumerator<>(enumerator, enumContext);
+ }
+
+ @Override
+ public SplitEnumerator, EnumStateT> restoreEnumerator(
+ SplitEnumeratorContext> enumContext, EnumStateT checkpoint)
+ throws Exception {
+ FlinkSourceSplitEnumeratorContext context =
+ new FlinkSourceSplitEnumeratorContext<>(enumContext);
+ SourceSplitEnumerator enumerator =
+ source.restoreEnumerator(context, checkpoint);
+ return new FlinkSourceEnumerator<>(enumerator, enumContext);
+ }
+
+ @Override
+ public SimpleVersionedSerializer> getSplitSerializer() {
+ return new SplitWrapperSerializer<>(source.getSplitSerializer());
+ }
+
+ @Override
+ public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() {
+ Serializer enumeratorStateSerializer = source.getEnumeratorStateSerializer();
+ return new FlinkSimpleVersionedSerializer<>(enumeratorStateSerializer);
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return (TypeInformation) TypeConverterUtils.convert(source.getProducedType());
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
new file mode 100644
index 000000000000..cb66a6858c93
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkSourceEnumerator
+ implements SplitEnumerator, EnumStateT> {
+
+ private final SourceSplitEnumerator sourceSplitEnumerator;
+
+ private final SplitEnumeratorContext> enumeratorContext;
+
+ private boolean isRun = false;
+
+ public FlinkSourceEnumerator(
+ SourceSplitEnumerator enumerator,
+ SplitEnumeratorContext> enumContext) {
+ this.sourceSplitEnumerator = enumerator;
+ this.enumeratorContext = enumContext;
+ }
+
+ @Override
+ public void start() {
+ sourceSplitEnumerator.open();
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+ sourceSplitEnumerator.handleSplitRequest(subtaskId);
+ }
+
+ @Override
+ public void addSplitsBack(List> splits, int subtaskId) {
+ sourceSplitEnumerator.addSplitsBack(
+ splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()),
+ subtaskId);
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ sourceSplitEnumerator.registerReader(subtaskId);
+ if (!isRun) {
+ try {
+ sourceSplitEnumerator.run();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ isRun = true;
+ }
+ }
+
+ @Override
+ public EnumStateT snapshotState(long checkpointId) throws Exception {
+ return sourceSplitEnumerator.snapshotState(checkpointId);
+ }
+
+ @Override
+ public void close() throws IOException {
+ sourceSplitEnumerator.close();
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
new file mode 100644
index 000000000000..6bcd4a767dbb
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+public class FlinkSourceReader
+ implements SourceReader> {
+
+ private final org.apache.seatunnel.api.source.SourceReader sourceReader;
+
+ private final SeaTunnelRowType seaTunnelRowType;
+
+ private FlinkRowCollector flinkRowCollector = null;
+
+ private InputStatus inputStatus = InputStatus.MORE_AVAILABLE;
+
+ public FlinkSourceReader(
+ org.apache.seatunnel.api.source.SourceReader sourceReader,
+ SeaTunnelRowType seaTunnelRowType) {
+ this.sourceReader = sourceReader;
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public void start() {
+ try {
+ sourceReader.open();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput output) throws Exception {
+ if (Objects.isNull(flinkRowCollector)) {
+ flinkRowCollector = new FlinkRowCollector(output, seaTunnelRowType);
+ }
+ sourceReader.pollNext(flinkRowCollector);
+ return inputStatus;
+ }
+
+ @Override
+ public List> snapshotState(long checkpointId) {
+ try {
+ List splitTS = sourceReader.snapshotState(checkpointId);
+ return splitTS.stream().map(SplitWrapper::new).collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture isAvailable() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void addSplits(List> splits) {
+ sourceReader.addSplits(
+ splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()));
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {
+ sourceReader.handleNoMoreSplits();
+ inputStatus = InputStatus.END_OF_INPUT;
+ }
+
+ @Override
+ public void close() throws Exception {
+ sourceReader.close();
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java
new file mode 100644
index 000000000000..bbd3c10d017e
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceReader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+
+public class FlinkSourceReaderContext implements SourceReader.Context {
+
+ private final SourceReaderContext readerContext;
+
+ private final SeaTunnelSource source;
+
+ public FlinkSourceReaderContext(SourceReaderContext readerContext, SeaTunnelSource source) {
+ this.readerContext = readerContext;
+ this.source = source;
+ }
+
+ @Override
+ public int getIndexOfSubtask() {
+ return readerContext.getIndexOfSubtask();
+ }
+
+ @Override
+ public org.apache.seatunnel.api.source.Boundedness getBoundedness() {
+ return source.getBoundedness();
+ }
+
+ @Override
+ public void signalNoMoreElement() {}
+
+ @Override
+ public void sendSplitRequest() {
+ readerContext.sendSplitRequest();
+ }
+
+ @Override
+ public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
+ readerContext.sendSourceEventToCoordinator(new SourceEventWrapper(sourceEvent));
+ }
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return null;
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java
new file mode 100644
index 000000000000..b7b9df137a09
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+import java.util.List;
+import java.util.Set;
+
+public class FlinkSourceSplitEnumeratorContext
+ implements SourceSplitEnumerator.Context {
+
+ private final SplitEnumeratorContext> enumContext;
+
+ public FlinkSourceSplitEnumeratorContext(
+ SplitEnumeratorContext> enumContext) {
+ this.enumContext = enumContext;
+ }
+
+ @Override
+ public int currentParallelism() {
+ return enumContext.currentParallelism();
+ }
+
+ @Override
+ public Set registeredReaders() {
+ return enumContext.registeredReaders().keySet();
+ }
+
+ @Override
+ public void assignSplit(int subtaskId, List splits) {
+ splits.forEach(
+ split -> {
+ enumContext.assignSplit(new SplitWrapper<>(split), subtaskId);
+ });
+ }
+
+ @Override
+ public void signalNoMoreSplits(int subtask) {
+ int parallelism = enumContext.currentParallelism();
+ for (int i = 0; i < parallelism; i++) {
+ enumContext.signalNoMoreSplits(subtask);
+ }
+ }
+
+ @Override
+ public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+ enumContext.sendEventToSourceReader(subtaskId, new SourceEventWrapper(event));
+ }
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return null;
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SourceEventWrapper.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SourceEventWrapper.java
new file mode 100644
index 000000000000..8625c7e9be76
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SourceEventWrapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+public class SourceEventWrapper implements SourceEvent {
+
+ private final org.apache.seatunnel.api.source.SourceEvent sourceEvent;
+
+ public SourceEventWrapper(org.apache.seatunnel.api.source.SourceEvent sourceEvent) {
+ this.sourceEvent = sourceEvent;
+ }
+
+ public org.apache.seatunnel.api.source.SourceEvent getSourceEvent() {
+ return sourceEvent;
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapper.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapper.java
new file mode 100644
index 000000000000..991037d082f6
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+public class SplitWrapper
+ implements SourceSplit {
+
+ private final T sourceSplit;
+
+ public SplitWrapper(T sourceSplit) {
+ this.sourceSplit = sourceSplit;
+ }
+
+ public T getSourceSplit() {
+ return sourceSplit;
+ }
+
+ @Override
+ public String splitId() {
+ return sourceSplit.splitId();
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapperSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapperSerializer.java
new file mode 100644
index 000000000000..f04c9d360550
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SplitWrapperSerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class SplitWrapperSerializer
+ implements SimpleVersionedSerializer> {
+
+ private final Serializer serializer;
+
+ public SplitWrapperSerializer(Serializer serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(SplitWrapper obj) throws IOException {
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(baos)) {
+ byte[] serialize = serializer.serialize(obj.getSourceSplit());
+ out.writeInt(serialize.length);
+ out.write(serialize);
+ out.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ @Override
+ public SplitWrapper deserialize(int version, byte[] serialized) throws IOException {
+ try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ final DataInputStream in = new DataInputStream(bais)) {
+ final int size = in.readInt();
+ final byte[] stateBytes = new byte[size];
+ in.read(stateBytes);
+ SplitT split = serializer.deserialize(stateBytes);
+ return new SplitWrapper<>(split);
+ }
+ }
+}