From 968ddcfc79747af1765c04c8ffec147f7a25cc02 Mon Sep 17 00:00:00 2001 From: Jast Date: Thu, 7 Nov 2024 16:56:23 +0800 Subject: [PATCH] [Doc][Format] Add protobuf doc (#7989) --- docs/en/connector-v2/formats/protobuf.md | 163 ++++++++++++++++++++++ docs/zh/connector-v2/formats/protobuf.md | 164 +++++++++++++++++++++++ 2 files changed, 327 insertions(+) create mode 100644 docs/en/connector-v2/formats/protobuf.md create mode 100644 docs/zh/connector-v2/formats/protobuf.md diff --git a/docs/en/connector-v2/formats/protobuf.md b/docs/en/connector-v2/formats/protobuf.md new file mode 100644 index 00000000000..8433528978d --- /dev/null +++ b/docs/en/connector-v2/formats/protobuf.md @@ -0,0 +1,163 @@ +# Protobuf Format + +Protobuf (Protocol Buffers) is a language-neutral, platform-independent data serialization format developed by Google. It provides an efficient way to encode structured data and supports multiple programming languages and platforms. + +Currently, Protobuf format can be used with Kafka. + +## Kafka Usage Example + +- Example of simulating a randomly generated data source and writing it to Kafka in Protobuf format + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + c_int32 = int + c_int64 = long + c_float = float + c_double = double + c_bool = boolean + c_string = string + c_bytes = bytes + + Address { + city = string + state = string + street = string + } + attributes = "map" + phone_numbers = "array" + } + } + } +} + +sink { + kafka { + topic = "test_protobuf_topic_fake_source" + bootstrap.servers = "kafkaCluster:9092" + format = protobuf + kafka.request.timeout.ms = 60000 + kafka.config = { + acks = "all" + request.timeout.ms = 60000 + buffer.memory = 33554432 + } + protobuf_message_name = Person + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufE2E"; + + message Person { + int32 c_int32 = 1; + int64 c_int64 = 2; + float c_float = 3; + double c_double = 4; + bool c_bool = 5; + string c_string = 6; + bytes c_bytes = 7; + + message Address { + string street = 1; + string city = 2; + string state = 3; + string zip = 4; + } + + Address address = 8; + + map attributes = 9; + + repeated string phone_numbers = 10; + } + """ + } +} +``` + +- Example of reading data from Kafka in Protobuf format and printing it to the console + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + topic = "test_protobuf_topic_fake_source" + format = protobuf + protobuf_message_name = Person + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufE2E"; + + message Person { + int32 c_int32 = 1; + int64 c_int64 = 2; + float c_float = 3; + double c_double = 4; + bool c_bool = 5; + string c_string = 6; + bytes c_bytes = 7; + + message Address { + string street = 1; + string city = 2; + string state = 3; + string zip = 4; + } + + Address address = 8; + + map attributes = 9; + + repeated string phone_numbers = 10; + } + """ + schema = { + fields { + c_int32 = int + c_int64 = long + c_float = float + c_double = double + c_bool = boolean + c_string = string + c_bytes = bytes + + Address { + city = string + state = string + street = string + } + attributes = "map" + phone_numbers = "array" + } + } + bootstrap.servers = "kafkaCluster:9092" + start_mode = "earliest" + result_table_name = "kafka_table" + } +} + +sink { + Console { + source_table_name = "kafka_table" + } +} +``` \ No newline at end of file diff --git a/docs/zh/connector-v2/formats/protobuf.md b/docs/zh/connector-v2/formats/protobuf.md new file mode 100644 index 00000000000..68c4176fd6f --- /dev/null +++ b/docs/zh/connector-v2/formats/protobuf.md @@ -0,0 +1,164 @@ +# Protobuf 格式 + +Protobuf(Protocol Buffers)是一种由Google开发的语言中立、平台无关的数据序列化格式。它提供了一种高效的方式来编码结构化数据,同时支持多种编程语言和平台。 + +目前支持在 Kafka 中使用 protobuf 格式。 + +## Kafka 使用示例 + +- 模拟随机生成数据源,并以 protobuf 的格式 写入 kafka 的实例 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + c_int32 = int + c_int64 = long + c_float = float + c_double = double + c_bool = boolean + c_string = string + c_bytes = bytes + + Address { + city = string + state = string + street = string + } + attributes = "map" + phone_numbers = "array" + } + } + } +} + +sink { + kafka { + topic = "test_protobuf_topic_fake_source" + bootstrap.servers = "kafkaCluster:9092" + format = protobuf + kafka.request.timeout.ms = 60000 + kafka.config = { + acks = "all" + request.timeout.ms = 60000 + buffer.memory = 33554432 + } + protobuf_message_name = Person + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufE2E"; + + message Person { + int32 c_int32 = 1; + int64 c_int64 = 2; + float c_float = 3; + double c_double = 4; + bool c_bool = 5; + string c_string = 6; + bytes c_bytes = 7; + + message Address { + string street = 1; + string city = 2; + string state = 3; + string zip = 4; + } + + Address address = 8; + + map attributes = 9; + + repeated string phone_numbers = 10; + } + """ + } +} +``` + +- 从 kafka 读取 protobuf 格式的数据并打印到控制台的示例 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + topic = "test_protobuf_topic_fake_source" + format = protobuf + protobuf_message_name = Person + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufE2E"; + + message Person { + int32 c_int32 = 1; + int64 c_int64 = 2; + float c_float = 3; + double c_double = 4; + bool c_bool = 5; + string c_string = 6; + bytes c_bytes = 7; + + message Address { + string street = 1; + string city = 2; + string state = 3; + string zip = 4; + } + + Address address = 8; + + map attributes = 9; + + repeated string phone_numbers = 10; + } + """ + schema = { + fields { + c_int32 = int + c_int64 = long + c_float = float + c_double = double + c_bool = boolean + c_string = string + c_bytes = bytes + + Address { + city = string + state = string + street = string + } + attributes = "map" + phone_numbers = "array" + } + } + bootstrap.servers = "kafkaCluster:9092" + start_mode = "earliest" + result_table_name = "kafka_table" + } +} + +sink { + Console { + source_table_name = "kafka_table" + } +} +``` +