Skip to content

Commit

Permalink
[Improve][Sls] Add sls sink connector、e2e、doc (#7830)
Browse files Browse the repository at this point in the history
Co-authored-by: XenosK <[email protected]>
  • Loading branch information
XenosK and XenosK authored Oct 14, 2024
1 parent cdda747 commit 048c47d
Show file tree
Hide file tree
Showing 18 changed files with 650 additions and 2 deletions.
84 changes: 84 additions & 0 deletions docs/en/connector-v2/sink/Sls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Sls

> Sls sink connector
## Support Those Engines

> Spark<br/>
> Flink<br/>
> Seatunnel Zeta<br/>
## Key Features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)

## Description

Sink connector for Aliyun Sls.

## Supported DataSource Info

In order to use the Sls connector, the following dependencies are required.
They can be downloaded via install-plugin.sh or from the Maven central repository.

| Datasource | Supported Versions | Maven |
|------------|--------------------|-----------------------------------------------------------------------------------|
| Sls | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-sls) |

## Source Options

| Name | Type | Required | Default | Description |
|-------------------------------------|---------|----------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
| project | String | Yes | - | [Aliyun Sls Project](https://help.aliyun.com/zh/sls/user-guide/manage-a-project?spm=a2c4g.11186623.0.0.6f9755ebyfaYSl) |
| logstore | String | Yes | - | [Aliyun Sls Logstore](https://help.aliyun.com/zh/sls/user-guide/manage-a-logstore?spm=a2c4g.11186623.0.0.13137c08nfuiBC) |
| endpoint | String | Yes | - | [Aliyun Access Endpoint](https://help.aliyun.com/zh/sls/developer-reference/api-sls-2020-12-30-endpoint?spm=a2c4g.11186623.0.0.548945a8UyJULa) |
| access_key_id | String | Yes | - | [Aliyun AccessKey ID](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) |
| access_key_secret | String | Yes | - | [Aliyun AccessKey Secret](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) |
| source | String | No | SeaTunnel-Source | Data Source marking in sls |
| topic | String | No | SeaTunnel-Topic | Data topic marking in sls |

## Task Example

### Simple

> This example write data to the sls's logstore1.And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in Install SeaTunnel to install and deploy SeaTunnel. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job.
[Create RAM user and authorization](https://help.aliyun.com/zh/sls/create-a-ram-user-and-authorize-the-ram-user-to-access-log-service?spm=a2c4g.11186623.0.i4),Please ensure thr ram user have sufficient rights to perform, reference [RAM Custom Authorization Example](https://help.aliyun.com/zh/sls/use-custom-policies-to-grant-permissions-to-a-ram-user?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#reference-s3z-m1l-z2b)

```hocon
# Defining the runtime environment
env {
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 30000
}
source {
FakeSource {
row.num = 10
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
schema = {
fields = {
id = "int"
name = "string"
description = "string"
weight = "string"
}
}
}
}
sink {
Sls {
endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
project = "project1"
logstore = "logstore1"
access_key_id = "xxxxxxxxxxxxxxxxxxxxxxxx"
access_key_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
}
```

84 changes: 84 additions & 0 deletions docs/zh/connector-v2/sink/Sls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Sls

> Sls sink connector
## Support Those Engines

> Spark<br/>
> Flink<br/>
> Seatunnel Zeta<br/>
## 主要特性

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)

## 描述

Sink connector for Aliyun Sls.

从写入数据到阿里云Sls日志服务

为了使用Sls连接器,需要以下依赖关系。
它们可以通过install-plugin.sh或Maven中央存储库下载。

| Datasource | Supported Versions | Maven |
|------------|--------------------|-----------------------------------------------------------------------------------|
| Sls | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-sls) |

## 支持的数据源信息

| Name | Type | Required | Default | Description |
|-------------------------------------|----------|----------|-------------------|------------------------------------------------------------------------------------------------------------------------------------|
| project | String | Yes | - | [阿里云 Sls 项目](https://help.aliyun.com/zh/sls/user-guide/manage-a-project?spm=a2c4g.11186623.0.0.6f9755ebyfaYSl) |
| logstore | String | Yes | - | [阿里云 Sls 日志库](https://help.aliyun.com/zh/sls/user-guide/manage-a-logstore?spm=a2c4g.11186623.0.0.13137c08nfuiBC) |
| endpoint | String | Yes | - | [阿里云访问服务点](https://help.aliyun.com/zh/sls/developer-reference/api-sls-2020-12-30-endpoint?spm=a2c4g.11186623.0.0.548945a8UyJULa) |
| access_key_id | String | Yes | - | [阿里云访问用户ID](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) |
| access_key_secret | String | Yes | - | [阿里云访问用户密码](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) |
| source | String | No | SeaTunnel-Source | 在sls中数据来源标记 |
| topic | String | No | SeaTunnel-Topic | 在sls中数据主题标记 |

## 任务示例

### 简单示例

> 此示例写入sls的logstore1的数据。如果您尚未安装和部署SeaTunnel,则需要按照安装SeaTunnel中的说明安装和部署SeaTunnel。然后按照[快速启动SeaTunnel引擎](../../Start-v2/locale/Quick-Start SeaTunnel Engine.md)中的说明运行此作业。
[创建RAM用户及授权](https://help.aliyun.com/zh/sls/create-a-ram-user-and-authorize-the-ram-user-to-access-log-service?spm=a2c4g.11186623.0.i4), 请确认RAM用户有足够的权限来读取及管理数据,参考:[RAM自定义授权示例](https://help.aliyun.com/zh/sls/use-custom-policies-to-grant-permissions-to-a-ram-user?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#reference-s3z-m1l-z2b)

```hocon
# Defining the runtime environment
env {
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 30000
}
source {
FakeSource {
row.num = 10
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
schema = {
fields = {
id = "int"
name = "string"
description = "string"
weight = "string"
}
}
}
}
sink {
Sls {
endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
project = "project1"
logstore = "logstore1"
access_key_id = "xxxxxxxxxxxxxxxxxxxxxxxx"
access_key_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
}
```

1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,5 @@ seatunnel.transform.Copy = seatunnel-transforms-v2
seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
seatunnel.transform.LLM = seatunnel-transforms-v2
seatunnel.transform.Embedding = seatunnel-transforms-v2
seatunnel.sink.Sls = connector-sls

5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-sls/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,10 @@
<artifactId>seatunnel-format-text</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,22 @@ public class Config {
.defaultValue(-1L)
.withDescription(
"The interval for dynamically discovering topics and partitions.");

public static final Option<String> SOURCE =
Options.key("source")
.stringType()
.defaultValue("SeaTunnel-Source")
.withDescription("Aliyun sls producer source");

public static final Option<String> TOPIC =
Options.key("topic")
.stringType()
.defaultValue("SeaTunnel-Topic")
.withDescription("Aliyun sls producer topic");

public static final Option<Integer> LOG_GROUP_SIZE =
Options.key("log_group_size")
.intType()
.defaultValue(100)
.withDescription("Aliyun sls log group write size");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.sls.serialization;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

import com.aliyun.openservices.log.common.LogContent;
import com.aliyun.openservices.log.common.LogItem;

import java.util.ArrayList;
import java.util.List;

public class SeatunnelRowSerialization {
JsonSerializationSchema jsonSerializationSchema;

public SeatunnelRowSerialization(SeaTunnelRowType rowType) {
this.jsonSerializationSchema = new JsonSerializationSchema(rowType);
}

public List<LogItem> serializeRow(SeaTunnelRow row) {
List<LogItem> logGroup = new ArrayList<LogItem>();
LogItem logItem = new LogItem();
String rowJson = new String(jsonSerializationSchema.serialize(row));
LogContent content = new LogContent("content", rowJson);
logItem.PushBack(content);
logGroup.add(logItem);
return logGroup;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.sls.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsSinkState;

import java.io.IOException;
import java.util.Collections;

public class SlsSink
implements SeaTunnelSink<
SeaTunnelRow, SlsSinkState, SlsCommitInfo, SlsAggregatedCommitInfo> {
private final ReadonlyConfig pluginConfig;
private final SeaTunnelRowType seaTunnelRowType;

public SlsSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
this.pluginConfig = pluginConfig;
this.seaTunnelRowType = rowType;
}

@Override
public String getPluginName() {
return org.apache.seatunnel.connectors.seatunnel.sls.config.Config.CONNECTOR_IDENTITY;
}

@Override
public SinkWriter<SeaTunnelRow, SlsCommitInfo, SlsSinkState> createWriter(
SinkWriter.Context context) throws IOException {
return new SlsSinkWriter(context, seaTunnelRowType, pluginConfig, Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.sls.sink;

import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsCommitInfo;

import java.io.IOException;
import java.util.List;

public class SlsSinkCommitter implements SinkCommitter<SlsCommitInfo> {
@Override
public List<SlsCommitInfo> commit(List<SlsCommitInfo> commitInfos) throws IOException {
// nothing to do, when write function, data had sended
return null;
}

@Override
public void abort(List<SlsCommitInfo> commitInfos) throws IOException {}
}
Loading

0 comments on commit 048c47d

Please sign in to comment.