From 048c47d966f274293d510bbec737375a41a57e66 Mon Sep 17 00:00:00 2001 From: GumKey <743344516@qq.com> Date: Mon, 14 Oct 2024 17:26:09 +0800 Subject: [PATCH] =?UTF-8?q?[Improve][Sls]=20Add=20sls=20sink=20connector?= =?UTF-8?q?=E3=80=81e2e=E3=80=81doc=20(#7830)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: XenosK --- docs/en/connector-v2/sink/Sls.md | 84 +++++++++++++ docs/zh/connector-v2/sink/Sls.md | 84 +++++++++++++ plugin-mapping.properties | 1 + seatunnel-connectors-v2/connector-sls/pom.xml | 5 + .../seatunnel/sls/config/Config.java | 18 +++ .../SeatunnelRowSerialization.java | 46 +++++++ .../seatunnel/sls/sink/SlsSink.java | 53 +++++++++ .../seatunnel/sls/sink/SlsSinkCommitter.java | 35 ++++++ .../seatunnel/sls/sink/SlsSinkFactory.java | 56 +++++++++ .../seatunnel/sls/sink/SlsSinkWriter.java | 112 ++++++++++++++++++ .../sls/source/SlsConsumerThread.java | 2 +- .../seatunnel/sls/source/SlsSourceReader.java | 2 +- .../sls/state/SlsAggregatedCommitInfo.java | 29 +++++ .../seatunnel/sls/state/SlsCommitInfo.java | 30 +++++ .../seatunnel/sls/state/SlsSinkState.java | 30 +++++ .../seatunnel/sls/SlsFactoryTest.java | 2 + .../seatunnel/e2e/connector/sls/SlsIT.java | 7 ++ .../test/resources/sls_sink_to_console.conf | 56 +++++++++ 18 files changed, 650 insertions(+), 2 deletions(-) create mode 100644 docs/en/connector-v2/sink/Sls.md create mode 100644 docs/zh/connector-v2/sink/Sls.md create mode 100644 seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/SeatunnelRowSerialization.java create mode 100644 seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSink.java create mode 100644 seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkCommitter.java create mode 100644 seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkFactory.java create mode 100644 seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkWriter.java create mode 100644 seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsAggregatedCommitInfo.java create mode 100644 seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsCommitInfo.java create mode 100644 seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsSinkState.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_sink_to_console.conf diff --git a/docs/en/connector-v2/sink/Sls.md b/docs/en/connector-v2/sink/Sls.md new file mode 100644 index 00000000000..487786548d0 --- /dev/null +++ b/docs/en/connector-v2/sink/Sls.md @@ -0,0 +1,84 @@ +# Sls + +> Sls sink connector + +## Support Those Engines + +> Spark
+> Flink
+> Seatunnel Zeta
+ +## Key Features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) + +## Description + +Sink connector for Aliyun Sls. + +## Supported DataSource Info + +In order to use the Sls connector, the following dependencies are required. +They can be downloaded via install-plugin.sh or from the Maven central repository. + +| Datasource | Supported Versions | Maven | +|------------|--------------------|-----------------------------------------------------------------------------------| +| Sls | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-sls) | + +## Source Options + +| Name | Type | Required | Default | Description | +|-------------------------------------|---------|----------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------| +| project | String | Yes | - | [Aliyun Sls Project](https://help.aliyun.com/zh/sls/user-guide/manage-a-project?spm=a2c4g.11186623.0.0.6f9755ebyfaYSl) | +| logstore | String | Yes | - | [Aliyun Sls Logstore](https://help.aliyun.com/zh/sls/user-guide/manage-a-logstore?spm=a2c4g.11186623.0.0.13137c08nfuiBC) | +| endpoint | String | Yes | - | [Aliyun Access Endpoint](https://help.aliyun.com/zh/sls/developer-reference/api-sls-2020-12-30-endpoint?spm=a2c4g.11186623.0.0.548945a8UyJULa) | +| access_key_id | String | Yes | - | [Aliyun AccessKey ID](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) | +| access_key_secret | String | Yes | - | [Aliyun AccessKey Secret](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) | +| source | String | No | SeaTunnel-Source | Data Source marking in sls | +| topic | String | No | SeaTunnel-Topic | Data topic marking in sls | + +## Task Example + +### Simple + +> This example write data to the sls's logstore1.And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in Install SeaTunnel to install and deploy SeaTunnel. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. + +[Create RAM user and authorization](https://help.aliyun.com/zh/sls/create-a-ram-user-and-authorize-the-ram-user-to-access-log-service?spm=a2c4g.11186623.0.i4),Please ensure thr ram user have sufficient rights to perform, reference [RAM Custom Authorization Example](https://help.aliyun.com/zh/sls/use-custom-policies-to-grant-permissions-to-a-ram-user?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#reference-s3z-m1l-z2b) + +```hocon +# Defining the runtime environment +env { + parallelism = 2 + job.mode = "STREAMING" + checkpoint.interval = 30000 +} +source { + FakeSource { + row.num = 10 + map.size = 10 + array.size = 10 + bytes.length = 10 + string.length = 10 + schema = { + fields = { + id = "int" + name = "string" + description = "string" + weight = "string" + } + } + } +} + +sink { + Sls { + endpoint = "cn-hangzhou-intranet.log.aliyuncs.com" + project = "project1" + logstore = "logstore1" + access_key_id = "xxxxxxxxxxxxxxxxxxxxxxxx" + access_key_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + } +} +``` + diff --git a/docs/zh/connector-v2/sink/Sls.md b/docs/zh/connector-v2/sink/Sls.md new file mode 100644 index 00000000000..94e4f3c07a8 --- /dev/null +++ b/docs/zh/connector-v2/sink/Sls.md @@ -0,0 +1,84 @@ +# Sls + +> Sls sink connector + +## Support Those Engines + +> Spark
+> Flink
+> Seatunnel Zeta
+ +## 主要特性 + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) + +## 描述 + +Sink connector for Aliyun Sls. + +从写入数据到阿里云Sls日志服务 + +为了使用Sls连接器,需要以下依赖关系。 +它们可以通过install-plugin.sh或Maven中央存储库下载。 + +| Datasource | Supported Versions | Maven | +|------------|--------------------|-----------------------------------------------------------------------------------| +| Sls | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-sls) | + +## 支持的数据源信息 + +| Name | Type | Required | Default | Description | +|-------------------------------------|----------|----------|-------------------|------------------------------------------------------------------------------------------------------------------------------------| +| project | String | Yes | - | [阿里云 Sls 项目](https://help.aliyun.com/zh/sls/user-guide/manage-a-project?spm=a2c4g.11186623.0.0.6f9755ebyfaYSl) | +| logstore | String | Yes | - | [阿里云 Sls 日志库](https://help.aliyun.com/zh/sls/user-guide/manage-a-logstore?spm=a2c4g.11186623.0.0.13137c08nfuiBC) | +| endpoint | String | Yes | - | [阿里云访问服务点](https://help.aliyun.com/zh/sls/developer-reference/api-sls-2020-12-30-endpoint?spm=a2c4g.11186623.0.0.548945a8UyJULa) | +| access_key_id | String | Yes | - | [阿里云访问用户ID](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) | +| access_key_secret | String | Yes | - | [阿里云访问用户密码](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479) | +| source | String | No | SeaTunnel-Source | 在sls中数据来源标记 | +| topic | String | No | SeaTunnel-Topic | 在sls中数据主题标记 | + +## 任务示例 + +### 简单示例 + +> 此示例写入sls的logstore1的数据。如果您尚未安装和部署SeaTunnel,则需要按照安装SeaTunnel中的说明安装和部署SeaTunnel。然后按照[快速启动SeaTunnel引擎](../../Start-v2/locale/Quick-Start SeaTunnel Engine.md)中的说明运行此作业。 + +[创建RAM用户及授权](https://help.aliyun.com/zh/sls/create-a-ram-user-and-authorize-the-ram-user-to-access-log-service?spm=a2c4g.11186623.0.i4), 请确认RAM用户有足够的权限来读取及管理数据,参考:[RAM自定义授权示例](https://help.aliyun.com/zh/sls/use-custom-policies-to-grant-permissions-to-a-ram-user?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#reference-s3z-m1l-z2b) + +```hocon +# Defining the runtime environment +env { + parallelism = 2 + job.mode = "STREAMING" + checkpoint.interval = 30000 +} +source { + FakeSource { + row.num = 10 + map.size = 10 + array.size = 10 + bytes.length = 10 + string.length = 10 + schema = { + fields = { + id = "int" + name = "string" + description = "string" + weight = "string" + } + } + } +} + +sink { + Sls { + endpoint = "cn-hangzhou-intranet.log.aliyuncs.com" + project = "project1" + logstore = "logstore1" + access_key_id = "xxxxxxxxxxxxxxxxxxxxxxxx" + access_key_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + } +} +``` + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 3673ade48f6..86c95bc3e22 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -149,4 +149,5 @@ seatunnel.transform.Copy = seatunnel-transforms-v2 seatunnel.transform.DynamicCompile = seatunnel-transforms-v2 seatunnel.transform.LLM = seatunnel-transforms-v2 seatunnel.transform.Embedding = seatunnel-transforms-v2 +seatunnel.sink.Sls = connector-sls diff --git a/seatunnel-connectors-v2/connector-sls/pom.xml b/seatunnel-connectors-v2/connector-sls/pom.xml index dd47dd0864b..b7b72933477 100644 --- a/seatunnel-connectors-v2/connector-sls/pom.xml +++ b/seatunnel-connectors-v2/connector-sls/pom.xml @@ -49,5 +49,10 @@ seatunnel-format-text ${project.version} + + org.apache.seatunnel + seatunnel-format-json + ${project.version} + diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java index 46917b8b84a..d279b10c7dc 100644 --- a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java @@ -79,4 +79,22 @@ public class Config { .defaultValue(-1L) .withDescription( "The interval for dynamically discovering topics and partitions."); + + public static final Option SOURCE = + Options.key("source") + .stringType() + .defaultValue("SeaTunnel-Source") + .withDescription("Aliyun sls producer source"); + + public static final Option TOPIC = + Options.key("topic") + .stringType() + .defaultValue("SeaTunnel-Topic") + .withDescription("Aliyun sls producer topic"); + + public static final Option LOG_GROUP_SIZE = + Options.key("log_group_size") + .intType() + .defaultValue(100) + .withDescription("Aliyun sls log group write size"); } diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/SeatunnelRowSerialization.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/SeatunnelRowSerialization.java new file mode 100644 index 00000000000..a9308d89c62 --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/SeatunnelRowSerialization.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.serialization; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.format.json.JsonSerializationSchema; + +import com.aliyun.openservices.log.common.LogContent; +import com.aliyun.openservices.log.common.LogItem; + +import java.util.ArrayList; +import java.util.List; + +public class SeatunnelRowSerialization { + JsonSerializationSchema jsonSerializationSchema; + + public SeatunnelRowSerialization(SeaTunnelRowType rowType) { + this.jsonSerializationSchema = new JsonSerializationSchema(rowType); + } + + public List serializeRow(SeaTunnelRow row) { + List logGroup = new ArrayList(); + LogItem logItem = new LogItem(); + String rowJson = new String(jsonSerializationSchema.serialize(row)); + LogContent content = new LogContent("content", rowJson); + logItem.PushBack(content); + logGroup.add(logItem); + return logGroup; + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSink.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSink.java new file mode 100644 index 00000000000..cad767e6355 --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSink.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsSinkState; + +import java.io.IOException; +import java.util.Collections; + +public class SlsSink + implements SeaTunnelSink< + SeaTunnelRow, SlsSinkState, SlsCommitInfo, SlsAggregatedCommitInfo> { + private final ReadonlyConfig pluginConfig; + private final SeaTunnelRowType seaTunnelRowType; + + public SlsSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) { + this.pluginConfig = pluginConfig; + this.seaTunnelRowType = rowType; + } + + @Override + public String getPluginName() { + return org.apache.seatunnel.connectors.seatunnel.sls.config.Config.CONNECTOR_IDENTITY; + } + + @Override + public SinkWriter createWriter( + SinkWriter.Context context) throws IOException { + return new SlsSinkWriter(context, seaTunnelRowType, pluginConfig, Collections.emptyList()); + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkCommitter.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkCommitter.java new file mode 100644 index 00000000000..18ea3d582dd --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkCommitter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.sink; + +import org.apache.seatunnel.api.sink.SinkCommitter; +import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsCommitInfo; + +import java.io.IOException; +import java.util.List; + +public class SlsSinkCommitter implements SinkCommitter { + @Override + public List commit(List commitInfos) throws IOException { + // nothing to do, when write function, data had sended + return null; + } + + @Override + public void abort(List commitInfos) throws IOException {} +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkFactory.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkFactory.java new file mode 100644 index 00000000000..82b0d598986 --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.sls.config.Config; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class SlsSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return Config.CONNECTOR_IDENTITY; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required( + Config.ENDPOINT, + Config.PROJECT, + Config.LOGSTORE, + Config.ACCESS_KEY_ID, + Config.ACCESS_KEY_SECRET) + .optional(Config.SOURCE, Config.TOPIC) + .build(); + } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + return () -> + new SlsSink( + context.getOptions(), + context.getCatalogTable().getTableSchema().toPhysicalRowDataType()); + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkWriter.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkWriter.java new file mode 100644 index 00000000000..4de6efc64dd --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkWriter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.sls.serialization.SeatunnelRowSerialization; +import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsSinkState; + +import com.aliyun.openservices.log.Client; +import com.aliyun.openservices.log.common.LogItem; +import com.aliyun.openservices.log.request.PutLogsRequest; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.ACCESS_KEY_ID; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.ACCESS_KEY_SECRET; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.ENDPOINT; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.LOGSTORE; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.LOG_GROUP_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.PROJECT; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.SOURCE; +import static org.apache.seatunnel.connectors.seatunnel.sls.config.Config.TOPIC; + +@Slf4j +public class SlsSinkWriter implements SinkWriter { + + private final Client client; + private final String project; + private final String logStore; + private final String topic; + private final String source; + private final Integer logGroupSize; + private final SinkWriter.Context context; + private final List slsStates; + private final SeatunnelRowSerialization seatunnelRowSerialization; + + public SlsSinkWriter( + SinkWriter.Context context, + SeaTunnelRowType seaTunnelRowType, + ReadonlyConfig pluginConfig, + List slsStates) { + + this.client = + new Client( + pluginConfig.get(ENDPOINT), + pluginConfig.get(ACCESS_KEY_ID), + pluginConfig.get(ACCESS_KEY_SECRET)); + this.project = pluginConfig.get(PROJECT); + this.logStore = pluginConfig.get(LOGSTORE); + this.topic = pluginConfig.get(TOPIC); + this.source = pluginConfig.get(SOURCE); + this.logGroupSize = pluginConfig.get(LOG_GROUP_SIZE); + this.context = context; + this.slsStates = slsStates; + this.seatunnelRowSerialization = new SeatunnelRowSerialization(seaTunnelRowType); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + List data = this.seatunnelRowSerialization.serializeRow(element); + PutLogsRequest plr = new PutLogsRequest(project, logStore, topic, source, data); + try { + this.client.PutLogs(plr); + } catch (Throwable e) { + log.error("write logs failed", e); + e.printStackTrace(); + throw new IOException(e); + } + } + + @Override + public Optional prepareCommit() throws IOException { + // nothing to do, when write function, data had sended + return Optional.empty(); + } + + @Override + public void abortPrepare() {} + + @Override + public List snapshotState(long checkpointId) { + return new ArrayList<>(); + } + + @Override + public void close() throws IOException { + this.client.shutdown(); + } +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java index 7a2b9f65ba4..9816f7d7d64 100644 --- a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java @@ -55,7 +55,7 @@ public void run() { } finally { try { if (client != null) { - /** now do nothine, do not need close */ + client.shutdown(); } } catch (Throwable t) { throw new RuntimeException(t); diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java index 819b3b07d60..9b4ed8308dd 100644 --- a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java @@ -163,7 +163,7 @@ public List snapshotState(long checkpointId) throws Exception { return sourceSplits.stream().map(SlsSourceSplit::copy).collect(Collectors.toList()); } - // 接受 + // received splits and do somethins for this @Override public void addSplits(List splits) { running = true; diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsAggregatedCommitInfo.java new file mode 100644 index 00000000000..eac7d7946fe --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsAggregatedCommitInfo.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.state; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.util.List; + +@Data +@AllArgsConstructor +public class SlsAggregatedCommitInfo { + List commitInfos; +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsCommitInfo.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsCommitInfo.java new file mode 100644 index 00000000000..f378950d460 --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsCommitInfo.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.state; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +public class SlsCommitInfo implements Serializable { + + private final String data; +} diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsSinkState.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsSinkState.java new file mode 100644 index 00000000000..6d1aaf1aac9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsSinkState.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sls.state; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +public class SlsSinkState implements Serializable { + + private final String data; +} diff --git a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java b/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java index 1d7c2ab2da7..84780c77ddb 100644 --- a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java +++ b/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.sls; +import org.apache.seatunnel.connectors.seatunnel.sls.sink.SlsSinkFactory; import org.apache.seatunnel.connectors.seatunnel.sls.source.SlsSourceFactory; import org.junit.jupiter.api.Assertions; @@ -27,5 +28,6 @@ public class SlsFactoryTest { @Test void optionRule() { Assertions.assertNotNull((new SlsSourceFactory()).optionRule()); + Assertions.assertNotNull((new SlsSinkFactory()).optionRule()); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java index 07d368ec8cd..6a7d52515f5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java @@ -44,6 +44,13 @@ public void startUp() throws Exception {} @Override public void tearDown() throws Exception {} + @TestTemplate + public void testSlsStreamingSink(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult1 = container.executeJob("/sls_sink_to_console.conf"); + Assertions.assertEquals(0, execResult1.getExitCode(), execResult1.getStderr()); + } + @TestTemplate public void testSlsStreamingSource(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_sink_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_sink_to_console.conf new file mode 100644 index 00000000000..2b3a280459d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_sink_to_console.conf @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + row.num = 10 + map.size = 10 + array.size = 10 + bytes.length = 10 + string.length = 10 + schema = { + fields = { + id = "int" + name = "string" + description = "string" + weight = "string" + } + } + } +} + +sink { + Sls { + endpoint = "xxxxxx" + project = "xxxxxx" + logstore = "xxxxxx" + access_key_id = "xxxxxx" + access_key_secret = "xxxxxxx" + } +} \ No newline at end of file