Skip to content

Commit

Permalink
[Feature][Core] Upgrade flink source translation
Browse files Browse the repository at this point in the history
  • Loading branch information
TyrantLucifer committed Jul 27, 2023
1 parent 3c02b32 commit 573b25c
Show file tree
Hide file tree
Showing 12 changed files with 656 additions and 20 deletions.
41 changes: 24 additions & 17 deletions seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-flink-13-starter</artifactId>
<artifactId>seatunnel-flink-15-starter</artifactId>
<version>${project.version}</version>
</dependency>

Expand Down Expand Up @@ -72,39 +72,46 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.1.13.6.version}</version>
<version>${flink.1.15.3.version}</version>
<scope>${flink.scope}</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.1.13.6.version}</version>
<scope>${flink.scope}</scope>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.1.15.3.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.1.13.6.version}</version>
<scope>${flink.scope}</scope>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.1.15.3.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.1.13.6.version}</version>
<scope>${flink.scope}</scope>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.1.15.3.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.1.13.6.version}</version>
<scope>${flink.scope}</scope>
<artifactId>flink-table-planner-loader</artifactId>
<version>1.15.3</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.1.13.6.version}</version>
<scope>${flink.scope}</scope>
<artifactId>flink-table-runtime</artifactId>
<version>1.15.3</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.15.3</version>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@

env {
# You can set flink configuration here
job.mode = "BATCH"
execution.parallelism = 2
job.mode = "STREAMING"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
Expand All @@ -52,7 +51,6 @@ transform {

sink {
Console {
parallelism = 3
}

# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.1.15.3.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.translation.flink.source;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.types.Row;

public class FlinkRowCollector implements Collector<SeaTunnelRow> {

private final ReaderOutput<Row> readerOutput;

private final FlinkRowConverter rowSerialization;

public FlinkRowCollector(ReaderOutput<Row> readerOutput, SeaTunnelRowType seaTunnelRowType) {
this.readerOutput = readerOutput;
this.rowSerialization = new FlinkRowConverter(seaTunnelRowType);
}

@Override
public void collect(SeaTunnelRow record) {
try {
readerOutput.collect(rowSerialization.convert(record));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Object getCheckpointLock() {
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.translation.flink.source;

import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.types.Row;

import java.io.Serializable;

public class FlinkSource<SplitT extends SourceSplit, EnumStateT extends Serializable>
implements Source<Row, SplitWrapper<SplitT>, EnumStateT>, ResultTypeQueryable<Row> {

private final SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source;

public FlinkSource(SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source) {
this.source = source;
}

@Override
public Boundedness getBoundedness() {
org.apache.seatunnel.api.source.Boundedness boundedness = source.getBoundedness();
return boundedness == org.apache.seatunnel.api.source.Boundedness.BOUNDED
? Boundedness.BOUNDED
: Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
public SourceReader<Row, SplitWrapper<SplitT>> createReader(SourceReaderContext readerContext)
throws Exception {
org.apache.seatunnel.api.source.SourceReader.Context context =
new FlinkSourceReaderContext(readerContext, source);
org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> reader =
source.createReader(context);
return new FlinkSourceReader<>(reader, (SeaTunnelRowType) source.getProducedType());
}

@Override
public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> createEnumerator(
SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) throws Exception {
SourceSplitEnumerator.Context<SplitT> context =
new FlinkSourceSplitEnumeratorContext<>(enumContext);
SourceSplitEnumerator<SplitT, EnumStateT> enumerator = source.createEnumerator(context);
return new FlinkSourceEnumerator<>(enumerator, enumContext);
}

@Override
public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> restoreEnumerator(
SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext, EnumStateT checkpoint)
throws Exception {
FlinkSourceSplitEnumeratorContext<SplitT> context =
new FlinkSourceSplitEnumeratorContext<>(enumContext);
SourceSplitEnumerator<SplitT, EnumStateT> enumerator =
source.restoreEnumerator(context, checkpoint);
return new FlinkSourceEnumerator<>(enumerator, enumContext);
}

@Override
public SimpleVersionedSerializer<SplitWrapper<SplitT>> getSplitSerializer() {
return new SplitWrapperSerializer<>(source.getSplitSerializer());
}

@Override
public SimpleVersionedSerializer<EnumStateT> getEnumeratorCheckpointSerializer() {
Serializer<EnumStateT> enumeratorStateSerializer = source.getEnumeratorStateSerializer();
return new FlinkSimpleVersionedSerializer<>(enumeratorStateSerializer);
}

@Override
public TypeInformation<Row> getProducedType() {
return (TypeInformation<Row>) TypeConverterUtils.convert(source.getProducedType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.translation.flink.source;

import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;

import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
implements SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> {

private final SourceSplitEnumerator<SplitT, EnumStateT> sourceSplitEnumerator;

private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumeratorContext;

private boolean isRun = false;

public FlinkSourceEnumerator(
SourceSplitEnumerator<SplitT, EnumStateT> enumerator,
SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
this.sourceSplitEnumerator = enumerator;
this.enumeratorContext = enumContext;
}

@Override
public void start() {
sourceSplitEnumerator.open();
}

@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
sourceSplitEnumerator.handleSplitRequest(subtaskId);
}

@Override
public void addSplitsBack(List<SplitWrapper<SplitT>> splits, int subtaskId) {
sourceSplitEnumerator.addSplitsBack(
splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()),
subtaskId);
}

@Override
public void addReader(int subtaskId) {
sourceSplitEnumerator.registerReader(subtaskId);
if (!isRun) {
try {
sourceSplitEnumerator.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
isRun = true;
}
}

@Override
public EnumStateT snapshotState(long checkpointId) throws Exception {
return sourceSplitEnumerator.snapshotState(checkpointId);
}

@Override
public void close() throws IOException {
sourceSplitEnumerator.close();
}
}
Loading

0 comments on commit 573b25c

Please sign in to comment.