Skip to content

Commit

Permalink
[Improve][SQL-Transform] add e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie committed Mar 12, 2024
1 parent 096bd1e commit da7afc0
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 1 deletion.
60 changes: 60 additions & 0 deletions docs/en/transform-v2/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ The source table name, the query SQL table name must match this field.

The query SQL, it's a simple SQL supported base function and criteria filter operation. But the complex SQL unsupported yet, include: multi source table/rows JOIN and AGGREGATE operation and the like.

the query expression can be `select [table_name.]column_a` to query the column that named `column_a`. and the table name is optional.
or `select c_row.c_inner_row.column_b` to query the inline struct column that named `column_b` within `c_row` column and `c_inner_row` column. **In this query expression, can't have table name.**

## Example

The data read from source is a table like this:
Expand Down Expand Up @@ -56,6 +59,61 @@ Then the data in result table `fake1` will update to
| 3 | Kin Dom_ | 25 |
| 4 | Joy Dom_ | 23 |

### Struct query

if your upstream data schema is like this:

```hacon
source {
FakeSource {
result_table_name = "fake"
row.num = 100
string.template = ["innerQuery"]
schema = {
fields {
name = "string"
c_date = "date"
c_row = {
c_inner_row = {
c_inner_int = "int"
c_inner_string = "string"
c_inner_timestamp = "timestamp"
c_map_1 = "map<string, string>"
c_map_2 = "map<string, map<string,string>>"
}
c_string = "string"
}
}
}
}
}
```

Those query all are valid:

```sql
select
name,
c_date,
c_row,
c_row.c_inner_row,
c_row.c_string,
c_row.c_inner_row.c_inner_int,
c_row.c_inner_row.c_inner_string,
c_row.c_inner_row.c_inner_timestamp,
c_row.c_inner_row.c_map_1,
c_row.c_inner_row.c_map_1.some_key
```

But this query are not valid:

```sql
select
c_row.c_inner_row.c_map_2.some_key.inner_map_key
```

The map must be the latest struct, can query the nesting map.

## Job Config Example

```
Expand Down Expand Up @@ -94,6 +152,8 @@ sink {

## Changelog

- Support struct query

### new version

- Add SQL Transform Connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
Expand Down Expand Up @@ -58,4 +60,14 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr
Container.ExecResult caseWhenSql = container.executeJob("/sql_transform/case_when.conf");
Assertions.assertEquals(0, caseWhenSql.getExitCode());
}

@TestTemplate
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK})
public void testInnerQuery(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult innerQuerySql =
container.executeJob("/sql_transform/inner_query.conf");
Assertions.assertEquals(0, innerQuerySql.getExitCode());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
job.mode = "BATCH"
}

source {
FakeSource {
result_table_name = "fake"
row.num = 100
string.template = ["innerQuery"]
schema = {
fields {
name = "string"
c_date = "date"
c_row = {
c_inner_row = {
c_inner_int = "int"
c_inner_string = "string"
c_inner_timestamp = "timestamp"
c_map = "map<string, string>"
}
c_string = "string"
}
}
}
}
}

transform {
Sql {
source_table_name = "fake"
result_table_name = "tmp1"
query = """select c_date,
c_row.c_string c_string,
c_row.c_inner_row.c_inner_string c_inner_string,
c_row.c_inner_row.c_inner_timestamp c_inner_timestamp,
c_row.c_inner_row.c_map.innerQuery map_val,
c_row.c_inner_row.c_map.notExistKey map_not_exist_val
from fake"""
}
}

sink {
Assert {
source_table_name = "tmp1"
rules = {
field_rules = [{
field_name = "c_date"
field_type = "date"
field_value = [
{rule_type = NOT_NULL}
]
},
{
field_name = "c_string"
field_type = "string"
field_value = [
{equals_to = "innerQuery"}
]
},
{
field_name = "c_inner_string"
field_type = "string"
field_value = [
{equals_to = "innerQuery"}
]
},
{
field_name = "c_inner_timestamp"
field_type = "timestamp"
field_value = [
{rule_type = NOT_NULL}
]
},
{
field_name = "map_val"
field_type = "string"
field_value = [
{rule_type = NOT_NULL}
]
},
{
field_name = "map_not_exist_val"
field_type = "null"
field_value = [
{rule_type = NULL}
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public SeaTunnelDataType<?> getExpressionType(Expression expression) {
parRowType = (SeaTunnelRowType) filedTypeRes;
} else if (filedTypeRes instanceof MapType) {
// for map type. only support it's the latest struct.
if (i != deep - 1) {
if (i != deep - 2) {
throw new IllegalArgumentException(
"For now, we only support map struct is the latest struct in inner query function! Please modify your query!");
}
Expand Down

0 comments on commit da7afc0

Please sign in to comment.