Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-14129 Add Database Dialect Service #9640

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,12 @@ language governing permissions and limitations under the License. -->
<version>2.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-nar</artifactId>
<version>2.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-client-service-api-nar</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-provider-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
import org.apache.nifi.database.dialect.service.api.StandardColumnDefinition;
import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StandardQueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementResponse;
import org.apache.nifi.database.dialect.service.api.StatementType;
import org.apache.nifi.database.dialect.service.api.TableDefinition;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.processors.standard.db.DatabaseAdapterDescriptor;

import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -38,37 +45,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Optional;
import java.util.stream.Collectors;

@Tags({"database", "dbcp", "sql"})
@CapabilityDescription("Fetches parameters from database tables")

public class DatabaseParameterProvider extends AbstractParameterProvider implements VerifiableParameterProvider {

protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();

public static final PropertyDescriptor DB_TYPE;

static {
// Load the DatabaseAdapters
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
dbAdapterLoader.forEach(it -> {
dbAdapters.put(it.getName(), it);
dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), it.getDescription()));
});

DB_TYPE = new PropertyDescriptor.Builder()
.name("db-type")
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
.defaultValue("Generic")
.required(true)
.build();
}
public static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor("db-type");

public static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);

static AllowableValue GROUPING_BY_COLUMN = new AllowableValue("grouping-by-column", "Column",
"A single table is partitioned by the 'Parameter Group Name Column'. All rows with the same value in this column will " +
Expand Down Expand Up @@ -149,6 +136,7 @@ public class DatabaseParameterProvider extends AbstractParameterProvider impleme
protected void init(final ParameterProviderInitializationContext config) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DB_TYPE);
properties.add(DATABASE_DIALECT_SERVICE);
properties.add(DBCP_SERVICE);
properties.add(PARAMETER_GROUPING_STRATEGY);
properties.add(TABLE_NAME);
Expand Down Expand Up @@ -178,7 +166,7 @@ public List<ParameterGroup> fetchParameters(final ConfigurationContext context)

final List<String> tableNames = groupByColumn
? Collections.singletonList(context.getProperty(TABLE_NAME).getValue())
: Arrays.stream(context.getProperty(TABLE_NAMES).getValue().split(",")).map(String::trim).collect(Collectors.toList());
: Arrays.stream(context.getProperty(TABLE_NAMES).getValue().split(",")).map(String::trim).toList();

final Map<String, List<Parameter>> parameterMap = new HashMap<>();
for (final String tableName : tableNames) {
Expand Down Expand Up @@ -233,8 +221,24 @@ private void validateValueNotNull(final String value, final String columnName) {
}

String getQuery(final ConfigurationContext context, final String tableName, final List<String> columns, final String whereClause) {
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
return dbAdapter.getSelectStatement(tableName, StringUtils.join(columns, ", "), whereClause, null, null, null);
final String databaseType = context.getProperty(DB_TYPE).getValue();
final DatabaseDialectService databaseDialectService = DatabaseAdapterDescriptor.getDatabaseDialectService(context, DATABASE_DIALECT_SERVICE, databaseType);

final List<ColumnDefinition> columnDefinitions = columns.stream()
.map(StandardColumnDefinition::new)
.map(ColumnDefinition.class::cast)
.toList();
final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), tableName, columnDefinitions);
final QueryStatementRequest queryStatementRequest = new StandardQueryStatementRequest(
StatementType.SELECT,
tableDefinition,
Optional.empty(),
Optional.ofNullable(whereClause),
Optional.empty(),
Optional.empty()
);
final StatementResponse statementResponse = databaseDialectService.getStatement(queryStatementRequest);
Copy link
Contributor

@tpalfy tpalfy Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the optionalSQL WHERE clause property is not set (null), the generated select statement will have a "WHERE null" clause, resulting in an exception (at least it did when testing on Oracle).

return statementResponse.sql();
}

@Override
Expand All @@ -243,8 +247,8 @@ public List<ConfigVerificationResult> verify(final ConfigurationContext context,
try {
final List<ParameterGroup> parameterGroups = fetchParameters(context);
final long parameterCount = parameterGroups.stream()
.flatMap(group -> group.getParameters().stream())
.count();
.mapToLong(group -> group.getParameters().size())
.sum();
results.add(new ConfigVerificationResult.Builder()
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
.verificationStepName("Fetch Parameters")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@
<artifactId>nifi-dbcp-service-api</artifactId>
<version>2.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-api</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@
*/
package org.apache.nifi.processors.standard;

import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
import org.apache.nifi.database.dialect.service.api.StandardColumnDefinition;
import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StandardQueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementResponse;
import org.apache.nifi.database.dialect.service.api.StatementType;
import org.apache.nifi.database.dialect.service.api.TableDefinition;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
Expand All @@ -29,7 +37,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.db.DatabaseAdapterDescriptor;
import org.apache.nifi.processors.standard.db.impl.PhoenixDatabaseAdapter;
import org.apache.nifi.util.StringUtils;

Expand All @@ -50,13 +58,14 @@
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -183,9 +192,9 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
// The delimiter to use when referencing qualified names (such as table@!@column in the state map)
protected static final String NAMESPACE_DELIMITER = "@!@";

public static final PropertyDescriptor DB_TYPE;
public static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor("db-fetch-db-type");
static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);

protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();
protected final Map<String, Integer> columnTypeMap = new HashMap<>();

// This value is set when the processor is scheduled and indicates whether the Table Name property contains Expression Language.
Expand All @@ -204,29 +213,11 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact

private static final DateTimeFormatter TIME_TYPE_FORMAT = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

private static final String ZERO_RESULT_WHERE_CLAUSE = "1 = 0";

// A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
protected Map<String, String> maxValueProperties;

static {
// Load the DatabaseAdapters
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
dbAdapterLoader.forEach(it -> {
dbAdapters.put(it.getName(), it);
dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), it.getDescription()));
});

DB_TYPE = new PropertyDescriptor.Builder()
.name("db-fetch-db-type")
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
.defaultValue("Generic")
.required(true)
.build();
}

// A common validation procedure for DB fetch processors, it stores whether the Table Name and/or Max Value Column properties have expression language
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
// For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. has expression language)
Expand All @@ -241,6 +232,8 @@ public void setup(final ProcessContext context) {
}

public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) {
final DatabaseDialectService databaseDialectService = getDatabaseDialectService(context);

synchronized (setupComplete) {
setupComplete.set(false);
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
Expand All @@ -256,23 +249,15 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();

final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
try (final Connection con = dbcpService.getConnection(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes());
final Statement st = con.createStatement()) {

// Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
// to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
// approach as in Apache Drill
String query;

if (StringUtils.isEmpty(sqlQuery)) {
query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
} else {
StringBuilder sbQuery = getWrappedQuery(dbAdapter, sqlQuery, tableName);
sbQuery.append(" WHERE 1=0");

query = sbQuery.toString();
}
final QueryStatementRequest statementRequest = getMaxValueStatementRequest(tableName, maxValueColumnNames, sqlQuery);
final StatementResponse statementResponse = databaseDialectService.getStatement(statementRequest);
final String query = statementResponse.sql();

ResultSet resultSet = st.executeQuery(query);
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
Expand All @@ -286,13 +271,13 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
final List<String> maxValueQualifiedColumnNameList = new ArrayList<>();

for (String maxValueColumn : maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim(), dbAdapter);
String colKey = getStateKey(tableName, maxValueColumn.trim());
maxValueQualifiedColumnNameList.add(colKey);
}

for (int i = 1; i <= numCols; i++) {
String colName = resultSetMetaData.getColumnName(i).toLowerCase();
String colKey = getStateKey(tableName, colName, dbAdapter);
String colKey = getStateKey(tableName, colName);

//only include columns that are part of the maximum value tracking column list
if (!maxValueQualifiedColumnNameList.contains(colKey)) {
Expand All @@ -304,7 +289,7 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
}

for (String maxValueColumn : maxValueColumnNameList) {
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase(), dbAdapter);
String colKey = getStateKey(tableName, maxValueColumn.trim().toLowerCase());
if (!columnTypeMap.containsKey(colKey)) {
throw new ProcessException("Column not found in the table/query specified: " + maxValueColumn);
}
Expand All @@ -320,15 +305,32 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
}
}

protected static StringBuilder getWrappedQuery(DatabaseAdapter dbAdapter, String sqlQuery, String tableName) {
return new StringBuilder("SELECT * FROM (" + sqlQuery + ") " + dbAdapter.getTableAliasClause(tableName));
protected DatabaseDialectService getDatabaseDialectService(final PropertyContext context) {
final String databaseType = context.getProperty(DB_TYPE).getValue();
return DatabaseAdapterDescriptor.getDatabaseDialectService(context, DATABASE_DIALECT_SERVICE, databaseType);
}

private QueryStatementRequest getMaxValueStatementRequest(final String tableName, final String maxValueColumnNames, final String derivedTableQuery) {
final List<ColumnDefinition> maxValueColumns = Arrays.stream(maxValueColumnNames.split(","))
.map(StandardColumnDefinition::new)
.map(ColumnDefinition.class::cast)
.toList();

final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), tableName, maxValueColumns);
return new StandardQueryStatementRequest(
StatementType.SELECT,
tableDefinition,
Optional.ofNullable(derivedTableQuery),
Optional.of(ZERO_RESULT_WHERE_CLAUSE),
Optional.empty(),
Optional.empty()
);
}

protected static String getMaxValueFromRow(ResultSet resultSet,
int columnIndex,
Integer type,
String maxValueString,
String databaseType)
String maxValueString)
throws ParseException, IOException, SQLException {

// Skip any columns we're not keeping track of or whose value is null
Expand Down Expand Up @@ -520,17 +522,18 @@ protected static String getLiteralByType(int type, String value, String database
* Construct a key string for a corresponding state value.
* @param prefix A prefix may contain database and table name, or just table name, this can be null
* @param columnName A column name
* @param adapter DatabaseAdapter is used to unwrap identifiers
* @return a state key string
*/
protected static String getStateKey(String prefix, String columnName, DatabaseAdapter adapter) {
protected static String getStateKey(String prefix, String columnName) {
StringBuilder sb = new StringBuilder();
if (prefix != null) {
sb.append(adapter.unwrapIdentifier(prefix.toLowerCase()));
final String prefixUnwrapped = prefix.toLowerCase().replaceAll("[\"`\\[\\]]", "");
sb.append(prefixUnwrapped);
sb.append(NAMESPACE_DELIMITER);
}
if (columnName != null) {
sb.append(adapter.unwrapIdentifier(columnName.toLowerCase()));
final String columnNameUnwrapped = columnName.toLowerCase().replaceAll("[\"`\\[\\]]", "");
sb.append(columnNameUnwrapped);
}
return sb.toString();
}
Expand Down
Loading
Loading