diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java index a12eabe7ef6..76344b14007 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java @@ -71,6 +71,12 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr Container.ExecResult execResultBySqlWithOuter = container.executeJob("/sql_transform/explode_transform_with_outer.conf"); Assertions.assertEquals(0, execResultBySqlWithOuter.getExitCode()); + + Container.ExecResult arraySql = container.executeJob("/sql_transform/func_array.conf"); + Assertions.assertEquals(0, arraySql.getExitCode()); + + Container.ExecResult splitSql = container.executeJob("/sql_transform/func_split.conf"); + Assertions.assertEquals(0, splitSql.getExitCode()); } @TestTemplate diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_array.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_array.conf new file mode 100644 index 00000000000..e5355f41e32 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_array.conf @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + job.mode = "BATCH" + parallelism = 1 +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields { + pk_id = string + name = string + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = ["id001", "zhangsan,zhangsan"] + } + ] + } +} + +transform { + Sql { + source_table_name = "fake" + result_table_name = "fake1" + query = "SELECT *,Array('c_1','c_2') as c_array FROM fake " + } +} + +sink{ + assert { + rules = + { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 1 + }, + { + rule_type = MIN_ROW + rule_value = 1 + } + ], + field_rules = [ + { + field_name = pk_id + field_type = string + field_value = [{equals_to = id001}] + }, + { + field_name = name + field_type = string + field_value = [{equals_to = "zhangsan,zhangsan"}] + }, + { + field_name = c_array + field_type = array + field_value = [{equals_to = ["c_1" ,"c_2"]}] + } + ] + } + } +} + diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_split.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_split.conf new file mode 100644 index 00000000000..2438f007449 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_split.conf @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + job.mode = "BATCH" + parallelism = 1 +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields { + pk_id = string + name = string + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = ["id001", "zhangsan,zhangsan"] + } + ] + } +} + +transform { + Sql { + source_table_name = "fake" + result_table_name = "fake1" + query = "SELECT pk_id,SPLIT(name,',') as name FROM fake " + } +} + +sink{ + assert { + rules = + { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 1 + }, + { + rule_type = MIN_ROW + rule_value = 1 + } + ], + field_rules = [ + { + field_name = pk_id + field_type = string + field_value = [{equals_to = id001}] + }, + { + field_name = name + field_type = array + field_value = [{equals_to = ["zhangsan" ,"zhangsan"]}] + } + ] + } + } +} + diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java index f07f27ebbb9..127479536ca 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.transform.sql.zeta; +import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; @@ -446,6 +447,9 @@ private SeaTunnelDataType getFunctionType(Function function) { case ZetaSQLFunction.TRUNC: case ZetaSQLFunction.TRUNCATE: return BasicType.DOUBLE_TYPE; + case ZetaSQLFunction.ARRAY: + case ZetaSQLFunction.SPLIT: + return ArrayType.STRING_ARRAY_TYPE; case ZetaSQLFunction.NOW: case ZetaSQLFunction.DATE_TRUNC: return LocalTimeType.LOCAL_DATE_TIME_TYPE; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java index 3968fbf2e76..abd6c35c9ac 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java @@ -24,8 +24,6 @@ import org.apache.commons.collections4.CollectionUtils; -import com.google.common.collect.Lists; - import java.math.BigDecimal; import java.math.RoundingMode; import java.nio.charset.StandardCharsets; @@ -70,11 +68,13 @@ public static Object nullif(List args) { return v1; } - public static Object array(List args) { + public static String[] array(List args) { if (CollectionUtils.isNotEmpty(args)) { - return args.toArray(); + return args.stream() + .map(obj -> obj == null ? null : obj.toString()) + .toArray(String[]::new); } - return Lists.newArrayList(); + return new String[0]; } public static Object castAs(Object arg, SeaTunnelDataType type) {