Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] [File Connector] Supports writing column names when the output type is file (CSV) #5459

Merged
merged 51 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
7a44e95
[Feature] [File Connector] Supports writing column names when the out…
Sep 11, 2023
de2859b
[Feature] [File Connector] fix code style and lineSeparator #5443
Sep 13, 2023
4acdda8
[Feature] [File Connector] add enable_header_write,false:dont write h…
Sep 13, 2023
b7c90dc
[Feature] [File Connector] fix code style #5443
Sep 13, 2023
ee306b7
Merge branch 'dev' of https://github.com/apache/seatunnel into dev
Sep 13, 2023
388771b
Merge branch 'dev' of https://github.com/apache/seatunnel into dev
Sep 14, 2023
d1da88a
[Feature] [File Connector] add enable_header_write explain #5443
Sep 14, 2023
dc26b73
[Feature] [File Connector]fix code style #5443
Sep 15, 2023
6da518a
Update docs/en/connector-v2/sink/LocalFile.md
EricJoy2048 Sep 15, 2023
98d2571
Update docs/en/connector-v2/sink/LocalFile.md
EricJoy2048 Sep 15, 2023
5bad203
Merge branch 'dev' of https://github.com/apache/seatunnel into dev
Sep 15, 2023
84c0ff5
Merge branch 'dev' of https://github.com/zck573693104/seatunnel into dev
Sep 15, 2023
17e6710
[Feature] [File Connector]fix code style #5443
Sep 15, 2023
c30321a
Merge branch 'dev' of https://github.com/apache/seatunnel into dev
Sep 18, 2023
6846e37
[Feature] [File Connector]add junit test #5443
Sep 19, 2023
d305e6f
Merge branch 'dev' of https://github.com/apache/seatunnel into dev
Sep 20, 2023
03a24a9
[Feature] [File Connector]add license header: #5443
Sep 20, 2023
1ffa54f
[Feature] [File Connector] Supports writing column names when the out…
Sep 11, 2023
cbc07f6
[Feature] [File Connector] fix code style and lineSeparator #5443
Sep 13, 2023
2b1e15c
[Feature] [File Connector] add enable_header_write,false:dont write h…
Sep 13, 2023
ce3fdb1
[Feature] [File Connector] fix code style #5443
Sep 13, 2023
8388a31
[Feature] [File Connector] add enable_header_write explain #5443
Sep 14, 2023
dc8ce8a
[Feature] [File Connector]fix code style #5443
Sep 15, 2023
3271580
Update docs/en/connector-v2/sink/LocalFile.md
EricJoy2048 Sep 15, 2023
d912a39
Update docs/en/connector-v2/sink/LocalFile.md
EricJoy2048 Sep 15, 2023
8ae9ca0
[Feature] [File Connector]fix code style #5443
Sep 15, 2023
f899f6b
[Feature] [File Connector]add junit test #5443
Sep 19, 2023
52a3f8d
[Feature] [File Connector]add license header: #5443
Sep 20, 2023
4ca2652
Merge branch 'dev' of https://github.com/zck573693104/seatunnel into dev
Sep 20, 2023
a5f4fe8
Merge branch 'dev' of https://github.com/apache/seatunnel into dev
Sep 21, 2023
921e7d8
[Feature] [File Connector]add junit: #5443
Sep 21, 2023
4f66bad
[Feature] [File Connector]add junit: #5443
Sep 21, 2023
ae1e347
[Feature] [File Connector]remove scala: #5443
Sep 22, 2023
03c369f
[Feature] [File Connector]modify md style: #5443
Sep 22, 2023
bf7df66
[Feature] [File Connector] Supports writing column names when the out…
Sep 11, 2023
301058e
[Feature] [File Connector] fix code style and lineSeparator #5443
Sep 13, 2023
147302a
[Feature] [File Connector] add enable_header_write,false:dont write h…
Sep 13, 2023
9f7ea83
[Feature] [File Connector] fix code style #5443
Sep 13, 2023
afb7ce5
[Feature] [File Connector] add enable_header_write explain #5443
Sep 14, 2023
c9b7b38
[Feature] [File Connector]fix code style #5443
Sep 15, 2023
e0885bb
Update docs/en/connector-v2/sink/LocalFile.md
EricJoy2048 Sep 15, 2023
0f9987f
Update docs/en/connector-v2/sink/LocalFile.md
EricJoy2048 Sep 15, 2023
1f37a64
[Feature] [File Connector]fix code style #5443
Sep 15, 2023
5761244
[Feature] [File Connector]add junit test #5443
Sep 19, 2023
af3b5f2
[Feature] [File Connector]add license header: #5443
Sep 20, 2023
37f3a15
[Feature] [File Connector]add junit: #5443
Sep 21, 2023
3b809e7
[Feature] [File Connector]add junit: #5443
Sep 21, 2023
738e36a
[Feature] [File Connector]remove scala: #5443
Sep 22, 2023
bc5a7e0
[Feature] [File Connector]modify md style: #5443
Sep 22, 2023
f6c115a
Merge branch 'dev' of https://github.com/zck573693104/seatunnel into dev
Sep 25, 2023
2bc2fa5
[Feature] [File Connector]junit modify: #5443
Sep 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions docs/en/connector-v2/sink/LocalFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once`

## Options

| name | type | required | default value | remarks |
|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------|
| path | string | yes | - | |
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format_type is text |
| row_delimiter | string | no | "\n" | Only used when file_format_type is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
| partition_by | array | no | - | Only used then have_partition is true |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true |
| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true |
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
| compress_codec | string | no | none | |
| common-options | object | no | - | |
| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. |
| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. |
| name | type | required | default value | remarks |
|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------------|
| path | string | yes | - | |
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format_type is text |
| row_delimiter | string | no | "\n" | Only used when file_format_type is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
| partition_by | array | no | - | Only used then have_partition is true |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true |
| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true |
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
| compress_codec | string | no | none | |
| common-options | object | no | - | |
| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. |
| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. |
| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.<br/> false:don't write header,true:write header. |

### path [string]

Expand Down Expand Up @@ -166,6 +167,10 @@ When File Format is Excel,The maximum number of data items that can be cached in

Writer the sheet of the workbook

### enable_header_write [boolean]

Only used when file_format_type is text,csv.false:don't write header,true:write header.

## Example

For orc file format simple config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
protected Boolean enableHeaderWriter = false;

public BaseFileSinkConfig(@NonNull Config config) {
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
Expand Down Expand Up @@ -99,6 +100,10 @@ public BaseFileSinkConfig(@NonNull Config config) {
timeFormat =
TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
}

if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
}
}

public BaseFileSinkConfig() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,10 @@ public class BaseSinkConfig {
.stringType()
.noDefaultValue()
.withDescription("To be written sheet name,only valid for excel files");

public static final Option<Boolean> ENABLE_HEADER_WRITE =
Options.key("enable_header_write")
.booleanType()
.defaultValue(false)
.withDescription("false:dont write header,true:write header");
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.format.text.TextSerializationSchema;
Expand All @@ -47,6 +48,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
private final DateUtils.Formatter dateFormat;
private final DateTimeUtils.Formatter dateTimeFormat;
private final TimeUtils.Formatter timeFormat;
private final FileFormat fileFormat;
private final Boolean enableHeaderWriter;
private SerializationSchema serializationSchema;

public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
Expand All @@ -58,6 +61,8 @@ public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
this.dateFormat = fileSinkConfig.getDateFormat();
this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
this.timeFormat = fileSinkConfig.getTimeFormat();
this.fileFormat = fileSinkConfig.getFileFormat();
this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter();
}

@Override
Expand Down Expand Up @@ -133,15 +138,18 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
OutputStream out =
lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
fsDataOutputStream = new FSDataOutputStream(out, null);
enableWriteHeader(fsDataOutputStream);
break;
case NONE:
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
enableWriteHeader(fsDataOutputStream);
break;
default:
log.warn(
"Text file does not support this compress type: {}",
compressFormat.getCompressCodec());
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
enableWriteHeader(fsDataOutputStream);
break;
}
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
Expand All @@ -155,4 +163,15 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
}
return fsDataOutputStream;
}

private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException {
if (enableHeaderWriter) {
fsDataOutputStream.write(
String.join(
FileFormat.CSV.equals(fileFormat) ? "," : fieldDelimiter,
seaTunnelRowType.getFieldNames())
.getBytes());
fsDataOutputStream.write(rowDelimiter.getBytes());
}
}
}
Loading
Loading