[Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files.
FlechazoW committed Jul 27, 2023
1 parent 4b4b5f9 commit bc7469f
Showing 25 changed files with 642 additions and 166 deletions.
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/CosFile.md
@@ -55,6 +55,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -247,6 +248,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

The sheet of the workbook to read. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.

## Example

```hocon
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/HdfsFile.md
@@ -53,6 +53,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -245,6 +246,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

The sheet of the workbook to read. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.

## Example

```hocon
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/LocalFile.md
@@ -49,6 +49,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -225,6 +226,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

The sheet of the workbook to read. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.

## Example

```hocon
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/OssFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -282,6 +283,10 @@ Reader the sheet of the workbook,Only used when file_format is excel.
```

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.

## Changelog

### 2.2.0-beta 2022-09-26
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/OssJindoFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -248,6 +249,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

The sheet of the workbook to read. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.

## Example

```hocon
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/S3File.md
@@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### path [string]

@@ -299,6 +300,10 @@ Reader the sheet of the workbook,Only used when file_format is excel.
```

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.

## Changelog

### 2.3.0-beta 2022-10-20
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/SftpFile.md
@@ -47,6 +47,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | * |

### host [string]

@@ -226,6 +227,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

The sheet of the workbook to read. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used for filtering files.

## Example

```hocon
@@ -112,4 +112,11 @@ public class BaseSourceConfig {
.stringType()
.noDefaultValue()
.withDescription("To be read sheet name,only valid for excel files");

public static final Option<String> FILE_FILTER_PATTERN =
Options.key("file_filter_pattern")
.stringType()
.defaultValue("*")
.withDescription(
"File pattern. The connector will filter some files based on the pattern.");
}
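
A note on the default value: the `AbstractReadStrategy` change in this commit compiles the configured value with `java.util.regex.Pattern`, and only when the key is present in the plugin config. The documented default `*` is therefore never compiled; when the key is absent the `pattern` field stays null and every file passes. The minimal sketch below (the class and helper names are hypothetical, not part of the commit) suggests why that matters: `*` on its own is not a valid Java regex, while `.*` is the usual match-everything form.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

class DefaultPatternSketch {

    // Hypothetical helper: reports whether a string compiles as a Java regex.
    static boolean isValidRegex(String regex) {
        try {
            Pattern.compile(regex);
            return true;
        } catch (PatternSyntaxException e) {
            // A lone quantifier such as "*" has nothing to repeat.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidRegex("*"));                      // false
        System.out.println(isValidRegex(".*"));                     // true
        System.out.println(Pattern.matches(".*", "part-0000.csv")); // true
    }
}
```

Under that reading, a user who wants to match all files explicitly would likely write `.*` rather than copy the default `*` into the config.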
@@ -43,6 +43,9 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
@@ -74,6 +77,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
protected boolean isKerberosAuthorization = false;

protected Pattern pattern;

@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
@@ -126,7 +131,7 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString()));
continue;
}
- if (fileStatus.isFile()) {
+ if (fileStatus.isFile() && filterFileByPattern(fileStatus)) {
// filter '_SUCCESS' file
if (!fileStatus.getPath().getName().equals("_SUCCESS")
&& !fileStatus.getPath().getName().startsWith(".")) {
@@ -146,6 +151,12 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
}
}
}

if (fileNames.isEmpty()) {
throw new IllegalArgumentException(
"Got no file, please check the configuration parameters such as: [file_filter_pattern]");
}

return fileNames;
}

@@ -166,6 +177,11 @@ public void setPluginConfig(Config pluginConfig) {
if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
readColumns.addAll(pluginConfig.getStringList(BaseSourceConfig.READ_COLUMNS.key()));
}
if (pluginConfig.hasPath(BaseSourceConfig.FILE_FILTER_PATTERN.key())) {
String filterPattern =
pluginConfig.getString(BaseSourceConfig.FILE_FILTER_PATTERN.key());
this.pattern = Pattern.compile(Matcher.quoteReplacement(filterPattern));
}
}

@Override
@@ -214,4 +230,11 @@ protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType sea
// return merge row type
return new SeaTunnelRowType(newFieldNames, newFieldTypes);
}

protected boolean filterFileByPattern(FileStatus fileStatus) {
if (Objects.nonNull(pattern)) {
return pattern.matcher(fileStatus.getPath().getName()).matches();
}
return true;
}
}
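
The filtering added to `AbstractReadStrategy` can be approximated outside the connector as follows. This is a sketch under stated assumptions, not the connector's code: `FileFilterSketch` and `filterNames` are hypothetical names, plain strings stand in for Hadoop `FileStatus` objects, and the regex is compiled directly, whereas the commit first passes the value through `Matcher.quoteReplacement` (which escapes backslashes and dollar signs), so patterns containing those characters may behave differently there. The key point it illustrates is that `matches()` requires the whole file name to match, not just a substring.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class FileFilterSketch {

    // Hypothetical helper: keeps only names that fully match the regex,
    // mirroring the matcher(name).matches() call in filterFileByPattern.
    static List<String> filterNames(List<String> names, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> kept = new ArrayList<>();
        for (String name : names) {
            if (pattern.matcher(name).matches()) {
                kept.add(name);
            }
        }
        // Like the commit, fail fast when the filter leaves nothing to read.
        if (kept.isEmpty()) {
            throw new IllegalArgumentException(
                    "Got no file, please check the configuration parameters such as: [file_filter_pattern]");
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("a.csv", "b.csv", "notes.txt");
        System.out.println(filterNames(names, ".*\\.csv")); // [a.csv, b.csv]
    }
}
```

Because the match is against the full name, a pattern such as `csv` alone would keep none of these files and would trigger the exception.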
@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -57,6 +57,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -56,6 +56,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -65,6 +65,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
