Skip to content

Commit

Permalink
[Feature][Transform] Sql transform support array and split function (#…
Browse files Browse the repository at this point in the history
…8016)

Co-authored-by: njh_cmss <[email protected]>
  • Loading branch information
CosmosNi and njh_cmss authored Nov 13, 2024
1 parent d2576ff commit 21f7711
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr
Container.ExecResult execResultBySqlWithOuter =
container.executeJob("/sql_transform/explode_transform_with_outer.conf");
Assertions.assertEquals(0, execResultBySqlWithOuter.getExitCode());

Container.ExecResult arraySql = container.executeJob("/sql_transform/func_array.conf");
Assertions.assertEquals(0, arraySql.getExitCode());

Container.ExecResult splitSql = container.executeJob("/sql_transform/func_split.conf");
Assertions.assertEquals(0, splitSql.getExitCode());
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
job.mode = "BATCH"
parallelism = 1
}

source {
FakeSource {
result_table_name = "fake"
schema = {
fields {
pk_id = string
name = string
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = ["id001", "zhangsan,zhangsan"]
}
]
}
}

transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "SELECT *,Array('c_1','c_2') as c_array FROM fake "
}
}

sink{
assert {
rules =
{
row_rules = [
{
rule_type = MAX_ROW
rule_value = 1
},
{
rule_type = MIN_ROW
rule_value = 1
}
],
field_rules = [
{
field_name = pk_id
field_type = string
field_value = [{equals_to = id001}]
},
{
field_name = name
field_type = string
field_value = [{equals_to = "zhangsan,zhangsan"}]
},
{
field_name = c_array
field_type = array<string>
field_value = [{equals_to = ["c_1" ,"c_2"]}]
}
]
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
job.mode = "BATCH"
parallelism = 1
}

source {
FakeSource {
result_table_name = "fake"
schema = {
fields {
pk_id = string
name = string
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = ["id001", "zhangsan,zhangsan"]
}
]
}
}

transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "SELECT pk_id,SPLIT(name,',') as name FROM fake "
}
}

sink{
assert {
rules =
{
row_rules = [
{
rule_type = MAX_ROW
rule_value = 1
},
{
rule_type = MIN_ROW
rule_value = 1
}
],
field_rules = [
{
field_name = pk_id
field_type = string
field_value = [{equals_to = id001}]
},
{
field_name = name
field_type = array<string>
field_value = [{equals_to = ["zhangsan" ,"zhangsan"]}]
}
]
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.transform.sql.zeta;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
Expand Down Expand Up @@ -446,6 +447,9 @@ private SeaTunnelDataType<?> getFunctionType(Function function) {
case ZetaSQLFunction.TRUNC:
case ZetaSQLFunction.TRUNCATE:
return BasicType.DOUBLE_TYPE;
case ZetaSQLFunction.ARRAY:
case ZetaSQLFunction.SPLIT:
return ArrayType.STRING_ARRAY_TYPE;
case ZetaSQLFunction.NOW:
case ZetaSQLFunction.DATE_TRUNC:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

import org.apache.commons.collections4.CollectionUtils;

import com.google.common.collect.Lists;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -70,11 +68,13 @@ public static Object nullif(List<Object> args) {
return v1;
}

public static Object array(List<Object> args) {
public static String[] array(List<Object> args) {
if (CollectionUtils.isNotEmpty(args)) {
return args.toArray();
return args.stream()
.map(obj -> obj == null ? null : obj.toString())
.toArray(String[]::new);
}
return Lists.newArrayList();
return new String[0];
}

public static Object castAs(Object arg, SeaTunnelDataType<?> type) {
Expand Down

0 comments on commit 21f7711

Please sign in to comment.