Skip to content

Commit

Permalink
[Fix][Connector-V2] Fixed iceberg sink can not handle uppercase fields (
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian authored Sep 23, 2024
1 parent 9720f11 commit b7be0cb
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.PluginType;
Expand All @@ -48,7 +44,6 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;

Expand All @@ -68,7 +63,7 @@ public class IcebergSink
public IcebergSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.readonlyConfig = pluginConfig;
this.config = new SinkConfig(pluginConfig);
this.catalogTable = convertLowerCaseCatalogTable(catalogTable);
this.catalogTable = catalogTable;
// Reset primary keys if need
if (config.getPrimaryKeys().isEmpty()
&& Objects.nonNull(this.catalogTable.getTableSchema().getPrimaryKey())) {
Expand Down Expand Up @@ -138,72 +133,4 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
catalogTable,
null));
}

private CatalogTable convertLowerCaseCatalogTable(CatalogTable catalogTable) {
TableSchema tableSchema = catalogTable.getTableSchema();
TableSchema.Builder builder = TableSchema.builder();
tableSchema
.getColumns()
.forEach(
column -> {
PhysicalColumn physicalColumn =
PhysicalColumn.of(
column.getName(),
column.getDataType(),
column.getColumnLength(),
column.isNullable(),
column.getDefaultValue(),
column.getComment());
builder.column(physicalColumn);
});
// set primary
if (Objects.nonNull(tableSchema.getPrimaryKey())) {
PrimaryKey newPrimaryKey =
PrimaryKey.of(
tableSchema.getPrimaryKey().getPrimaryKey(),
tableSchema.getPrimaryKey().getColumnNames().stream()
.map(String::toLowerCase)
.collect(Collectors.toList()));
builder.primaryKey(newPrimaryKey);
}

if (Objects.nonNull(tableSchema.getConstraintKeys())) {
tableSchema
.getConstraintKeys()
.forEach(
constraintKey -> {
ConstraintKey newConstraintKey =
ConstraintKey.of(
constraintKey.getConstraintType(),
constraintKey.getConstraintName(),
constraintKey.getColumnNames() != null
? constraintKey.getColumnNames().stream()
.map(
constraintKeyColumn ->
ConstraintKey
.ConstraintKeyColumn
.of(
constraintKeyColumn
.getColumnName()
!= null
? constraintKeyColumn
.getColumnName()
.toLowerCase()
: null,
constraintKeyColumn
.getSortType()))
.collect(Collectors.toList())
: null);
builder.constraintKey(newConstraintKey);
});
}

return CatalogTable.of(
catalogTable.getTableId(),
builder.build(),
catalogTable.getOptions(),
catalogTable.getPartitionKeys(),
catalogTable.getComment(),
catalogTable.getCatalogName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,13 @@ public TaskWriter<Record> createTaskWriter(Table table, SinkConfig config) {
if (!idCols.isEmpty()) {
identifierFieldIds =
idCols.stream()
.map(colName -> table.schema().findField(colName).fieldId())
.map(
colName ->
config.isCaseSensitive()
? table.schema()
.caseInsensitiveFindField(colName)
.fieldId()
: table.schema().findField(colName).fieldId())
.collect(toSet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ private List<Record> loadIcebergTable() {
results.add(record);
}
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
}
} catch (Exception ex) {
ex.printStackTrace();
log.error(ex.getMessage());
}
return results;
}
Expand Down

0 comments on commit b7be0cb

Please sign in to comment.