[Feature][transform] transform support explode (#7928)
Co-authored-by: njh_cmss <[email protected]>
CosmosNi and njh_cmss authored Nov 9, 2024
1 parent 79406bc commit 132278c
Showing 29 changed files with 990 additions and 179 deletions.
31 changes: 31 additions & 0 deletions docs/en/transform-v2/sql-functions.md
@@ -302,6 +302,14 @@ Example:

REPLACE(NAME, ' ')

### SPLIT

Split a string into an array.

Example:

select SPLIT(test,';') as arrays

### SOUNDEX

```SOUNDEX(string)```
@@ -984,3 +992,26 @@ Example:

select UUID() as seatunnel_uuid

### ARRAY

Generate an array.

Example:

select ARRAY('test1','test2','test3') as arrays


### LATERAL VIEW
#### EXPLODE

Explode an array column into multiple rows.
OUTER EXPLODE returns NULL when the array is NULL or empty.
EXPLODE(SPLIT(FIELD_NAME, separator)) is used to split a string column: the first parameter of the SPLIT function is the field name and the second parameter is the separator.
EXPLODE(ARRAY(value1, value2)) is used to explode a custom array.
```
SELECT * FROM fake
LATERAL VIEW EXPLODE ( SPLIT ( NAME, ',' ) ) AS NAME
LATERAL VIEW EXPLODE ( SPLIT ( pk_id, ';' ) ) AS pk_id
LATERAL VIEW OUTER EXPLODE ( age ) AS age
LATERAL VIEW OUTER EXPLODE ( ARRAY(1,1) ) AS num
```
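For example, an input row whose `NAME` column holds `Tom,Jerry` is expected to produce two output rows after `LATERAL VIEW EXPLODE(SPLIT(NAME, ','))`, one with `NAME = 'Tom'` and one with `NAME = 'Jerry'`; with `OUTER`, a row whose array is NULL or empty is kept with a NULL value instead of being dropped.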
32 changes: 32 additions & 0 deletions docs/zh/transform-v2/sql-functions.md
@@ -302,6 +302,15 @@ REPEAT(NAME || ' ', 10)

REPLACE(NAME, ' ')


### SPLIT

将字符串切分成数组。

示例:

select SPLIT(test,';') as arrays

### SOUNDEX

```SOUNDEX(string)```
@@ -975,3 +984,26 @@ case when c_string in ('c_string') then 1 else 0 end

select UUID() as seatunnel_uuid


### ARRAY

生成一个数组。

示例:

select ARRAY('test1','test2','test3') as arrays

### LATERAL VIEW
#### EXPLODE

将 array 列展开成多行。
OUTER EXPLODE 当 array 为 NULL 或者为空时,返回 NULL。
EXPLODE(SPLIT(FIELD_NAME, separator)) 用来切分字符串类型:SPLIT 的第一个参数是字段名,第二个参数是分隔符。
EXPLODE(ARRAY(value1, value2)) 用于自定义数组切分,在原有基础上生成一个新的字段。
```
SELECT * FROM fake
LATERAL VIEW EXPLODE ( SPLIT ( NAME, ',' ) ) AS NAME
LATERAL VIEW EXPLODE ( SPLIT ( pk_id, ';' ) ) AS pk_id
LATERAL VIEW OUTER EXPLODE ( age ) AS age
LATERAL VIEW OUTER EXPLODE ( ARRAY(1,1) ) AS num
```
2 changes: 1 addition & 1 deletion pom.xml
@@ -135,7 +135,7 @@
<maven-scm-provider-jgit.version>2.0.0</maven-scm-provider-jgit.version>
<testcontainer.version>1.17.6</testcontainer.version>
<spotless.version>2.29.0</spotless.version>
<jsqlparser.version>4.5</jsqlparser.version>
<jsqlparser.version>4.9</jsqlparser.version>
<json-path.version>2.7.0</json-path.version>
<groovy.version>4.0.16</groovy.version>
<jetty.version>9.4.56.v20240826</jetty.version>
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.api.transform;

import java.util.List;

public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {

/**
* Transform one input row into zero or more output rows of the type described by
* {@link SeaTunnelTransform#getProducedCatalogTable()}.
*
* @param row the row to be transformed.
* @return the transformed rows.
*/
List<T> flatMap(T row);

default T map(T row) {
throw new UnsupportedOperationException("Heads-up conversion is not supported");
}
}
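
To make the flatMap contract concrete, here is a minimal sketch (not the ExplodeTransform shipped in this PR) of the fan-out an explode-style transform could perform. The class and method names (`ExplodeSketch`, `explode`, `copyWithElement`) are hypothetical, and it assumes `SeaTunnelRow` exposes `getFields()`, `getField(int)`, `getArity()` and an `Object[]` constructor.

```java
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper showing what the flatMap body of an explode-style transform could do. */
public final class ExplodeSketch {

    private ExplodeSketch() {}

    /**
     * Expand one input row into one output row per element of the array column at
     * {@code arrayFieldIndex}, copying every other column unchanged.
     */
    public static List<SeaTunnelRow> explode(SeaTunnelRow row, int arrayFieldIndex, boolean outer) {
        Object[] array = (Object[]) row.getField(arrayFieldIndex);
        if (array == null || array.length == 0) {
            // OUTER keeps the row with a NULL element; plain EXPLODE drops it.
            return outer
                    ? Collections.singletonList(copyWithElement(row, arrayFieldIndex, null))
                    : Collections.emptyList();
        }
        List<SeaTunnelRow> result = new ArrayList<>(array.length);
        for (Object element : array) {
            result.add(copyWithElement(row, arrayFieldIndex, element));
        }
        return result;
    }

    private static SeaTunnelRow copyWithElement(SeaTunnelRow row, int index, Object element) {
        // Copy the field array so the original row is left untouched.
        Object[] fields = Arrays.copyOf(row.getFields(), row.getArity());
        fields[index] = element;
        return new SeaTunnelRow(fields);
    }
}
```

A real SeaTunnelMultiRowTransform would additionally implement the rest of the SeaTunnelTransform contract (plugin name, produced catalog table describing the exploded column) and would typically leave map() to the UnsupportedOperationException default shown above.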
@@ -43,7 +43,6 @@
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;

import java.nio.file.Files;
@@ -360,12 +359,12 @@ private static void parseInsertSql(
String sourceTableName;
String resultTableName;
if (plainSelect.getFromItem() == null) {
List<SelectItem> selectItems = plainSelect.getSelectItems();
List<SelectItem<?>> selectItems = plainSelect.getSelectItems();
if (selectItems.size() != 1) {
throw new ParserException(
"Source table must be specified in SQL: " + insertSql);
}
SelectExpressionItem selectItem = (SelectExpressionItem) selectItems.get(0);
SelectItem<?> selectItem = selectItems.get(0);
Column column = (Column) selectItem.getExpression();
sourceTableName = column.getColumnName();
resultTableName = sourceTableName;
@@ -82,8 +82,8 @@ protected Collection<JdbcSourceSplit> createSplits(
partitionEnd = range.getRight();
}
if (partitionStart == null || partitionEnd == null) {
JdbcSourceSplit spilt = createSingleSplit(table);
return Collections.singletonList(spilt);
JdbcSourceSplit split = createSingleSplit(table);
return Collections.singletonList(split);
}

return createNumberColumnSplits(
@@ -54,8 +54,6 @@
import net.sf.jsqlparser.statement.select.AllColumns;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectBody;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;

import java.math.BigDecimal;
@@ -83,7 +81,7 @@ public static PlainSelect convertToPlainSelect(String query) {
throw new IllegalArgumentException("Only SELECT statements are supported.");
}
Select select = (Select) statement;
SelectBody selectBody = select.getSelectBody();
Select selectBody = select.getSelectBody();
if (!(selectBody instanceof PlainSelect)) {
throw new IllegalArgumentException("Only simple SELECT statements are supported.");
}
@@ -101,18 +99,15 @@ public static PlainSelect convertToPlainSelect(String query) {
public static int[] convertSqlSelectToPaimonProjectionIndex(
String[] fieldNames, PlainSelect plainSelect) {
int[] projectionIndex = null;
List<SelectItem> selectItems = plainSelect.getSelectItems();
List<SelectItem<?>> selectItems = plainSelect.getSelectItems();

List<String> columnNames = new ArrayList<>();
for (SelectItem selectItem : selectItems) {
if (selectItem instanceof AllColumns) {
if (selectItem.getExpression() instanceof AllColumns) {
return null;
} else if (selectItem instanceof SelectExpressionItem) {
SelectExpressionItem selectExpressionItem = (SelectExpressionItem) selectItem;
String columnName = selectExpressionItem.getExpression().toString();
columnNames.add(columnName);
} else {
throw new IllegalArgumentException("Error encountered parsing query fields.");
String columnName = ((Column) selectItem.getExpression()).getColumnName();
columnNames.add(columnName);
}
}

@@ -25,15 +25,19 @@
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.util.Collector;

import java.net.URL;
import java.util.ArrayList;
@@ -140,6 +144,11 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS

protected DataStream<SeaTunnelRow> flinkTransform(
SeaTunnelTransform transform, DataStream<SeaTunnelRow> stream) {
if (transform instanceof SeaTunnelMultiRowTransform) {
return stream.flatMap(
new ArrayFlatMap(transform), TypeInformation.of(SeaTunnelRow.class));
}

return stream.transform(
String.format("%s-Transform", transform.getPluginName()),
TypeInformation.of(SeaTunnelRow.class),
@@ -151,4 +160,24 @@ protected DataStream<SeaTunnelRow> flinkTransform(
((SeaTunnelTransform<SeaTunnelRow>) transform)
.map(row))));
}

public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow, SeaTunnelRow> {

private SeaTunnelTransform transform;

public ArrayFlatMap(SeaTunnelTransform transform) {
this.transform = transform;
}

@Override
public void flatMap(SeaTunnelRow row, Collector<SeaTunnelRow> collector) {
List<SeaTunnelRow> rows =
((SeaTunnelMultiRowTransform<SeaTunnelRow>) transform).flatMap(row);
if (CollectionUtils.isNotEmpty(rows)) {
for (SeaTunnelRow rowResult : rows) {
collector.collect(rowResult);
}
}
}
}
}
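
Note that ArrayFlatMap only forwards rows when the transform's flatMap result is non-empty: if the exploded list is null or empty, the input row is simply dropped on the Flink engine. This matches the documented behaviour above, where only OUTER EXPLODE keeps such rows, by having the transform itself emit a row with a NULL element.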
@@ -26,6 +26,7 @@
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -35,7 +36,8 @@
import org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.commons.collections.CollectionUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
@@ -45,7 +47,6 @@

import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@@ -164,57 +165,49 @@ private Dataset<Row> sparkTransform(SeaTunnelTransform transform, DatasetTableIn
SeaTunnelRowConverter inputRowConverter = new SeaTunnelRowConverter(inputDataType);
SeaTunnelRowConverter outputRowConverter = new SeaTunnelRowConverter(outputDataTYpe);
ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);
return stream.mapPartitions(
(MapPartitionsFunction<Row, Row>)
(Iterator<Row> rowIterator) ->
new TransformIterator(
rowIterator,
transform,
outputSchema,
inputRowConverter,
outputRowConverter),

return stream.flatMap(
new TransformMapPartitionsFunction(
transform, inputRowConverter, outputRowConverter),
encoder)
.filter(Objects::nonNull);
}

private static class TransformIterator implements Iterator<Row>, Serializable {
private Iterator<Row> sourceIterator;
private static class TransformMapPartitionsFunction implements FlatMapFunction<Row, Row> {
private SeaTunnelTransform<SeaTunnelRow> transform;
private StructType structType;
private SeaTunnelRowConverter inputRowConverter;
private SeaTunnelRowConverter outputRowConverter;

public TransformIterator(
Iterator<Row> sourceIterator,
public TransformMapPartitionsFunction(
SeaTunnelTransform<SeaTunnelRow> transform,
StructType structType,
SeaTunnelRowConverter inputRowConverter,
SeaTunnelRowConverter outputRowConverter) {
this.sourceIterator = sourceIterator;
this.transform = transform;
this.structType = structType;
this.inputRowConverter = inputRowConverter;
this.outputRowConverter = outputRowConverter;
}

@Override
public boolean hasNext() {
return sourceIterator.hasNext();
}

@Override
public Row next() {
try {
Row row = sourceIterator.next();
SeaTunnelRow seaTunnelRow = inputRowConverter.unpack((GenericRowWithSchema) row);
seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
if (seaTunnelRow == null) {
return null;
public Iterator<Row> call(Row row) throws Exception {
List<Row> rows = new ArrayList<>();

SeaTunnelRow seaTunnelRow = inputRowConverter.unpack((GenericRowWithSchema) row);
if (transform instanceof SeaTunnelMultiRowTransform) {
List<SeaTunnelRow> seaTunnelRows =
((SeaTunnelMultiRowTransform<SeaTunnelRow>) transform)
.flatMap(seaTunnelRow);
if (CollectionUtils.isNotEmpty(seaTunnelRows)) {
for (SeaTunnelRow seaTunnelRowTransform : seaTunnelRows) {
rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
}
}
} else {
SeaTunnelRow seaTunnelRowTransform = transform.map(seaTunnelRow);
if (seaTunnelRowTransform != null) {
rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
}
return outputRowConverter.parcel(seaTunnelRow);
} catch (Exception e) {
throw new TaskExecuteException("Row convert failed, caused: " + e.getMessage(), e);
}
return rows.iterator();
}
}
}
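
The Spark path performs the same dispatch inside a FlatMapFunction: a SeaTunnelMultiRowTransform fans each input row out through flatMap and every resulting row is re-parceled through the output converter, while an ordinary transform keeps the single-row map path and a null result simply contributes no output row (the existing filter(Objects::nonNull) remains as an extra guard).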
@@ -59,6 +59,18 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr
Assertions.assertEquals(0, sqlAllColumns.getExitCode());
Container.ExecResult caseWhenSql = container.executeJob("/sql_transform/case_when.conf");
Assertions.assertEquals(0, caseWhenSql.getExitCode());

Container.ExecResult execResultBySql =
container.executeJob("/sql_transform/explode_transform.conf");
Assertions.assertEquals(0, execResultBySql.getExitCode());

Container.ExecResult execResultBySqlWithoutOuter =
container.executeJob("/sql_transform/explode_transform_without_outer.conf");
Assertions.assertEquals(0, execResultBySqlWithoutOuter.getExitCode());

Container.ExecResult execResultBySqlWithOuter =
container.executeJob("/sql_transform/explode_transform_with_outer.conf");
Assertions.assertEquals(0, execResultBySqlWithOuter.getExitCode());
}

@TestTemplate