columnNames,
throw new UnsupportedOperationException("UPSERT is not supported for " + getName());
}
- /**
- * Returns a bare identifier string by removing wrapping escape characters
- * from identifier strings such as table and column names.
- * The default implementation of this method removes double quotes.
- * If the target database engine supports different escape characters, then its DatabaseAdapter implementation should override
- * this method so that such escape characters can be removed properly.
- *
- * @param identifier An identifier which may be wrapped with escape characters
- * @return An unwrapped identifier string, or null if the input identifier is null
- */
- default String unwrapIdentifier(String identifier) {
- return identifier == null ? null : identifier.replaceAll("\"", "");
- }
-
default String getTableAliasClause(String tableName) {
return "AS " + tableName;
}
+ /**
+ * Table Quote String usage limited to statement generation methods within DatabaseAdapter
+ *
+ * @return Table Quote String
+ */
default String getTableQuoteString() {
// ANSI standard is a double quote
return "\"";
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapterDescriptor.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapterDescriptor.java
new file mode 100644
index 000000000000..d3d5bac52207
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapterDescriptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
+import org.apache.nifi.processors.standard.db.impl.DatabaseDialectServiceDatabaseAdapter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.ServiceLoader;
+
+public class DatabaseAdapterDescriptor {
+
+ private static final List databaseTypes = new ArrayList<>();
+
+ private static final Map databaseAdapters = new HashMap<>();
+
+ static {
+ final ServiceLoader loader = ServiceLoader.load(DatabaseAdapter.class);
+ loader.forEach(databaseAdapter -> {
+ final String name = databaseAdapter.getName();
+ final String description = databaseAdapter.getDescription();
+ final AllowableValue databaseType = new AllowableValue(name, name, description);
+ databaseTypes.add(databaseType);
+ databaseAdapters.put(name, databaseAdapter);
+ });
+ }
+
+ public static PropertyDescriptor getDatabaseDialectServiceDescriptor(final PropertyDescriptor dependsOnPropertyDescriptor) {
+ return new PropertyDescriptor.Builder()
+ .name("Database Dialect Service")
+ .description("Database Dialect Service for generating statements specific to a particular service or vendor.")
+ .identifiesControllerService(DatabaseDialectService.class)
+ .required(true)
+ .dependsOn(dependsOnPropertyDescriptor, DatabaseDialectServiceDatabaseAdapter.NAME)
+ .build();
+ }
+
+ public static PropertyDescriptor getDatabaseTypeDescriptor(final String propertyName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyName)
+ .displayName("Database Type")
+ .description("""
+ Database Type for generating statements specific to a particular service or vendor.
+ The Generic Type supports most cases but selecting a specific type enables optimal processing
+ or additional features.
+ """
+ )
+ .allowableValues(databaseTypes.toArray(new AllowableValue[0]))
+ .defaultValue("Generic")
+ .required(true)
+ .build();
+ }
+
+ public static DatabaseAdapter getDatabaseAdapter(final String databaseType) {
+ Objects.requireNonNull(databaseType, "Database Type required");
+ return databaseAdapters.get(databaseType);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/DatabaseAdapterDatabaseDialectService.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/DatabaseAdapterDatabaseDialectService.java
new file mode 100644
index 000000000000..df8e947b9a26
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/DatabaseAdapterDatabaseDialectService.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
+import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
+import org.apache.nifi.database.dialect.service.api.PageRequest;
+import org.apache.nifi.database.dialect.service.api.QueryClause;
+import org.apache.nifi.database.dialect.service.api.QueryClauseType;
+import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
+import org.apache.nifi.database.dialect.service.api.StandardStatementResponse;
+import org.apache.nifi.database.dialect.service.api.StatementRequest;
+import org.apache.nifi.database.dialect.service.api.StatementResponse;
+import org.apache.nifi.database.dialect.service.api.StatementType;
+import org.apache.nifi.database.dialect.service.api.TableDefinition;
+import org.apache.nifi.processors.standard.db.ColumnDescription;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.DatabaseAdapterDescriptor;
+import org.apache.nifi.processors.standard.db.TableSchema;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Transitional implementation of Database Dialect Service bridging to existing Database Adapters
+ */
+public class DatabaseAdapterDatabaseDialectService extends AbstractControllerService implements DatabaseDialectService {
+ private static final char SPACE_SEPARATOR = ' ';
+
+ private static final char COMMA_SEPARATOR = ',';
+
+ private static final int COLUMN_SIZE_IGNORED = -1;
+
+ private static final String DOUBLE_QUOTE = "\"";
+
+ private final DatabaseAdapter databaseAdapter;
+
+ private final Set supportedStatementTypes;
+
+ public DatabaseAdapterDatabaseDialectService(final String databaseType) {
+ Objects.requireNonNull(databaseType, "Database Type required");
+ databaseAdapter = DatabaseAdapterDescriptor.getDatabaseAdapter(databaseType);
+ Objects.requireNonNull(databaseAdapter, "Database Adapter required");
+
+ final Set statementTypes = new LinkedHashSet<>();
+ statementTypes.add(StatementType.ALTER);
+ statementTypes.add(StatementType.CREATE);
+ statementTypes.add(StatementType.SELECT);
+
+ if (databaseAdapter.supportsInsertIgnore()) {
+ statementTypes.add(StatementType.INSERT_IGNORE);
+ }
+ if (databaseAdapter.supportsUpsert()) {
+ statementTypes.add(StatementType.UPSERT);
+ }
+ supportedStatementTypes = Collections.unmodifiableSet(statementTypes);
+ }
+
+ @Override
+ public StatementResponse getStatement(final StatementRequest statementRequest) {
+ final StatementType statementType = statementRequest.statementType();
+
+ final TableDefinition tableDefinition = statementRequest.tableDefinition();
+ final List columnNames = tableDefinition.columns()
+ .stream()
+ .map(ColumnDefinition::columnName)
+ .toList();
+ final List primaryKeyColumnNames = tableDefinition.columns()
+ .stream()
+ .filter(ColumnDefinition::primaryKey)
+ .map(ColumnDefinition::columnName)
+ .toList();
+ final List columnDescriptions = getColumnDescriptions(tableDefinition);
+
+ final String sql;
+
+ if (StatementType.ALTER == statementType) {
+ final List statements = databaseAdapter.getAlterTableStatements(tableDefinition.tableName(), columnDescriptions, true, true);
+ sql = statements.getFirst();
+ } else if (StatementType.CREATE == statementType) {
+ final TableSchema tableSchema = getTableSchema(tableDefinition);
+ sql = databaseAdapter.getCreateTableStatement(tableSchema, false, false);
+ } else if (StatementType.UPSERT == statementType) {
+ sql = databaseAdapter.getUpsertStatement(tableDefinition.tableName(), columnNames, primaryKeyColumnNames);
+ } else if (StatementType.INSERT_IGNORE == statementType) {
+ sql = databaseAdapter.getInsertIgnoreStatement(tableDefinition.tableName(), columnNames, primaryKeyColumnNames);
+ } else if (StatementType.SELECT == statementType) {
+ sql = getSelectStatement(statementRequest);
+ } else {
+ throw new UnsupportedOperationException("Statement Type [%s] not supported".formatted(statementType));
+ }
+
+ return new StandardStatementResponse(sql);
+ }
+
+ @Override
+ public Set getSupportedStatementTypes() {
+ return supportedStatementTypes;
+ }
+
+ private String getSelectStatement(final StatementRequest statementRequest) {
+ if (statementRequest instanceof QueryStatementRequest queryStatementRequest) {
+ final TableDefinition tableDefinition = statementRequest.tableDefinition();
+ final String qualifiedTableName = tableDefinition.tableName();
+ final Optional derivedTableFound = queryStatementRequest.derivedTable();
+
+ final Optional whereQueryClause = queryStatementRequest.queryClauses().stream()
+ .filter(queryClause -> QueryClauseType.WHERE == queryClause.queryClauseType())
+ .findFirst();
+ final Optional orderByQueryClause = queryStatementRequest.queryClauses().stream()
+ .filter(queryClause -> QueryClauseType.ORDER_BY == queryClause.queryClauseType())
+ .findFirst();
+
+ final String selectTableSql;
+ if (derivedTableFound.isPresent()) {
+ final String derivedTable = derivedTableFound.get();
+ final String tableAlias = databaseAdapter.getTableAliasClause(qualifiedTableName);
+ selectTableSql = "SELECT * FROM (%s) %s".formatted(derivedTable, tableAlias);
+ } else {
+ final String tableColumns = getSelectTableColumns(tableDefinition.columns());
+
+ final Optional pageRequestFound = queryStatementRequest.pageRequest();
+ final Long limit;
+ final Long offset;
+ final String indexColumnName;
+ if (pageRequestFound.isPresent()) {
+ final PageRequest pageRequest = pageRequestFound.get();
+ limit = pageRequest.limit().isPresent() ? pageRequest.limit().getAsLong() : null;
+ offset = pageRequest.offset();
+ indexColumnName = pageRequest.indexColumnName().orElse(null);
+ } else {
+ limit = null;
+ offset = null;
+ indexColumnName = null;
+ }
+
+ final String whereSql = whereQueryClause.map(QueryClause::criteria).orElse(null);
+ final String orderBySql = orderByQueryClause.map(QueryClause::criteria).orElse(null);
+
+ selectTableSql = databaseAdapter.getSelectStatement(qualifiedTableName, tableColumns, whereSql, orderBySql, limit, offset, indexColumnName);
+ }
+
+ return selectTableSql;
+ } else {
+ throw new IllegalArgumentException("Query Statement Request not found [%s]".formatted(statementRequest.getClass()));
+ }
+ }
+
+ private String getSelectTableColumns(final List columnDefinitions) {
+ final StringBuilder tableColumns = new StringBuilder();
+
+ final Iterator columns = columnDefinitions.iterator();
+ while (columns.hasNext()) {
+ final ColumnDefinition columnDefinition = columns.next();
+ final String columnName = columnDefinition.columnName();
+ tableColumns.append(columnName);
+
+ if (columns.hasNext()) {
+ tableColumns.append(COMMA_SEPARATOR);
+ tableColumns.append(SPACE_SEPARATOR);
+ }
+ }
+
+ return tableColumns.toString();
+ }
+
+ private List getColumnDescriptions(final TableDefinition tableDefinition) {
+ return tableDefinition.columns().stream().map(columnDefinition ->
+ new ColumnDescription(
+ columnDefinition.columnName(),
+ columnDefinition.dataType(),
+ columnDefinition.primaryKey(),
+ COLUMN_SIZE_IGNORED,
+ columnDefinition.nullable() == ColumnDefinition.Nullable.YES
+ )
+ ).toList();
+ }
+
+ private TableSchema getTableSchema(final TableDefinition tableDefinition) {
+ final List columnDescriptions = getColumnDescriptions(tableDefinition);
+ final Set primaryKeyColumnNames = tableDefinition.columns().stream()
+ .filter(ColumnDefinition::primaryKey)
+ .map(ColumnDefinition::columnName)
+ .collect(Collectors.toUnmodifiableSet());
+
+ return new TableSchema(
+ tableDefinition.catalog().orElse(null),
+ tableDefinition.schemaName().orElse(null),
+ tableDefinition.tableName(),
+ columnDescriptions,
+ false,
+ null,
+ primaryKeyColumnNames,
+ DOUBLE_QUOTE
+ );
+ }
+}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/DatabaseDialectServiceDatabaseAdapter.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/DatabaseDialectServiceDatabaseAdapter.java
new file mode 100644
index 000000000000..6cb1fb06d299
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/DatabaseDialectServiceDatabaseAdapter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl;
+
+/**
+ * Placeholder implementation to bridge between historical Database Adapter and Database Dialect Service
+ */
+public class DatabaseDialectServiceDatabaseAdapter extends GenericDatabaseAdapter {
+ public static final String NAME = "Database Dialect Service";
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Requires configuring a Database Dialect Service for SQL statements";
+ }
+}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
index 851268eaecaf..6536709ecb0d 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MSSQLDatabaseAdapter.java
@@ -103,12 +103,6 @@ public String getSelectStatement(String tableName, String columnNames, String wh
return query.toString();
}
- @Override
- public String unwrapIdentifier(String identifier) {
- // Remove double quotes and square brackets.
- return identifier == null ? null : identifier.replaceAll("[\"\\[\\]]", "");
- }
-
@Override
public List getAlterTableStatements(final String tableName, final List columnsToAdd, final boolean quoteTableName, final boolean quoteColumnNames) {
List columnsAndDatatypes = new ArrayList<>(columnsToAdd.size());
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
index 6e9e31732278..6b76b29a1e26 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/MySQLDatabaseAdapter.java
@@ -51,12 +51,6 @@ public String getDescription() {
return "Generates MySQL compatible SQL";
}
- @Override
- public String unwrapIdentifier(String identifier) {
- // Removes double quotes and back-ticks.
- return identifier == null ? null : identifier.replaceAll("[\"`]", "");
- }
-
@Override
public boolean supportsUpsert() {
return true;
@@ -67,17 +61,6 @@ public boolean supportsInsertIgnore() {
return true;
}
- /**
- * Tells How many times the column values need to be inserted into the prepared statement. Some DBs (such as MySQL) need the values specified twice in the statement,
- * some need only to specify them once.
- *
- * @return An integer corresponding to the number of times to insert column values into the prepared statement for UPSERT, or -1 if upsert is not supported.
- */
- @Override
- public int getTimesToAddColumnObjectsForUpsert() {
- return 2;
- }
-
@Override
public String getUpsertStatement(String table, List columnNames, Collection uniqueKeyColumnNames) {
if (StringUtils.isEmpty(table)) {
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
index 641223d21bbf..c0ba77b921c4 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.nifi.processors.standard.db.impl.DatabaseDialectServiceDatabaseAdapter
org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter
org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter
org.apache.nifi.processors.standard.db.impl.Oracle12DatabaseAdapter
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordIT.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordIT.java
index 1c2278e7542b..a0dbc204ac4a 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordIT.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordIT.java
@@ -56,6 +56,16 @@ public class PutDatabaseRecordIT {
private final long NANOS_AFTER_SECOND = 351567000L;
private final Instant INSTANT_MICROS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG).plusNanos(NANOS_AFTER_SECOND).minusMillis(MILLIS_TIMESTAMP_LONG % 1000);
+ private static final String SIMPLE_INPUT_RECORD = """
+ {
+ "name": "John Doe",
+ "age": 50,
+ "favorite_color": "blue"
+ }
+ """;
+
+ private static final String FAVORITE_COLOR_FIELD = "favorite_color";
+ private static final String FAVORITE_COLOR = "blue";
private static PostgreSQLContainer> postgres;
private TestRunner runner;
@@ -106,18 +116,36 @@ public void setup() throws InitializationException, SQLException {
@Test
public void testSimplePut() throws SQLException {
- runner.enqueue("""
- {
- "name": "John Doe",
- "age": 50,
- "favorite_color": "blue"
- }
- """);
+ runner.enqueue(SIMPLE_INPUT_RECORD);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
+
+ final Map results = getResults();
+ assertEquals(FAVORITE_COLOR, results.get(FAVORITE_COLOR_FIELD));
+ }
+
+ @Test
+ public void testUpsert() throws SQLException {
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, "UPSERT");
+
+ runner.enqueue(SIMPLE_INPUT_RECORD);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
+
+ final Map results = getResults();
+ assertEquals(FAVORITE_COLOR, results.get(FAVORITE_COLOR_FIELD));
+ }
+
+ @Test
+ public void testInsertIgnore() throws SQLException {
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, "INSERT_IGNORE");
+
+ runner.enqueue(SIMPLE_INPUT_RECORD);
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
final Map results = getResults();
- assertEquals("blue", results.get("favorite_color"));
+ assertEquals(FAVORITE_COLOR, results.get(FAVORITE_COLOR_FIELD));
}
@Test
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java
index 6173d9ee51b4..aefa4076812e 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java
@@ -18,17 +18,11 @@
import org.apache.nifi.dbcp.DBCPConnectionPool;
import org.apache.nifi.dbcp.utils.DBCPProperties;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
public class QueryDatabaseTableIT extends QueryDatabaseTableTest {
private static PostgreSQLContainer> postgres;
@@ -48,8 +42,8 @@ public static void cleanUpAfterClass() {
}
@Override
- public DatabaseAdapter createDatabaseAdapter() {
- return new PostgreSQLDatabaseAdapter();
+ public String getDatabaseType() {
+ return "PostgreSQL";
}
@Override
@@ -62,13 +56,4 @@ public void createDbcpControllerService() throws InitializationException {
runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME, postgres.getDriverClassName());
runner.enableControllerService(connectionPool);
}
-
- @Test
- public void testAddedRowsAutoCommitTrue() {
- // this test in the base class is not valid for PostgreSQL so check the validation error message.
- final AssertionError assertionError = assertThrows(AssertionError.class, super::testAddedRowsAutoCommitTrue);
- assertEquals(assertionError.getMessage(), "Processor has 1 validation failures:\n" +
- "'Set Auto Commit' validated against 'true' is invalid because 'Set Auto Commit' " +
- "must be set to 'false' because 'PostgreSQL' Database Type requires it to be 'false'\n");
- }
}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
index a8e57c3d59fe..391cead1b5f2 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
@@ -18,17 +18,11 @@
import org.apache.nifi.dbcp.DBCPConnectionPool;
import org.apache.nifi.dbcp.utils.DBCPProperties;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
public class QueryDatabaseTableRecordIT extends QueryDatabaseTableRecordTest {
private static PostgreSQLContainer> postgres;
@@ -48,8 +42,8 @@ public static void cleanUpAfterClass() {
}
@Override
- public DatabaseAdapter createDatabaseAdapter() {
- return new PostgreSQLDatabaseAdapter();
+ public String getDatabaseType() {
+ return "PostgreSQL";
}
@Override
@@ -62,13 +56,4 @@ public void createDbcpControllerService() throws InitializationException {
runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME, postgres.getDriverClassName());
runner.enableControllerService(connectionPool);
}
-
- @Test
- public void testAddedRowsAutoCommitTrue() {
- // this test in the base class is not valid for PostgreSQL so check the validation error message.
- final AssertionError assertionError = assertThrows(AssertionError.class, super::testAddedRowsAutoCommitTrue);
- assertEquals(assertionError.getMessage(), "Processor has 1 validation failures:\n" +
- "'Set Auto Commit' validated against 'true' is invalid because 'Set Auto Commit' " +
- "must be set to 'false' because 'PostgreSQL' Database Type requires it to be 'false'\n");
- }
}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
index 7bf922727c87..5ca58b4a2d3c 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
@@ -18,16 +18,9 @@
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
@@ -47,17 +40,13 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.Types;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -72,8 +61,6 @@ public class QueryDatabaseTableRecordTest {
MockQueryDatabaseTableRecord processor;
protected TestRunner runner;
private final static String DB_LOCATION = "target/db_qdt";
- private DatabaseAdapter dbAdapter;
- private HashMap origDbAdapters;
private final static String TABLE_NAME_KEY = "tableName";
private final static String MAX_ROWS_KEY = "maxRows";
@@ -108,8 +95,8 @@ public static void cleanUpAfterClass() {
System.clearProperty("derby.stream.error.file");
}
- public DatabaseAdapter createDatabaseAdapter() {
- return new GenericDatabaseAdapter();
+ public String getDatabaseType() {
+ return "Generic";
}
public void createDbcpControllerService() throws InitializationException {
@@ -121,14 +108,11 @@ public void createDbcpControllerService() throws InitializationException {
@BeforeEach
public void setup() throws InitializationException, IOException {
- origDbAdapters = new HashMap<>(QueryDatabaseTableRecord.dbAdapters);
- dbAdapter = createDatabaseAdapter();
- QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), dbAdapter);
processor = new MockQueryDatabaseTableRecord();
runner = TestRunners.newTestRunner(processor);
createDbcpControllerService();
runner.setProperty(QueryDatabaseTableRecord.DBCP_SERVICE, "dbcp");
- runner.setProperty(QueryDatabaseTableRecord.DB_TYPE, dbAdapter.getName());
+ runner.setProperty(QueryDatabaseTableRecord.DB_TYPE, getDatabaseType());
runner.getStateManager().clear(Scope.CLUSTER);
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
@@ -142,101 +126,6 @@ public void setup() throws InitializationException, IOException {
public void teardown() throws IOException {
runner.getStateManager().clear(Scope.CLUSTER);
runner = null;
- QueryDatabaseTableRecord.dbAdapters.clear();
- QueryDatabaseTableRecord.dbAdapters.putAll(origDbAdapters);
- }
-
- @Test
- public void testGetQuery() throws Exception {
- String query = processor.getQuery(dbAdapter, "myTable", null, null, null, null);
- assertEquals("SELECT * FROM myTable", query);
- query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null, null);
- assertEquals("SELECT col1,col2 FROM myTable", query);
-
- query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, null);
- assertEquals("SELECT * FROM myTable", query);
-
- Map maxValues = new HashMap<>();
- maxValues.put("id", "509");
- StateManager stateManager = runner.getStateManager();
- stateManager.setState(maxValues, Scope.CLUSTER);
- processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("mytable", "id", dbAdapter), Types.INTEGER);
- query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509", query);
-
- maxValues.put("date_created", "2016-03-07 12:34:56");
- stateManager.setState(maxValues, Scope.CLUSTER);
- processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("mytable", "date_created", dbAdapter), Types.TIMESTAMP);
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), null, stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query);
-
- // Double quotes can be used to escape column and table names with most ANSI compatible database engines.
- maxValues.put("mytable@!@date-created", "2016-03-07 12:34:56");
- stateManager.setState(maxValues, Scope.CLUSTER);
- processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("\"myTable\"", "\"DATE-CREATED\"", dbAdapter), Types.TIMESTAMP);
- query = processor.getQuery(dbAdapter, "\"myTable\"", null, Arrays.asList("id", "\"DATE-CREATED\""), null, stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM \"myTable\" WHERE id > 509 AND \"DATE-CREATED\" >= '2016-03-07 12:34:56'", query);
-
- // Back-ticks can be used to escape MySQL column and table names.
- dbAdapter = new MySQLDatabaseAdapter();
- processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("`myTable`", "`DATE-CREATED`", dbAdapter), Types.TIMESTAMP);
- query = processor.getQuery(dbAdapter, "`myTable`", null, Arrays.asList("id", "`DATE-CREATED`"), null, stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM `myTable` WHERE id > 509 AND `DATE-CREATED` >= '2016-03-07 12:34:56'", query);
-
- // Square brackets can be used to escape Microsoft SQL Server column and table names.
- dbAdapter = new MSSQLDatabaseAdapter();
- processor.putColumnType(AbstractDatabaseFetchProcessor.getStateKey("[myTable]", "[DATE-CREATED]", dbAdapter), Types.TIMESTAMP);
- query = processor.getQuery(dbAdapter, "[myTable]", null, Arrays.asList("id", "[DATE-CREATED]"), null, stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM [myTable] WHERE id > 509 AND [DATE-CREATED] >= '2016-03-07 12:34:56'", query);
-
- // Test Oracle strategy
- dbAdapter = new OracleDatabaseAdapter();
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query);
-
- // Test time.
- processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME);
- maxValues.clear();
- maxValues.put("id", "509");
- maxValues.put("time_created", "12:34:57");
- maxValues.put("date_created", "2016-03-07 12:34:56");
- stateManager = runner.getStateManager();
- stateManager.clear(Scope.CLUSTER);
- stateManager.setState(maxValues, Scope.CLUSTER);
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= timestamp '12:34:57' AND (type = \"CUSTOMER\")", query);
- dbAdapter = new GenericDatabaseAdapter();
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query);
- }
-
- @Test
- public void testGetQueryUsingPhoenixAdapter() throws Exception {
- Map maxValues = new HashMap<>();
- StateManager stateManager = runner.getStateManager();
- processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
- processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME);
- processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP);
-
- maxValues.put("id", "509");
- maxValues.put("time_created", "12:34:57");
- maxValues.put("date_created", "2016-03-07 12:34:56");
- stateManager.setState(maxValues, Scope.CLUSTER);
-
- dbAdapter = new PhoenixDatabaseAdapter();
- String query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= time '12:34:57' AND (type = \"CUSTOMER\")", query);
- // Cover the other path
- dbAdapter = new GenericDatabaseAdapter();
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query);
- }
-
- @Test
- public void testGetQueryNoTable() {
- assertThrows(IllegalArgumentException.class, () -> {
- processor.getQuery(dbAdapter, null, null, null, null, null);
- });
}
@Test
@@ -663,38 +552,6 @@ public void testWithNullIntColumn() throws SQLException {
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTableRecord.RESULT_ROW_COUNT, "2");
}
- @Test
- public void testWithRuntimeException() throws SQLException {
- // load test data to database
- final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
- Statement stmt = con.createStatement();
-
- try {
- stmt.execute("drop table TEST_NULL_INT");
- } catch (final SQLException sqle) {
- // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
- }
-
- stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
-
- stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
- stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
-
- runner.setIncomingConnection(false);
- runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_NULL_INT");
- runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
-
- QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
- @Override
- public String getName() {
- throw new RuntimeException("test");
- }
- });
- runner.run();
-
- assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
- }
-
@Test
public void testWithSqlException() throws SQLException {
// load test data to database
@@ -1429,52 +1286,6 @@ public void testMissingColumn() throws ProcessException, SQLException {
});
}
- @Test
- public void testWithExceptionAfterSomeRowsProcessed() throws SQLException {
- // load test data to database
- final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
- Statement stmt = con.createStatement();
-
- try {
- stmt.execute("drop table TEST_NULL_INT");
- } catch (final SQLException sqle) {
- // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
- }
-
- stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
-
- stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, NULL, 1)");
- stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (2, 1, 1)");
-
- runner.setIncomingConnection(false);
- runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, "TEST_NULL_INT");
- runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
-
- // Override adapter with one that fails after the first row is processed
- QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
- boolean fail = false;
-
- @Override
- public String getName() {
- if (!fail) {
- fail = true;
- return super.getName();
- }
- throw new RuntimeException("test");
- }
- });
- runner.run();
- assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
- // State should not have been updated
- runner.getStateManager().assertStateNotSet("test_null_int@!@id", Scope.CLUSTER);
-
- // Restore original (working) adapter and run again
- QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), dbAdapter);
- runner.run();
- assertFalse(runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).isEmpty());
- runner.getStateManager().assertStateEquals("test_null_int@!@id", "2", Scope.CLUSTER);
- }
-
/**
* Simple implementation only for QueryDatabaseTableRecord processor testing.
*/
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index e5e7daa27cda..ae3fb59853d5 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -17,22 +17,14 @@
package org.apache.nifi.processors.standard;
import org.apache.avro.file.DataFileStream;
-import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.MSSQLDatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.MySQLDatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
-import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -52,17 +44,13 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.Types;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -75,8 +63,6 @@ public class QueryDatabaseTableTest {
MockQueryDatabaseTable processor;
protected TestRunner runner;
private final static String DB_LOCATION = "target/db_qdt";
- private DatabaseAdapter dbAdapter;
- private HashMap origDbAdapters;
private final static String TABLE_NAME_KEY = "tableName";
private final static String MAX_ROWS_KEY = "maxRows";
@@ -112,10 +98,6 @@ public static void cleanUpAfterClass() {
System.clearProperty("derby.stream.error.file");
}
- public DatabaseAdapter createDatabaseAdapter() {
- return new GenericDatabaseAdapter();
- }
-
public void createDbcpControllerService() throws InitializationException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map dbcpProperties = new HashMap<>();
@@ -125,120 +107,26 @@ public void createDbcpControllerService() throws InitializationException {
@BeforeEach
public void setup() throws InitializationException, IOException {
- origDbAdapters = new HashMap<>(QueryDatabaseTable.dbAdapters);
- dbAdapter = createDatabaseAdapter();
- QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
processor = new MockQueryDatabaseTable();
runner = TestRunners.newTestRunner(processor);
createDbcpControllerService();
runner.setProperty(QueryDatabaseTable.DBCP_SERVICE, "dbcp");
- runner.setProperty(QueryDatabaseTable.DB_TYPE, dbAdapter.getName());
+ runner.setProperty(QueryDatabaseTable.DB_TYPE, getDatabaseType());
runner.getStateManager().clear(Scope.CLUSTER);
}
+ public String getDatabaseType() {
+ return "Generic";
+ }
+
@AfterEach
public void teardown() throws IOException {
runner.getStateManager().clear(Scope.CLUSTER);
runner = null;
- QueryDatabaseTable.dbAdapters.clear();
- QueryDatabaseTable.dbAdapters.putAll(origDbAdapters);
- }
-
- @Test
- public void testGetQuery() throws Exception {
- String query = processor.getQuery(dbAdapter, "myTable", null, null, null, null);
- assertEquals("SELECT * FROM myTable", query);
- query = processor.getQuery(dbAdapter, "myTable", "col1,col2", null, null, null);
- assertEquals("SELECT col1,col2 FROM myTable", query);
-
- query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, null);
- assertEquals("SELECT * FROM myTable", query);
-
- Map maxValues = new HashMap<>();
- maxValues.put("id", "509");
- StateManager stateManager = runner.getStateManager();
- stateManager.setState(maxValues, Scope.CLUSTER);
- processor.putColumnType(processor.getStateKey("mytable", "id", dbAdapter), Types.INTEGER);
- query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), null, stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509", query);
-
- maxValues.put("date_created", "2016-03-07 12:34:56");
- stateManager.setState(maxValues, Scope.CLUSTER);
- processor.putColumnType(processor.getStateKey("mytable", "date_created", dbAdapter), Types.TIMESTAMP);
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), null, stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query);
-
- // Double quotes can be used to escape column and table names with most ANSI compatible database engines.
- maxValues.put("mytable@!@date-created", "2016-03-07 12:34:56");
- stateManager.setState(maxValues, Scope.CLUSTER);
- processor.putColumnType(processor.getStateKey("\"myTable\"", "\"DATE-CREATED\"", dbAdapter), Types.TIMESTAMP);
- query = processor.getQuery(dbAdapter, "\"myTable\"", null, Arrays.asList("id", "\"DATE-CREATED\""), null, stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM \"myTable\" WHERE id > 509 AND \"DATE-CREATED\" >= '2016-03-07 12:34:56'", query);
-
- // Back-ticks can be used to escape MySQL column and table names.
- dbAdapter = new MySQLDatabaseAdapter();
- processor.putColumnType(processor.getStateKey("`myTable`", "`DATE-CREATED`", dbAdapter), Types.TIMESTAMP);
- query = processor.getQuery(dbAdapter, "`myTable`", null, Arrays.asList("id", "`DATE-CREATED`"), null, stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM `myTable` WHERE id > 509 AND `DATE-CREATED` >= '2016-03-07 12:34:56'", query);
-
- // Square brackets can be used to escape Microsoft SQL Server column and table names.
- dbAdapter = new MSSQLDatabaseAdapter();
- processor.putColumnType(processor.getStateKey("[myTable]", "[DATE-CREATED]", dbAdapter), Types.TIMESTAMP);
- query = processor.getQuery(dbAdapter, "[myTable]", null, Arrays.asList("id", "[DATE-CREATED]"), null, stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM [myTable] WHERE id > 509 AND [DATE-CREATED] >= '2016-03-07 12:34:56'", query);
-
- // Test Oracle strategy
- dbAdapter = new OracleDatabaseAdapter();
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND (type = \"CUSTOMER\")", query);
-
- // Test time.
- processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME);
- maxValues.clear();
- maxValues.put("id", "509");
- maxValues.put("time_created", "12:34:57");
- maxValues.put("date_created", "2016-03-07 12:34:56");
- stateManager = runner.getStateManager();
- stateManager.clear(Scope.CLUSTER);
- stateManager.setState(maxValues, Scope.CLUSTER);
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= timestamp '12:34:57' AND (type = \"CUSTOMER\")", query);
- dbAdapter = new GenericDatabaseAdapter();
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query);
- }
-
- @Test
- public void testGetQueryUsingPhoenixAdapter() throws Exception {
- Map maxValues = new HashMap<>();
- StateManager stateManager = runner.getStateManager();
- processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
- processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "time_created", Types.TIME);
- processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP);
-
- maxValues.put("id", "509");
- maxValues.put("time_created", "12:34:57");
- maxValues.put("date_created", "2016-03-07 12:34:56");
- stateManager.setState(maxValues, Scope.CLUSTER);
-
- dbAdapter = new PhoenixDatabaseAdapter();
- String query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= timestamp '2016-03-07 12:34:56' AND TIME_CREATED >= time '12:34:57' AND (type = \"CUSTOMER\")", query);
- // Cover the other path
- dbAdapter = new GenericDatabaseAdapter();
- query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED", "TIME_CREATED"), "type = \"CUSTOMER\"", stateManager.getState(Scope.CLUSTER).toMap());
- assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56' AND TIME_CREATED >= '12:34:57' AND (type = \"CUSTOMER\")", query);
- }
-
- @Test
- public void testGetQueryNoTable() {
- assertThrows(IllegalArgumentException.class, () -> {
- processor.getQuery(dbAdapter, null, null, null, null, null);
- });
}
@Test
- public void testAddedRows() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+ public void testAddedRows() throws SQLException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
@@ -675,38 +563,6 @@ public void testWithNullIntColumn() throws SQLException {
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTable.RESULT_ROW_COUNT, "2");
}
- @Test
- public void testWithRuntimeException() throws SQLException {
- // load test data to database
- final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
- Statement stmt = con.createStatement();
-
- try {
- stmt.execute("drop table TEST_NULL_INT");
- } catch (final SQLException sqle) {
- // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
- }
-
- stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
-
- stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
- stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
-
- runner.setIncomingConnection(false);
- runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
- runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
-
- QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
- @Override
- public String getName() {
- throw new DataFileWriter.AppendWriteException(null);
- }
- });
- runner.run();
-
- assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
- }
-
@Test
public void testWithSqlException() throws SQLException {
// load test data to database
@@ -1462,51 +1318,6 @@ public void testMissingColumn() throws ProcessException, ClassNotFoundException,
});
}
- @Test
- public void testWithExceptionAfterSomeRowsProcessed() throws SQLException {
- // load test data to database
- final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
- Statement stmt = con.createStatement();
-
- try {
- stmt.execute("drop table TEST_NULL_INT");
- } catch (final SQLException sqle) {
- // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
- }
-
- stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
-
- stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, NULL, 1)");
- stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (2, 1, 1)");
-
- runner.setIncomingConnection(false);
- runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
- runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
-
- // Override adapter with one that fails after the first row is processed
- QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
- boolean fail = false;
- @Override
- public String getName() {
- if (!fail) {
- fail = true;
- return super.getName();
- }
- throw new DataFileWriter.AppendWriteException(null);
- }
- });
- runner.run();
- assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
- // State should not have been updated
- runner.getStateManager().assertStateNotSet("test_null_int@!@id", Scope.CLUSTER);
-
- // Restore original (working) adapter and run again
- QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
- runner.run();
- assertFalse(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
- runner.getStateManager().assertStateEquals("test_null_int@!@id", "2", Scope.CLUSTER);
- }
-
private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader datumReader = new GenericDatumReader<>();
try (DataFileStream dataFileReader = new DataFileStream<>(in, datumReader)) {
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index f30fdba60a8f..4722745fc69e 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -22,7 +22,6 @@
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.MockSessionFactory;
@@ -119,7 +118,7 @@ public void setUp() throws Exception {
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(GenerateTableFetch.DBCP_SERVICE, "dbcp");
- runner.setProperty(DB_TYPE, new DerbyDatabaseAdapter().getName());
+ runner.setProperty(DB_TYPE, "Derby");
}
@Test
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
index 5ed921b06bfe..33c5f4af558f 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQL2008DatabaseAdapter.java
@@ -16,14 +16,13 @@
*/
package org.apache.nifi.processors.standard.db.impl;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestMSSQL2008DatabaseAdapter {
- private final DatabaseAdapter db = new MSSQL2008DatabaseAdapter();
+ private final MSSQL2008DatabaseAdapter db = new MSSQL2008DatabaseAdapter();
@Test
public void testGeneration() {
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
index dea494e937b5..e492b00b10a0 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestMSSQLDatabaseAdapter.java
@@ -16,14 +16,13 @@
*/
package org.apache.nifi.processors.standard.db.impl;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestMSSQLDatabaseAdapter {
- final DatabaseAdapter db = new MSSQLDatabaseAdapter();
+ final MSSQLDatabaseAdapter db = new MSSQLDatabaseAdapter();
@Test
public void testGeneration() {
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
index 1409f253a085..0f24374f989f 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracle12DatabaseAdapter.java
@@ -25,7 +25,6 @@
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.TableSchema;
import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.junit.jupiter.api.Test;
@@ -36,7 +35,7 @@
public class TestOracle12DatabaseAdapter {
- private final DatabaseAdapter db = new Oracle12DatabaseAdapter();
+ private final Oracle12DatabaseAdapter db = new Oracle12DatabaseAdapter();
@Test
public void testGeneration() {
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
index 99b8e3e3bd8a..a2e8dd13d259 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestOracleDatabaseAdapter.java
@@ -19,7 +19,6 @@
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.TableSchema;
import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.junit.jupiter.api.Test;
@@ -35,7 +34,7 @@
public class TestOracleDatabaseAdapter {
- private final DatabaseAdapter db = new OracleDatabaseAdapter();
+ private final OracleDatabaseAdapter db = new OracleDatabaseAdapter();
@Test
public void testGeneration() {
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/PutDatabaseRecordIT/create-person-table.sql b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/PutDatabaseRecordIT/create-person-table.sql
index eed14bb75ed7..162e15d925e3 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/PutDatabaseRecordIT/create-person-table.sql
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/PutDatabaseRecordIT/create-person-table.sql
@@ -1,5 +1,5 @@
CREATE TABLE person (
- name VARCHAR(255) NOT NULL,
+ name VARCHAR(255) NOT NULL PRIMARY KEY,
age INT,
favorite_color VARCHAR(255),
dob DATE,
diff --git a/nifi-extension-bundles/nifi-standard-services-api-bom/pom.xml b/nifi-extension-bundles/nifi-standard-services-api-bom/pom.xml
index 22c0ac2af8cb..d4e6690387ff 100644
--- a/nifi-extension-bundles/nifi-standard-services-api-bom/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-services-api-bom/pom.xml
@@ -72,6 +72,12 @@
2.2.0-SNAPSHOT
provided