Skip to content

Commit

Permalink
Merge branch 'apache:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
zhdech authored Nov 15, 2024
2 parents 2fb9736 + 102d9f9 commit 74e0745
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
<junit5.version>5.9.0</junit5.version>
<mockito.version>4.11.0</mockito.version>
<config.version>1.3.3</config.version>
<maven-shade-plugin.version>3.3.0</maven-shade-plugin.version>
<maven-shade-plugin.version>3.4.1</maven-shade-plugin.version>
<maven-helper-plugin.version>3.2.0</maven-helper-plugin.version>
<maven-git-commit-id-plugin.version>4.0.4</maven-git-commit-id-plugin.version>
<flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ public void testDynamicSingleCompileJava(TestContainer container)
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testDynamicSingleCompileJavaOldVersionCompatible(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob(
basePath + "single_dynamic_java_compile_transform_compatible.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testDynamicMultipleCompileGroovy(TestContainer container)
throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
job.mode = "BATCH"
}

source {
FakeSource {
result_table_name = "fake"
row.num = 100
schema = {
fields {
id = "int"
name = "string"
}
}
}
}

transform {
DynamicCompile {
source_table_name = "fake"
result_table_name = "fake1"
compile_language = "JAVA"
compile_pattern = "SOURCE_CODE"
source_code = """
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;


public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {

ArrayList<Column> columns = new ArrayList<Column>();
PhysicalColumn destColumn =
PhysicalColumn.of(
"col1",
BasicType.STRING_TYPE,
10,
true,
"",
"");
return new Column[]{
destColumn
};

}
public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {

Object[] fieldValues = new Object[1];
fieldValues[0]="test1";
return fieldValues;
}
"""

}
}

sink {
Assert {
source_table_name = "fake1"
rules =
{
row_rules = [
{
rule_type = MIN_ROW
rule_value = 100
}
],
field_rules = [
{
field_name = id
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = col1
field_type = string
field_value = [
{
rule_type = NOT_NULL
equals_to = "test1"

}

]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.common.utils.ReflectionUtils;
Expand All @@ -30,6 +31,7 @@
import org.apache.seatunnel.transform.exception.TransformException;

import java.nio.file.Paths;
import java.util.Optional;

import static org.apache.seatunnel.transform.dynamiccompile.CompileTransformErrorCode.COMPILE_TRANSFORM_ERROR_CODE;

Expand All @@ -42,6 +44,8 @@ public class DynamicCompileTransform extends MultipleFieldOutputTransform {

private final String sourceCode;

private final boolean compatibilityMode;

private final CompilePattern compilePattern;

private AbstractParse DynamicCompileParse;
Expand All @@ -68,6 +72,9 @@ public DynamicCompileTransform(ReadonlyConfig readonlyConfig, CatalogTable catal
readonlyConfig.get(
DynamicCompileTransformConfig.ABSOLUTE_PATH)));
}
compatibilityMode =
sourceCode.contains(
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor.class.getName());
}

@Override
Expand Down Expand Up @@ -98,14 +105,24 @@ protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
try {
result =
ReflectionUtils.invoke(
getCompileLanguageInstance(), getInlineOutputFieldValues, inputRow);

getCompileLanguageInstance(),
getInlineOutputFieldValues,
getCompatibilityAccessor(inputRow));
} catch (Exception e) {
throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE, e.getMessage());
}
return (Object[]) result;
}

private Object getCompatibilityAccessor(SeaTunnelRowAccessor inputRow) {
if (compatibilityMode) {
Optional<Object> field = ReflectionUtils.getField(inputRow, "row");
SeaTunnelRow row = (SeaTunnelRow) field.get();
return new org.apache.seatunnel.transform.common.SeaTunnelRowAccessor(row);
}
return inputRow;
}

private Object getCompileLanguageInstance()
throws InstantiationException, IllegalAccessException {
Class<?> compileClass = DynamicCompileParse.parseClassSourceCode(sourceCode);
Expand Down

0 comments on commit 74e0745

Please sign in to comment.