
Commit

[Feature][Connector-v2] Support S3 filesystem of paimon connector (#8036)
dailai authored Nov 14, 2024
1 parent 1cee437 commit e2a4772
Showing 12 changed files with 596 additions and 10 deletions.
54 changes: 54 additions & 0 deletions docs/en/connector-v2/sink/Paimon.md
@@ -61,6 +61,13 @@ All `changelog-producer` modes are currently supported. The default is `none`.
> Note:
> When you use streaming mode to read a paimon table, different modes will produce [different results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)
## Filesystems
The Paimon connector supports writing data to multiple file systems. Currently, hdfs and s3 are supported.
If you use the s3 filesystem, you can configure the `fs.s3a.access-key`, `fs.s3a.secret-key`, `fs.s3a.endpoint`, `fs.s3a.path.style.access`, and `fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option.
Besides, the warehouse should start with `s3a://`.
## Examples

### Single table
@@ -94,6 +101,53 @@ sink {
}
```

### Single table with s3 filesystem

```hocon
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  Paimon {
    warehouse = "s3a://test/"
    database = "seatunnel_namespace11"
    table = "st_test"
    paimon.hadoop.conf = {
      fs.s3a.access-key=G52pnxg67819khOZ9ezX
      fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
      fs.s3a.endpoint="http://minio4:9000"
      fs.s3a.path.style.access=true
      fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    }
  }
}
```

### Single table(Specify hadoop HA config and kerberos config)

```hocon
```
32 changes: 32 additions & 0 deletions docs/en/connector-v2/source/Paimon.md
@@ -82,6 +82,11 @@ Properties in hadoop conf

The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files

## Filesystems
The Paimon connector supports multiple file systems. Currently, hdfs and s3 are supported.
If you use the s3 filesystem, you can configure the `fs.s3a.access-key`, `fs.s3a.secret-key`, `fs.s3a.endpoint`, `fs.s3a.path.style.access`, and `fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option.
Besides, the warehouse should start with `s3a://`.

## Examples

### Simple example
@@ -109,6 +114,33 @@ source {
}
```

### S3 example
```hocon
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Paimon {
    warehouse = "s3a://test/"
    database = "seatunnel_namespace11"
    table = "st_test"
    paimon.hadoop.conf = {
      fs.s3a.access-key=G52pnxg67819khOZ9ezX
      fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
      fs.s3a.endpoint="http://minio4:9000"
      fs.s3a.path.style.access=true
      fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    }
  }
}

sink {
  Console {}
}
```

### Hadoop conf example

```hocon
```
54 changes: 53 additions & 1 deletion docs/zh/connector-v2/sink/Paimon.md
@@ -58,7 +58,12 @@ Paimon tables have [four](https://paimon.apache.org/docs/mast
* [`lookup`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#lookup)
* [`full-compaction`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#full-compaction)
> Note:
> When you use streaming mode to read a paimon table, different modes will produce [different results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog)
## Filesystems
The Paimon connector supports writing data to multiple file systems. Currently, hdfs and s3 are supported.
If you use the s3 filesystem, you can configure the `fs.s3a.access-key`, `fs.s3a.secret-key`, `fs.s3a.endpoint`, `fs.s3a.path.style.access`, and `fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option.
Besides, the warehouse should start with `s3a://`.

## Examples

@@ -93,6 +98,53 @@ sink {
}
```

### Single table with s3 filesystem

```hocon
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  Paimon {
    warehouse = "s3a://test/"
    database = "seatunnel_namespace11"
    table = "st_test"
    paimon.hadoop.conf = {
      fs.s3a.access-key=G52pnxg67819khOZ9ezX
      fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
      fs.s3a.endpoint="http://minio4:9000"
      fs.s3a.path.style.access=true
      fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    }
  }
}
```

### Single table (specify hadoop HA config and kerberos config)

```hocon
```
34 changes: 34 additions & 0 deletions seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -32,6 +32,7 @@
<properties>
<paimon.version>0.7.0-incubating</paimon.version>
<hive.version>2.3.9</hive.version>
<connector.name>connector.paimon</connector.name>
</properties>

<dependencies>
@@ -47,6 +48,12 @@
<version>${paimon.version}</version>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-s3-impl</artifactId>
<version>${paimon.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-guava</artifactId>
@@ -98,4 +105,31 @@

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<filters>
<filter>
<artifact>org.apache.paimon:paimon-s3-impl</artifact>
<excludes>
<exclude>org/apache/hadoop/**</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
@@ -44,6 +44,7 @@ public class PaimonCatalogLoader implements Serializable {
private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";

private static final String HDFS_PREFIX = "hdfs://";
private static final String S3A_PREFIX = "s3a://";
/** ********* Hdfs constants ************* */
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";

@@ -63,20 +64,20 @@ public PaimonCatalogLoader(PaimonConfig paimonConfig) {
}

public Catalog loadCatalog() {
// When using the seatunnel engine, set the current class loader to prevent loading failures
Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
final Map<String, String> optionsMap = new HashMap<>(1);
optionsMap.put(CatalogOptions.WAREHOUSE.key(), warehouse);
optionsMap.put(CatalogOptions.METASTORE.key(), catalogType.getType());
if (warehouse.startsWith(HDFS_PREFIX)) {
checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME);
paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL);
} else if (warehouse.startsWith(S3A_PREFIX)) {
optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
}
if (PaimonCatalogEnum.HIVE.getType().equals(catalogType.getType())) {
optionsMap.put(CatalogOptions.URI.key(), catalogUri);
optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
}
final Options options = Options.fromMap(optionsMap);
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
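The warehouse-prefix dispatch in `PaimonCatalogLoader.loadCatalog()` can be sketched as a standalone illustration. This is hypothetical and simplified: the class name `WarehouseDispatch`, the `buildOptions` helper, and the plain-string option keys are not part of the connector.

```java
// Hypothetical sketch of the scheme dispatch in PaimonCatalogLoader.loadCatalog():
// hdfs:// warehouses get the DistributedFileSystem implementation class set, while
// s3a:// warehouses have their fs.s3a.* properties copied into the catalog options.
import java.util.HashMap;
import java.util.Map;

public class WarehouseDispatch {
    static final String HDFS_PREFIX = "hdfs://";
    static final String S3A_PREFIX = "s3a://";
    static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
    static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";

    // hadoopConf stands in for the flattened paimon.hadoop.conf properties.
    static Map<String, String> buildOptions(String warehouse, Map<String, String> hadoopConf) {
        Map<String, String> options = new HashMap<>();
        options.put("warehouse", warehouse);
        if (warehouse.startsWith(HDFS_PREFIX)) {
            options.put(HDFS_IMPL_KEY, HDFS_IMPL);
        } else if (warehouse.startsWith(S3A_PREFIX)) {
            // Expose every fs.s3a.* key to the catalog so the S3 FileIO can read them.
            options.putAll(hadoopConf);
        }
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("fs.s3a.endpoint", "http://minio4:9000");
        System.out.println(buildOptions("s3a://test/", conf).get("fs.s3a.endpoint"));
    }
}
```

Note that in the real loader the s3a branch copies the whole Hadoop configuration into the catalog options, which is why the doc examples put the `fs.s3a.*` keys under `paimon.hadoop.conf`.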
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.paimon.filesystem;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOLoader;
import org.apache.paimon.fs.Path;
import org.apache.paimon.s3.S3FileIO;

import java.util.ArrayList;
import java.util.List;

public class S3Loader implements FileIOLoader {

    @Override
    public String getScheme() {
        return "s3a";
    }

    @Override
    public List<String[]> requiredOptions() {
        List<String[]> options = new ArrayList<>();
        options.add(new String[] {"fs.s3a.access-key", "fs.s3a.access.key"});
        options.add(new String[] {"fs.s3a.secret-key", "fs.s3a.secret.key"});
        options.add(new String[] {"fs.s3a.endpoint", "fs.s3a.endpoint"});
        return options;
    }

    @Override
    public FileIO load(Path path) {
        return new S3FileIO();
    }
}
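The `requiredOptions()` pairs above map SeaTunnel's dash-style s3a keys to the dot-style keys Paimon's `S3FileIO` expects. Below is a hypothetical standalone sketch of that translation; the `S3KeyMapping` class and `translate` helper are illustrative only, not part of the connector.

```java
// Hypothetical illustration: copy each dash-form key (as used in paimon.hadoop.conf)
// to its dot-form equivalent, mirroring the pairs in S3Loader.requiredOptions().
import java.util.HashMap;
import java.util.Map;

public class S3KeyMapping {
    static final String[][] KEY_PAIRS = {
        {"fs.s3a.access-key", "fs.s3a.access.key"},
        {"fs.s3a.secret-key", "fs.s3a.secret.key"},
        {"fs.s3a.endpoint", "fs.s3a.endpoint"},
    };

    static Map<String, String> translate(Map<String, String> userConf) {
        Map<String, String> out = new HashMap<>(userConf);
        for (String[] pair : KEY_PAIRS) {
            // pair[0] is the user-facing key, pair[1] the key the S3 FileIO reads.
            if (userConf.containsKey(pair[0])) {
                out.put(pair[1], userConf.get(pair[0]));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("fs.s3a.access-key", "AKIAEXAMPLE");
        System.out.println(translate(conf).get("fs.s3a.access.key"));
    }
}
```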
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.seatunnel.connectors.seatunnel.paimon.filesystem.S3Loader
@@ -25,17 +25,32 @@
<artifactId>connector-paimon-e2e</artifactId>
<name>SeaTunnel : E2E : Connector V2 : Paimon</name>

<properties>
<testcontainer.version>1.19.1</testcontainer.version>
<minio.version>8.5.6</minio.version>
</properties>

<dependencies>
<!-- minio containers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-seatunnel-e2e-base</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
@@ -44,6 +59,18 @@
<classifier>optional</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-paimon</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
