Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][jdbc] Jdbc database support identifier #5089

Merged
merged 21 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
| max_commit_attempts | Int | No | 3 |
| transaction_timeout_sec | Int | No | -1 |
| auto_commit | Boolean | No | true |
| field_ide | String | No | - |
| common-options | | no | - |

### driver [string]
Expand Down Expand Up @@ -136,6 +137,12 @@ exactly-once semantics

Automatic transaction commit is enabled by default

### field_ide [String]

The field "field_ide" is used to identify whether the field needs to be converted to uppercase or lowercase when
synchronizing from the source to the sink. "ORIGINAL" indicates no conversion is needed, "UPPERCASE" indicates
conversion to uppercase, and "LOWERCASE" indicates conversion to lowercase.

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
Expand Down
2 changes: 2 additions & 0 deletions docs/en/connector-v2/sink/Mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ semantics (using XA transaction guarantee).
| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures |
| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect<br/>exactly-once semantics |
| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default |
| field_ide | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. |
| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |

### Tips
Expand Down Expand Up @@ -191,6 +192,7 @@ sink {
database = test
table = sink_table
primary_keys = ["id","name"]
field_ide = UPPERCASE
}
}
```
Expand Down
2 changes: 2 additions & 0 deletions docs/en/connector-v2/sink/PostgreSql.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ semantics (using XA transaction guarantee).
| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures |
| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect<br/>exactly-once semantics |
| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default |
| field_ide | String | No | - | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. |
| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |

### Tips
Expand Down Expand Up @@ -197,6 +198,7 @@ sink {
database = test
table = sink_table
primary_keys = ["id","name"]
field_ide = UPPERCASE
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;

import java.math.BigDecimal;
import java.util.List;
Expand Down Expand Up @@ -154,4 +155,10 @@ public interface JdbcOptions {
.intType()
.noDefaultValue()
.withDescription("partition num");

Option<FieldIdeEnum> FIELD_IDE =
Options.key("field_ide")
hailin0 marked this conversation as resolved.
Show resolved Hide resolved
.enumType(FieldIdeEnum.class)
.noDefaultValue()
.withDescription("Whether case conversion is required");
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;

import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.sql.Connection;
Expand Down Expand Up @@ -68,9 +71,13 @@ default String hashModForField(String fieldName, int mod) {
default String quoteIdentifier(String identifier) {
return identifier;
}
/** Quotes the identifier for database name or field name */
default String quoteDatabaseIdentifier(String identifier) {
return identifier;
}

default String tableIdentifier(String database, String tableName) {
return quoteIdentifier(database) + "." + quoteIdentifier(tableName);
return quoteDatabaseIdentifier(database) + "." + quoteIdentifier(tableName);
}

/**
Expand Down Expand Up @@ -219,4 +226,18 @@ default ResultSetMetaData getResultSetMetaData(
default String extractTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}

default String getFieldIde(String identifier, String fieldIde) {
if (StringUtils.isEmpty(fieldIde)) {
return identifier;
}
switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) {
case LOWERCASE:
return identifier.toLowerCase();
case UPPERCASE:
return identifier.toUpperCase();
default:
return identifier;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface JdbcDialectFactory {
* @param compatibleMode The compatible mode
* @return a new instance of {@link JdbcDialect}
*/
default JdbcDialect create(String compatibleMode) {
default JdbcDialect create(String compatibleMode, String fieldId) {
return create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public final class JdbcDialectLoader {

private JdbcDialectLoader() {}

public static JdbcDialect load(String url, String compatibleMode) {
return load(url, compatibleMode, "");
}

/**
* Loads the unique JDBC Dialect that can handle the given database url.
*
Expand All @@ -45,7 +49,7 @@ private JdbcDialectLoader() {}
* unambiguously process the given database URL.
* @return The loaded dialect.
*/
public static JdbcDialect load(String url, String compatibleMode) {
public static JdbcDialect load(String url, String compatibleMode, String fieldIde) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
List<JdbcDialectFactory> foundFactories = discoverFactories(cl);

Expand Down Expand Up @@ -90,7 +94,7 @@ public static JdbcDialect load(String url, String compatibleMode) {
.collect(Collectors.joining("\n"))));
}

return matchingFactories.get(0).create(compatibleMode);
return matchingFactories.get(0).create(compatibleMode, fieldIde);
}

private static List<JdbcDialectFactory> discoverFactories(ClassLoader classLoader) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum;
hailin0 marked this conversation as resolved.
Show resolved Hide resolved

public enum FieldIdeEnum {
ORIGINAL("original"), // Original string form
UPPERCASE("uppercase"), // Convert to uppercase
LOWERCASE("lowercase"); // Convert to lowercase

private final String value;

FieldIdeEnum(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import com.google.auto.service.AutoService;

import javax.annotation.Nonnull;

/** Factory for {@link MysqlDialect}. */
@AutoService(JdbcDialectFactory.class)
public class MySqlDialectFactory implements JdbcDialectFactory {
Expand All @@ -34,4 +36,9 @@ public boolean acceptsURL(String url) {
public JdbcDialect create() {
return new MysqlDialect();
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
return new MysqlDialect(fieldIde);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -31,6 +32,14 @@
import java.util.stream.Collectors;

public class MysqlDialect implements JdbcDialect {
public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();

public MysqlDialect() {}

public MysqlDialect(String fieldIde) {
this.fieldIde = fieldIde;
}

@Override
public String dialectName() {
return "MySQL";
Expand All @@ -48,6 +57,11 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {

@Override
public String quoteIdentifier(String identifier) {
return "`" + getFieldIde(identifier, fieldIde) + "`";
}

@Override
public String quoteDatabaseIdentifier(String identifier) {
return "`" + identifier + "`";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public JdbcDialect create() {
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode) {
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
if ("oracle".equalsIgnoreCase(compatibleMode)) {
return new OracleDialect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -33,6 +34,13 @@
public class OracleDialect implements JdbcDialect {

private static final int DEFAULT_ORACLE_FETCH_SIZE = 128;
public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();

public OracleDialect(String fieldIde) {
this.fieldIde = fieldIde;
}

public OracleDialect() {}

@Override
public String dialectName() {
Expand All @@ -56,7 +64,18 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {

@Override
public String quoteIdentifier(String identifier) {
return identifier;
if (identifier.contains(".")) {
String[] parts = identifier.split("\\.");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < parts.length - 1; i++) {
sb.append("\"").append(parts[i]).append("\"").append(".");
}
return sb.append("\"")
.append(getFieldIde(parts[parts.length - 1], fieldIde))
.append("\"")
.toString();
}
return "\"" + getFieldIde(identifier, fieldIde) + "\"";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import com.google.auto.service.AutoService;

import javax.annotation.Nonnull;

/** Factory for {@link OracleDialect}. */
@AutoService(JdbcDialectFactory.class)
public class OracleDialectFactory implements JdbcDialectFactory {
Expand All @@ -34,4 +36,9 @@ public boolean acceptsURL(String url) {
public JdbcDialect create() {
return new OracleDialect();
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
return new OracleDialect(fieldIde);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -33,6 +34,14 @@ public class PostgresDialect implements JdbcDialect {

public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;

public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();

public PostgresDialect() {}

public PostgresDialect(String fieldIde) {
this.fieldIde = fieldIde;
}

@Override
public String dialectName() {
return "PostgreSQL";
Expand Down Expand Up @@ -88,4 +97,32 @@ public PreparedStatement creatPreparedStatement(
}
return statement;
}

@Override
public String tableIdentifier(String database, String tableName) {
// resolve pg database name upper or lower not recognised
return quoteDatabaseIdentifier(database) + "." + quoteIdentifier(tableName);
}

@Override
public String quoteIdentifier(String identifier) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add test case for this new feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! e2e has been added

if (identifier.contains(".")) {
String[] parts = identifier.split("\\.");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < parts.length - 1; i++) {
sb.append("\"").append(parts[i]).append("\"").append(".");
}
return sb.append("\"")
.append(getFieldIde(parts[parts.length - 1], fieldIde))
.append("\"")
.toString();
}

return "\"" + getFieldIde(identifier, fieldIde) + "\"";
}

@Override
public String quoteDatabaseIdentifier(String identifier) {
return "\"" + identifier + "\"";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public JdbcDialect create() {
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode) {
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
if ("postgresLow".equalsIgnoreCase(compatibleMode)) {
return new PostgresLowDialect();
return new PostgresLowDialect(fieldIde);
}
return new PostgresDialect();
return new PostgresDialect(fieldIde);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import java.util.Optional;

public class PostgresLowDialect extends PostgresDialect {

public PostgresLowDialect(String fieldIde) {
this.fieldIde = fieldIde;
}

@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
Expand Down
Loading
Loading