Skip to content

Commit

Permalink
[Docs] update Dynamic-compile document
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie committed Sep 24, 2024
1 parent b7be0cb commit 257709c
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 113 deletions.
173 changes: 116 additions & 57 deletions docs/en/transform-v2/dynamic-compile.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ If the conversion is too complex, it may affect performance
| compile_pattern | Enum | no | SOURCE_CODE |
| absolute_path | string | no | |

### source_code [string]

The code must implement two methods: getInlineOutputColumns and getInlineOutputFieldValues. getInlineOutputColumns determines the columns you want to add or convert, and the original column structure can be obtained from CatalogTable
GetInlineOutputFieldValues determines your column values. You can fulfill any of your requirements, and even complete RPC requests to obtain new values based on the original columns
If there are third-party dependency packages, please place them in ${SEATUNNEL_HOME}/lib, if you use spark or flink, you need to put it under the libs of the corresponding service.

### common options [string]

Expand All @@ -47,6 +42,28 @@ If it is a SOURCE-CODE enumeration; the SOURCE-CODE attribute is required, and t

The absolute path of Java or Groovy files on the server

### source_code [string]

The source code.

### Detail about the Source Code

In the source code, you must implement two method:
- `Column[] getInlineOutputColumns(CatalogTable inputCatalogTable)`
- `Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow)`

`getInlineOutputColumns` method, input parameter is `CatalogTable`, return type is `Column[]`.
you can get the current table's schema from `CatalogTable`.
if the return column exist in current schema, then it will overwrite by returned value (field type, comment, ...), if it's a new column, it will add into current schema.

`getInlineOutputFieldValues` method, input parameter is `SeaTunnelRowAccessor`, return type is `Object[]`
You can get the record from `SeaTunnelRowAccessor`, do you own customized data process logical.
The return `Object[]` array length should match with `getInlineOutputColumns` method result's length. and the order also need be match.

If there are third-party dependency packages, please place them in ${SEATUNNEL_HOME}/lib, if you use spark or flink, you need to put it under the libs of the corresponding service.
You need restart the server to load the lib file.


## Example

The data read from source is a table like this:
Expand All @@ -55,10 +72,14 @@ The data read from source is a table like this:
|----------|-----|------|
| Joy Ding | 20 | 123 |
| May Ding | 20 | 123 |
| Kin Dom | 20 | 123 |
| Joy Dom | 20 | 123 |
| Kin Dom | 30 | 123 |
| Joy Dom | 30 | 123 |

```
Use this DynamicCompile to add a new column `compile_language`, and update the `age` field by its original value (if age = 20, update to 40)


- use groovy
```hacon
transform {
DynamicCompile {
source_table_name = "fake"
Expand All @@ -73,29 +94,50 @@ transform {
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
class demo {
public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
List<Column> columns = new ArrayList<>();
PhysicalColumn destColumn =
PhysicalColumn.of(
"compile_language",
BasicType.STRING_TYPE,
10,
true,
"",
"");
columns.add(destColumn);
return columns.toArray(new Column[0]);
}
public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
Object[] fieldValues = new Object[1];
fieldValues[0]="GROOVY"
return fieldValues;
}
public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
PhysicalColumn col1 =
PhysicalColumn.of(
"compile_language",
BasicType.STRING_TYPE,
10L,
true,
"",
"");
PhysicalColumn col2 =
PhysicalColumn.of(
"age",
BasicType.INT_TYPE,
0L,
false,
false,
""
);
return new Column[]{
col1, col2
};
}
public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
Object[] fieldValues = new Object[2];
// get age
Object ageField = inputRow.getField(1);
fieldValues[0] = "GROOVY";
if (Integer.parseInt(ageField.toString()) == 20) {
fieldValues[1] = 40;
} else {
fieldValues[1] = ageField;
}
return fieldValues;
}
};"""
}
}
```

- use java
```hacon
transform {
DynamicCompile {
source_table_name = "fake"
Expand All @@ -108,32 +150,49 @@ transform {
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
ArrayList<Column> columns = new ArrayList<Column>();
PhysicalColumn destColumn =
PhysicalColumn.of(
"compile_language",
BasicType.STRING_TYPE,
10,
true,
"",
"");
return new Column[]{
destColumn
};
}
public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
Object[] fieldValues = new Object[1];
fieldValues[0]="JAVA";
return fieldValues;
}
public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
PhysicalColumn col1 =
PhysicalColumn.of(
"compile_language",
BasicType.STRING_TYPE,
10L,
true,
"",
"");
PhysicalColumn col2 =
PhysicalColumn.of(
"age",
BasicType.INT_TYPE,
0L,
false,
false,
""
);
return new Column[]{
col1, col2
};
}
public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
Object[] fieldValues = new Object[2];
// get age
Object ageField = inputRow.getField(1);
fieldValues[0] = "JAVA";
if (Integer.parseInt(ageField.toString()) == 20) {
fieldValues[1] = 40;
} else {
fieldValues[1] = ageField;
}
return fieldValues;
}
"""
}
}
```
- use absolute path to read code
```hacon
transform {
DynamicCompile {
source_table_name = "fake"
Expand All @@ -148,21 +207,21 @@ transform {

Then the data in result table `groovy_out` will like this

| name | age | card | compile_language |
| name | age | card | compile_language |
|----------|-----|------|------------------|
| Joy Ding | 20 | 123 | GROOVY |
| May Ding | 20 | 123 | GROOVY |
| Kin Dom | 20 | 123 | GROOVY |
| Joy Dom | 20 | 123 | GROOVY |
| Joy Ding | 40 | 123 | GROOVY |
| May Ding | 40 | 123 | GROOVY |
| Kin Dom | 30 | 123 | GROOVY |
| Joy Dom | 30 | 123 | GROOVY |

Then the data in result table `java_out` will like this

| name | age | card | compile_language |
|----------|-----|------|------------------|
| Joy Ding | 20 | 123 | JAVA |
| May Ding | 20 | 123 | JAVA |
| Kin Dom | 20 | 123 | JAVA |
| Joy Dom | 20 | 123 | JAVA |
| Joy Ding | 40 | 123 | JAVA |
| May Ding | 40 | 123 | JAVA |
| Kin Dom | 30 | 123 | JAVA |
| Joy Dom | 30 | 123 | JAVA |

More complex examples can be referred to
https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf
Expand Down
Loading

0 comments on commit 257709c

Please sign in to comment.