Skip to content

Commit

Permalink
[Improve][Connector-V2] Improve jdbc merge table from path and query …
Browse files Browse the repository at this point in the history
…when type is decimal (#7917)
  • Loading branch information
Hisoka-X authored Oct 30, 2024
1 parent b69fcec commit 8baa012
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ public Column convert(BasicTypeDefine typeDefine) {
? typeDefine.getLength().intValue()
: MAX_PRECISION - 4;
if (scale == null) {
builder.dataType(new DecimalType((int) precision, MAX_SCALE));
builder.dataType(new DecimalType((int) precision, 0));
builder.columnLength(precision);
builder.scale(MAX_SCALE);
builder.scale(0);
} else if (scale < 0) {
int newPrecision = (int) (precision - scale);
if (newPrecision == 1) {
Expand All @@ -277,16 +277,17 @@ public Column convert(BasicTypeDefine typeDefine) {
}
break;
case HANA_SMALLDECIMAL:
int smallDecimalScale = typeDefine.getScale() != null ? typeDefine.getScale() : 0;
if (typeDefine.getPrecision() == null) {
builder.dataType(new DecimalType(DEFAULT_PRECISION, MAX_SMALL_DECIMAL_SCALE));
builder.dataType(new DecimalType(DEFAULT_PRECISION, smallDecimalScale));
builder.columnLength((long) DEFAULT_PRECISION);
builder.scale(MAX_SMALL_DECIMAL_SCALE);
builder.scale(smallDecimalScale);
} else {
builder.dataType(
new DecimalType(
typeDefine.getPrecision().intValue(), MAX_SMALL_DECIMAL_SCALE));
typeDefine.getPrecision().intValue(), smallDecimalScale));
builder.columnLength(typeDefine.getPrecision());
builder.scale(MAX_SMALL_DECIMAL_SCALE);
builder.scale(smallDecimalScale);
}
break;
case HANA_REAL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,12 @@ static CatalogTable mergeCatalogTable(CatalogTable tableOfPath, CatalogTable tab
&& columnsOfPath
.get(column.getName())
.getDataType()
.getSqlType()
.equals(
columnsOfQuery
.get(column.getName())
.getDataType()))
.getDataType()
.getSqlType()))
.map(column -> columnsOfPath.get(column.getName()))
.collect(Collectors.toList());
boolean schemaIncludeAllColumns = columnsOfMerge.size() == columnKeysOfQuery.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testConvertSmallDecimal() {
.build();
Column column = SapHanaTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(38, 368), column.getDataType());
Assertions.assertEquals(new DecimalType(38, 0), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());

typeDefine =
Expand All @@ -139,7 +139,7 @@ public void testConvertSmallDecimal() {
.build();
column = SapHanaTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(10, 368), column.getDataType());
Assertions.assertEquals(new DecimalType(10, 5), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
}

Expand All @@ -153,7 +153,7 @@ public void testConvertDecimal() {
.build();
Column column = SapHanaTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
Assertions.assertEquals(new DecimalType(34, 6176), column.getDataType());
Assertions.assertEquals(new DecimalType(34, 0), column.getDataType());
Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());

BasicTypeDefine<Object> typeDefine2 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -314,4 +315,59 @@ public void testColumnNotIncludeMerge() {
tableOfQuery.getTableSchema().getColumns(),
mergeTable.getTableSchema().getColumns());
}

@Test
public void testDecimalColumnMerge() {
CatalogTable tableOfQuery =
CatalogTable.of(
TableIdentifier.of("default", null, null, "default"),
TableSchema.builder()
.column(
PhysicalColumn.of(
"f1",
new DecimalType(10, 1),
null,
true,
null,
null,
null,
false,
false,
null,
null,
null))
.build(),
Collections.emptyMap(),
Collections.emptyList(),
null);

CatalogTable tableOfPath =
CatalogTable.of(
TableIdentifier.of("default", null, null, "default"),
TableSchema.builder()
.column(
PhysicalColumn.of(
"f1",
new DecimalType(10, 2),
null,
true,
null,
null,
null,
false,
false,
null,
null,
null))
.build(),
Collections.emptyMap(),
Collections.emptyList(),
null);

CatalogTable mergeTable = JdbcCatalogUtils.mergeCatalogTable(tableOfPath, tableOfQuery);
// When column type is decimal, the precision and scale should not affect the merge result
Assertions.assertEquals(
tableOfPath.getTableSchema().getColumns().get(0),
mergeTable.getTableSchema().getColumns().get(0));
}
}

0 comments on commit 8baa012

Please sign in to comment.