[Feature][Flink] Add external configuration parameters (#5480)
zhilinli123 authored Oct 26, 2023
1 parent df07159 commit 90bc2fe
Showing 5 changed files with 254 additions and 0 deletions.
79 changes: 79 additions & 0 deletions docs/en/other-engine/flink.md
@@ -0,0 +1,79 @@
# SeaTunnel runs on Flink

Flink is a powerful, high-performance distributed stream processing engine. For more information, search for `Apache Flink`.

### Set Flink configuration information in the job

Prefix the configuration key with `flink.`

Example:
Enable unaligned checkpointing for this job:

```
env {
  execution.parallelism = 1
  flink.execution.checkpointing.unaligned.enabled = true
}
```

Enumeration types are not currently supported; you need to specify them in the Flink configuration file. Only the following types of settings are supported for the time being:<br/>
Integer/Boolean/String/Duration
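
For example, a minimal sketch combining one setting of each supported type (the `flink.pipeline.name` value is purely illustrative; the other keys also appear in this commit's test configuration):

```
env {
  execution.parallelism = 1
  # Integer
  flink.jobstore.cache-size = 52428801
  # Boolean
  flink.execution.checkpointing.unaligned.enabled = true
  # String (illustrative job name)
  flink.pipeline.name = "SeaTunnel-Flink-Job"
  # Duration (milliseconds)
  flink.execution.checkpointing.interval = 5000
}
```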

### How to set up a simple Flink job

This is a simple job that runs on Flink. Randomly generated data is printed to the console.

```
env {
  execution.parallelism = 1
  flink.execution.checkpointing.interval=5000
}

source {
  FakeSource {
    row.num = 16
    result_table_name = "fake_table"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_int = int
        c_bigint = bigint
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(33, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_int = int
          c_bigint = bigint
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(33, 18)"
          c_timestamp = timestamp
        }
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Console {}
}
```

### How to run a job in a project

After you pull the code to your local machine, go to the `seatunnel-examples/seatunnel-flink-connector-v2-example` module and find `org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample` to run the job.
@@ -68,6 +68,12 @@
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-common</artifactId>
            <version>${project.version}</version>
        </dependency>

    </dependencies>

</project>
@@ -18,6 +18,7 @@
package org.apache.seatunnel.core.starter.flink.utils;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;

import org.apache.seatunnel.common.config.CheckResult;

@@ -29,6 +30,7 @@

import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
@@ -116,5 +118,16 @@ public static void initConfiguration(Config config, Configuration configuration)
                        PipelineOptions.CLASSPATHS.key(), pipeline.getString("classpaths"));
            }
        }
        String prefixConf = "flink.";
        if (!config.isEmpty()) {
            for (Map.Entry<String, ConfigValue> entryConfKey : config.entrySet()) {
                String confKey = entryConfKey.getKey().trim();

                if (confKey.startsWith(prefixConf)) {
                    configuration.setString(
                            confKey.replaceFirst(prefixConf, ""), entryConfKey.getValue().render());
                }
            }
        }
    }
}
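
For reference, below is a minimal standalone sketch of the same pass-through behaviour, using a plain `Map` in place of the shaded typesafe `Config` (the class name and the example keys/values are illustrative only):

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;

public class FlinkPrefixPassThroughSketch {

    private static final String PREFIX = "flink.";

    public static void main(String[] args) {
        // Hypothetical keys as they might appear in a SeaTunnel `env` block.
        Map<String, String> env = new LinkedHashMap<>();
        env.put("execution.parallelism", "1"); // no prefix -> not forwarded, handled elsewhere
        env.put("flink.execution.checkpointing.interval", "5000"); // forwarded to Flink
        env.put("flink.jobstore.cache-size", "52428801"); // forwarded to Flink

        Configuration configuration = new Configuration();
        for (Map.Entry<String, String> entry : env.entrySet()) {
            String key = entry.getKey().trim();
            if (key.startsWith(PREFIX)) {
                // Strip the "flink." prefix and hand the raw string value to Flink.
                configuration.setString(key.replaceFirst(PREFIX, ""), entry.getValue());
            }
        }

        // Prints something like:
        // {execution.checkpointing.interval=5000, jobstore.cache-size=52428801}
        System.out.println(configuration);
    }
}
```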
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.core.starter.flink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.core.starter.utils.FileUtils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class TestFlinkParameter {

    @Test
    public void testFlinkParameter() throws Exception {
        // Expected external settings to verify
        List<String> checkList = new ArrayList<>();
        checkList.add("execution.checkpointing.interval=5000");
        checkList.add("execution.checkpointing.unaligned.enabled=true");
        checkList.add("execution.checkpointing.aligned-checkpoint-timeout=100000");
        checkList.add("jobstore.cache-size=52428801");
        checkList.add("state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM");
        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
        flinkCommandArgs.setDeployMode(DeployMode.RUN);
        flinkCommandArgs.setJobName("SeaTunnelFlinkParameter");
        flinkCommandArgs.setEncrypt(false);
        flinkCommandArgs.setDecrypt(false);
        flinkCommandArgs.setHelp(false);
        flinkCommandArgs.setConfigFile("src/test/java/resources/test_flink_run_parameter.conf");
        flinkCommandArgs.setVariables(null);
        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
        Config config = ConfigBuilder.of(configFile).getConfig("env");

        // Set the Flink Configuration
        Configuration configurations = new Configuration();
        EnvironmentUtil.initConfiguration(config, configurations);
        StreamExecutionEnvironment executionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment(configurations);
        List<String> ExternalSettingLists = new ArrayList<>();
        // Strip extra characters so the parameters are easy to validate
        String[] split =
                executionEnvironment
                        .getConfiguration()
                        .toString()
                        .replaceAll(" ", "")
                        .replaceAll("\\{", "")
                        .replaceAll("\\}", "")
                        .replaceAll("\"", "")
                        .trim()
                        .split(",");
        for (String value : split) {
            if (checkList.contains(value)) {
                ExternalSettingLists.add(value);
            }
        }
        // Sort to keep a stable order before comparing
        checkList.sort(null);
        ExternalSettingLists.sort(null);
        Assertions.assertIterableEquals(checkList, ExternalSettingLists);
    }
}
@@ -0,0 +1,70 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


env {
  execution.parallelism = 1
  flink.execution.checkpointing.interval=5000
  flink.execution.checkpointing.unaligned.enabled=true
  flink.execution.checkpointing.aligned-checkpoint-timeout=100000
  flink.jobstore.cache-size=52428801
  flink.state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM
}

source {
  FakeSource {
    row.num = 16
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_int = int
        c_bigint = bigint
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(33, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_int = int
          c_bigint = bigint
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(33, 18)"
          c_timestamp = timestamp
        }
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Console {}
}
