Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-14129 Add Database Dialect Service #9640

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,12 @@ language governing permissions and limitations under the License. -->
<version>2.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-nar</artifactId>
<version>2.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-client-service-api-nar</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-provider-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,24 @@
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
import org.apache.nifi.database.dialect.service.api.StandardColumnDefinition;
import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
import org.apache.nifi.database.dialect.service.api.QueryClause;
import org.apache.nifi.database.dialect.service.api.QueryClauseType;
import org.apache.nifi.database.dialect.service.api.QueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StandardQueryStatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementResponse;
import org.apache.nifi.database.dialect.service.api.StatementType;
import org.apache.nifi.database.dialect.service.api.TableDefinition;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.processors.standard.db.DatabaseAdapterDescriptor;
import org.apache.nifi.processors.standard.db.impl.DatabaseAdapterDatabaseDialectService;
import org.apache.nifi.processors.standard.db.impl.DatabaseDialectServiceDatabaseAdapter;

import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -38,37 +50,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Optional;
import java.util.stream.Collectors;

@Tags({"database", "dbcp", "sql"})
@CapabilityDescription("Fetches parameters from database tables")

public class DatabaseParameterProvider extends AbstractParameterProvider implements VerifiableParameterProvider {

protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();

public static final PropertyDescriptor DB_TYPE;

static {
// Load the DatabaseAdapters
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class);
dbAdapterLoader.forEach(it -> {
dbAdapters.put(it.getName(), it);
dbAdapterValues.add(new AllowableValue(it.getName(), it.getName(), it.getDescription()));
});

DB_TYPE = new PropertyDescriptor.Builder()
.name("db-type")
.displayName("Database Type")
.description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type "
+ "should suffice, but some databases (such as Oracle) require custom SQL clauses. ")
.allowableValues(dbAdapterValues.toArray(new AllowableValue[dbAdapterValues.size()]))
.defaultValue("Generic")
.required(true)
.build();
}
public static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor("db-type");

public static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);

static AllowableValue GROUPING_BY_COLUMN = new AllowableValue("grouping-by-column", "Column",
"A single table is partitioned by the 'Parameter Group Name Column'. All rows with the same value in this column will " +
Expand Down Expand Up @@ -149,6 +141,7 @@ public class DatabaseParameterProvider extends AbstractParameterProvider impleme
protected void init(final ParameterProviderInitializationContext config) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DB_TYPE);
properties.add(DATABASE_DIALECT_SERVICE);
properties.add(DBCP_SERVICE);
properties.add(PARAMETER_GROUPING_STRATEGY);
properties.add(TABLE_NAME);
Expand Down Expand Up @@ -233,8 +226,22 @@ private void validateValueNotNull(final String value, final String columnName) {
}

String getQuery(final ConfigurationContext context, final String tableName, final List<String> columns, final String whereClause) {
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
return dbAdapter.getSelectStatement(tableName, StringUtils.join(columns, ", "), whereClause, null, null, null);
final DatabaseDialectService databaseDialectService = getDatabaseDialectService(context);

final List<ColumnDefinition> columnDefinitions = columns.stream()
.map(StandardColumnDefinition::new)
.map(ColumnDefinition.class::cast)
.toList();
final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), tableName, columnDefinitions);
final QueryStatementRequest queryStatementRequest = new StandardQueryStatementRequest(
StatementType.SELECT,
tableDefinition,
Optional.empty(),
List.of(new QueryClause(QueryClauseType.WHERE, whereClause)),
Optional.empty()
);
final StatementResponse statementResponse = databaseDialectService.getStatement(queryStatementRequest);
return statementResponse.sql();
}

@Override
Expand Down Expand Up @@ -262,4 +269,15 @@ public List<ConfigVerificationResult> verify(final ConfigurationContext context,

return results;
}

private DatabaseDialectService getDatabaseDialectService(final PropertyContext context) {
final DatabaseDialectService databaseDialectService;
final String databaseType = context.getProperty(DB_TYPE).getValue();
if (DatabaseDialectServiceDatabaseAdapter.NAME.equals(databaseType)) {
databaseDialectService = context.getProperty(DATABASE_DIALECT_SERVICE).asControllerService(DatabaseDialectService.class);
} else {
databaseDialectService = new DatabaseAdapterDatabaseDialectService(databaseType);
}
return databaseDialectService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@
<artifactId>nifi-dbcp-service-api</artifactId>
<version>2.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-database-dialect-service-api</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
Expand Down
Loading
Loading