diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
index 39cb2509cb28..52c509c40ec5 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
@@ -36,9 +36,8 @@
nifi-proxy-configuration-api
- com.fasterxml.jackson.core
- jackson-databind
- compile
+ org.apache.nifi
+ nifi-oauth2-provider-api
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
index c62e1f22d334..feb31b44c69b 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
@@ -22,7 +22,8 @@ public enum AuthorizationScheme implements DescribedValue {
NONE("None", "No authorization scheme."),
PKI("PKI", "Mutual TLS with PKI certificate authorization scheme."),
BASIC("Basic", "Basic authorization scheme."),
- API_KEY("API Key", "API key authorization scheme.");
+ API_KEY("API Key", "API key authorization scheme."),
+ JWT("JWT", "JWT realm scheme.");
private final String displayName;
private final String description;
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 77e819332a48..899e977a4562 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -22,6 +22,7 @@
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
@@ -50,6 +51,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.identifiesControllerService(SSLContextProvider.class)
.addValidator(Validator.VALID)
.build();
+
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP);
PropertyDescriptor AUTHORIZATION_SCHEME = new PropertyDescriptor.Builder()
@@ -62,6 +64,25 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+ .name("el-cs-oauth2-token-provider")
+ .displayName("OAuth2 Access Token Provider")
+ .description("The OAuth2 Access Token Provider used to provide JWTs for Bearer Token Authorization with Elasticsearch.")
+ .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT)
+ .required(false)
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .addValidator(Validator.VALID)
+ .build();
+
+ PropertyDescriptor RUN_AS_USER = new PropertyDescriptor.Builder()
+ .name("el-cs-run-as-user")
+ .displayName("Run As User")
+ .description("The username to impersonate within Elasticsearch.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("el-cs-username")
.displayName("Username")
@@ -103,6 +124,16 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ PropertyDescriptor JWT_SHARED_SECRET = new PropertyDescriptor.Builder()
+ .name("jwt-shared-secret")
+ .displayName("JWT Shared Secret")
+ .description("JWT realm Shared Secret.")
+ .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT)
+ .required(false)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
.name("el-cs-connect-timeout")
.displayName("Connect timeout")
@@ -220,7 +251,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.name("el-cs-sniff-failure")
.displayName("Sniff on Failure")
.description("Enable sniffing on failure, meaning that after each failure the Elasticsearch nodes list gets updated " +
- "straightaway rather than at the following ordinary sniffing round")
+ "straight away rather than at the following ordinary sniffing round")
.dependsOn(SNIFF_CLUSTER_NODES, "true")
.allowableValues("true", "false")
.defaultValue("false")
@@ -370,7 +401,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
/**
* Perform a search using the JSON DSL.
*
- * @param query A JSON string reprensenting the query.
+ * @param query A JSON string representing the query.
* @param index The index to target. Optional.
* @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
* @param requestParameters A collection of URL request parameters. Optional.
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 80e6ae99a917..91fe867bfa6d 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -47,6 +47,10 @@
org.apache.nifi
nifi-proxy-configuration-api
+
+ org.apache.nifi
+ nifi-oauth2-provider-api
+
org.apache.nifi
nifi-elasticsearch-client-service-api
@@ -83,23 +87,11 @@
jackson-annotations
provided
-
- commons-io
- commons-io
-
-
- org.apache.commons
- commons-compress
-
com.github.stephenc.findbugs
findbugs-annotations
1.3.9-1
-
- org.apache.commons
- commons-lang3
-
org.opentest4j
opentest4j
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 8169a38f0636..de9949812ddb 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -45,6 +45,7 @@
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
@@ -56,6 +57,7 @@
import org.elasticsearch.client.Node;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
@@ -102,10 +104,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
private ObjectMapper mapper;
- private static final List properties = List.of(HTTP_HOSTS, PATH_PREFIX, AUTHORIZATION_SCHEME, USERNAME, PASSWORD, API_KEY_ID, API_KEY,
- PROP_SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE, CONNECT_TIMEOUT, SOCKET_TIMEOUT, CHARSET,
- SUPPRESS_NULLS, COMPRESSION, SEND_META_HEADER, STRICT_DEPRECATION, NODE_SELECTOR, SNIFF_CLUSTER_NODES,
- SNIFFER_INTERVAL, SNIFFER_REQUEST_TIMEOUT, SNIFF_ON_FAILURE, SNIFFER_FAILURE_DELAY);
+ private static final List properties = List.of(HTTP_HOSTS, PATH_PREFIX, AUTHORIZATION_SCHEME, USERNAME, PASSWORD,
+ API_KEY_ID, API_KEY, JWT_SHARED_SECRET, OAUTH2_ACCESS_TOKEN_PROVIDER, RUN_AS_USER, PROP_SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE,
+ CONNECT_TIMEOUT, SOCKET_TIMEOUT, CHARSET, SUPPRESS_NULLS, COMPRESSION, SEND_META_HEADER, STRICT_DEPRECATION, NODE_SELECTOR,
+ SNIFF_CLUSTER_NODES, SNIFFER_INTERVAL, SNIFFER_REQUEST_TIMEOUT, SNIFF_ON_FAILURE, SNIFFER_FAILURE_DELAY);
+
+ private OAuth2AccessTokenProvider oAuth2AccessTokenProvider;
private RestClient client;
@@ -145,6 +149,9 @@ protected Collection customValidate(final ValidationContext va
final SSLContextProvider sslContextProvider = validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
+ final boolean jwtSharedSecretSet = validationContext.getProperty(JWT_SHARED_SECRET).isSet();
+ final OAuth2AccessTokenProvider oAuth2Provider = validationContext.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
if (authorizationScheme == AuthorizationScheme.PKI && (sslContextProvider == null)) {
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName()).valid(false)
.explanation(String.format("if '%s' is '%s' then '%s' must be set and specify a Keystore for mutual TLS encryption.",
@@ -153,6 +160,23 @@ protected Collection customValidate(final ValidationContext va
);
}
+ if (authorizationScheme == AuthorizationScheme.JWT) {
+ if (oAuth2Provider == null) {
+ results.add(new ValidationResult.Builder().subject(OAUTH2_ACCESS_TOKEN_PROVIDER.getName()).valid(false)
+ .explanation(String.format("if '%s' is '%s' then '%s' must be set.",
+ AUTHORIZATION_SCHEME.getDisplayName(), authorizationScheme.getDisplayName(), OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName())
+ ).build()
+ );
+ }
+ if (!jwtSharedSecretSet) {
+ results.add(new ValidationResult.Builder().subject(JWT_SHARED_SECRET.getName()).valid(false)
+ .explanation(String.format("if '%s' is '%s' then '%s' must be set.",
+ AUTHORIZATION_SCHEME.getDisplayName(), authorizationScheme.getDisplayName(), JWT_SHARED_SECRET.getDisplayName())
+ ).build()
+ );
+ }
+ }
+
if (usernameSet && !passwordSet) {
addAuthorizationPropertiesValidationIssue(results, USERNAME, PASSWORD);
} else if (passwordSet && !usernameSet) {
@@ -177,7 +201,7 @@ protected Collection customValidate(final ValidationContext va
private void addAuthorizationPropertiesValidationIssue(final List results, final PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) {
results.add(new ValidationResult.Builder().subject(missingProperty.getName()).valid(false)
- .explanation(String.format("if '%s' is then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName()))
+ .explanation(String.format("if '%s' is set, then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName()))
.build()
);
}
@@ -187,11 +211,12 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
try {
this.client = setupClient(context);
this.sniffer = setupSniffer(context, this.client);
- responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
+ this.responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
+
+ this.oAuth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
// re-create the ObjectMapper in case the SUPPRESS_NULLS property has changed - the JsonInclude settings aren't dynamic
createObjectMapper(context);
-
} catch (final Exception ex) {
getLogger().error("Could not initialize ElasticSearch client.", ex);
throw new InitializationException(ex);
@@ -244,10 +269,11 @@ public List verify(final ConfigurationContext context,
clientSetupResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
// try to fetch the Elasticsearch root endpoint (system summary)
- verifyRootConnection(verifyClient, connectionResult, warningsResult);
+ final OAuth2AccessTokenProvider tokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ verifyRootConnection(verifyClient, tokenProvider, connectionResult, warningsResult);
// try sniffing for cluster nodes
- verifySniffer(context, verifyClient, snifferResult);
+ verifySniffer(context, verifyClient, tokenProvider, snifferResult);
} catch (final MalformedURLException mue) {
clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
.explanation("Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName());
@@ -283,7 +309,7 @@ public List verify(final ConfigurationContext context,
return results;
}
- private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) {
+ private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider, final ConfigVerificationResult.Builder snifferResult) {
try (final Sniffer verifySniffer = setupSniffer(context, verifyClient)) {
if (verifySniffer != null) {
final List originalNodes = verifyClient.getNodes();
@@ -297,7 +323,7 @@ private void verifySniffer(final ConfigurationContext context, final RestClient
nodes.forEach(n -> {
try {
verifyClient.setNodes(Collections.singletonList(n));
- final List warnings = getElasticsearchRoot(verifyClient);
+ final List warnings = getElasticsearchRoot(verifyClient, tokenProvider);
successfulInstances.getAndIncrement();
if (!warnings.isEmpty()) {
warningInstances.getAndIncrement();
@@ -331,17 +357,20 @@ private void verifySniffer(final ConfigurationContext context, final RestClient
}
}
- private List getElasticsearchRoot(final RestClient verifyClient) throws IOException {
- final Response response = verifyClient.performRequest(new Request("GET", "/"));
+ private List getElasticsearchRoot(final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider) throws IOException {
+ final Request request = addJWTAuthorizationHeader(new Request("GET", "/"), tokenProvider);
+ final Response response = verifyClient.performRequest(request);
final List warnings = parseResponseWarningHeaders(response);
+ // ensure the response can be parsed without exception
parseResponse(response);
return warnings;
}
- private void verifyRootConnection(final RestClient verifyClient, final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) {
+ private void verifyRootConnection(final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider,
+ final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) {
try {
- final List warnings = getElasticsearchRoot(verifyClient);
+ final List warnings = getElasticsearchRoot(verifyClient, tokenProvider);
connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
if (warnings.isEmpty()) {
@@ -419,9 +448,13 @@ private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, fi
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+ final String runAsUser = context.getProperty(RUN_AS_USER).evaluateAttributeExpressions().getValue();
+
final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
final String apiKey = context.getProperty(API_KEY).getValue();
+ final String jwtSharedSecret = context.getProperty(JWT_SHARED_SECRET).getValue();
+
final SSLContext sslContext = getSSLContext(context);
final ProxyConfigurationService proxyConfigurationService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
@@ -439,6 +472,12 @@ private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, fi
if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId, apiKey));
}
+ if (AuthorizationScheme.JWT == authorizationScheme && jwtSharedSecret != null) {
+ defaultHeaders.add(createSharedSecretHeader(jwtSharedSecret));
+ }
+ if (runAsUser != null) {
+ defaultHeaders.add(createRunAsUserHeader(runAsUser));
+ }
if (!defaultHeaders.isEmpty()) {
builder.setDefaultHeaders(defaultHeaders.toArray(new Header[0]));
}
@@ -504,6 +543,23 @@ private BasicHeader createApiKeyAuthorizationHeader(final String apiKeyId, final
return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
}
+ private BasicHeader createSharedSecretHeader(final String jwtSharedSecret) {
+ return new BasicHeader("ES-Client-Authentication", "sharedsecret " + jwtSharedSecret);
+ }
+
+ private BasicHeader createRunAsUserHeader(final String runAsUser) {
+ return new BasicHeader("es-security-runas-user", runAsUser);
+ }
+
+ private Request addJWTAuthorizationHeader(final Request request, final OAuth2AccessTokenProvider tokenProvider) {
+ if (tokenProvider != null) {
+ final RequestOptions.Builder requestOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
+ requestOptionsBuilder.addHeader("Authorization", "Bearer " + tokenProvider.getAccessDetails().getAccessToken());
+ request.setOptions(requestOptionsBuilder.build());
+ }
+ return request;
+ }
+
private Sniffer setupSniffer(final ConfigurationContext context, final RestClient restClient) {
final boolean sniffClusterNodes = context.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
final int snifferIntervalMillis = context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
@@ -996,7 +1052,7 @@ public String getTransitUrl(final String index, final String type) {
}
private Response performRequest(final String method, final String endpoint, final Map parameters, final HttpEntity entity) throws IOException {
- final Request request = new Request(method, endpoint);
+ final Request request = addJWTAuthorizationHeader(new Request(method, endpoint), oAuth2AccessTokenProvider);
if (parameters != null && !parameters.isEmpty()) {
request.addParameters(parameters);
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
index 38b39975628b..b9f0d7dd23c0 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
@@ -209,12 +209,12 @@ private Record getById(final String _id, final Map context) thro
return null;
}
- final Map source = (Map) response.getHits().get(0).get("_source");
+ final Map source = (Map) response.getHits().getFirst().get("_source");
final RecordSchema toUse = getSchema(context, source, null);
Record record = new MapRecord(toUse, source);
- if (recordPathMappings.size() > 0) {
+ if (!recordPathMappings.isEmpty()) {
record = applyMappings(record, source);
}
@@ -239,7 +239,7 @@ private Map buildQuery(final Map coordinates) {
put(e.getKey(), e.getValue());
}});
}
- }}).collect(Collectors.toList())
+ }}).toList()
);
}});
}};
@@ -256,10 +256,10 @@ private Record getByQuery(final Map query, final Map source = (Map) response.getHits().get(0).get("_source");
+ final Map source = (Map) response.getHits().getFirst().get("_source");
final RecordSchema toUse = getSchema(context, source, null);
Record record = new MapRecord(toUse, source);
- if (recordPathMappings.size() > 0) {
+ if (!recordPathMappings.isEmpty()) {
record = applyMappings(record, source);
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index 2e7922084719..fe6c637acc5d 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -263,7 +263,7 @@ private void assertBasicSearch(final Map requestParameters) thro
"four", 4, "five", 5)
.build();
- buckets.forEach( (aggRes) -> {
+ buckets.forEach(aggRes -> {
final String key = (String) aggRes.get("key");
final Integer docCount = (Integer) aggRes.get("doc_count");
assertEquals(expected.get(key), docCount, String.format("%s did not match.", key));
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
index 8129e999d5bc..3fc0b5076f89 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
@@ -49,6 +49,7 @@ class ElasticSearchLookupService_IT extends AbstractElasticsearch_IT {
private ElasticSearchLookupService lookupService;
+ @Override
@BeforeEach
void before() throws Exception {
super.before();
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
index 9fdbcee8cd74..594f711b3ec3 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
@@ -21,6 +21,7 @@
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextProvider;
import org.apache.nifi.util.TestRunner;
@@ -114,9 +115,25 @@ void testValidatePkiAuth() throws InitializationException {
assertPKIAuthorizationValidationErrorMessage();
}
+ @Test
+ void testValidateJwtAuth() throws InitializationException {
+ runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.JWT);
+ runner.setProperty(service, ElasticSearchClientService.JWT_SHARED_SECRET, "jwt-shared-secret");
+ assertJWTAuthorizationValidationErrorMessage(ElasticSearchClientService.OAUTH2_ACCESS_TOKEN_PROVIDER);
+
+ final OAuth2AccessTokenProvider oAuth2AccessTokenProvider = mock(OAuth2AccessTokenProvider.class);
+ when(oAuth2AccessTokenProvider.getIdentifier()).thenReturn("oauth2-access-token-provider");
+ runner.addControllerService("oauth2-access-token-provider", oAuth2AccessTokenProvider);
+ runner.setProperty(service, ElasticSearchClientService.OAUTH2_ACCESS_TOKEN_PROVIDER, "oauth2-access-token-provider");
+ runner.assertValid(service);
+
+ runner.removeProperty(service, ElasticSearchClientService.JWT_SHARED_SECRET);
+ assertJWTAuthorizationValidationErrorMessage(ElasticSearchClientService.JWT_SHARED_SECRET);
+ }
+
private void assertAuthorizationPropertyValidationErrorMessage(final PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) {
final AssertionFailedError afe = assertThrows(AssertionFailedError.class, () -> runner.assertValid(service));
- assertTrue(afe.getMessage().contains(String.format("if '%s' is then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName())));
+ assertTrue(afe.getMessage().contains(String.format("if '%s' is set, then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName())));
}
private void assertPKIAuthorizationValidationErrorMessage() {
@@ -128,4 +145,14 @@ private void assertPKIAuthorizationValidationErrorMessage() {
ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE.getDisplayName()
)));
}
+
+ private void assertJWTAuthorizationValidationErrorMessage(final PropertyDescriptor expectedMissingProperty) {
+ final AssertionFailedError afe = assertThrows(AssertionFailedError.class, () -> runner.assertValid(service));
+ assertTrue(afe.getMessage().contains(String.format(
+ "if '%s' is '%s' then '%s' must be set.",
+ ElasticSearchClientService.AUTHORIZATION_SCHEME.getDisplayName(),
+ AuthorizationScheme.JWT.getDisplayName(),
+ expectedMissingProperty.getDisplayName()
+ )));
+ }
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
index 16971faabfa6..7e19c90ca8ac 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
@@ -35,7 +35,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class ElasticSearchStringLookupServiceTest {
+class ElasticSearchStringLookupServiceTest {
private ElasticSearchClientService mockClientService;
private ElasticSearchStringLookupService lookupService;
@@ -56,8 +56,9 @@ public void setup() throws Exception {
runner.enableControllerService(lookupService);
}
+ @SuppressWarnings("unchecked")
@Test
- public void simpleLookupTest() throws Exception {
+ void simpleLookupTest() throws Exception {
Map coordinates = new HashMap<>();
coordinates.put(ElasticSearchStringLookupService.ID, "12345");
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index b35e5f876314..ea5e5ac13d48 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -72,8 +72,9 @@ language governing permissions and limitations under the License. -->
test
- commons-io
- commons-io
+ org.apache.nifi
+ nifi-oauth2-provider-api
+ test
com.fasterxml.jackson.core
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
index 12255e5de472..21e8c148c307 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
@@ -21,16 +21,6 @@ language governing permissions and limitations under the License. -->
jar
-
- com.fasterxml.jackson.core
- jackson-databind
- compile
-
-
- com.fasterxml.jackson.core
- jackson-core
- compile
-
com.github.docker-java
docker-java-api
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index ae3db3cc072b..163b8fad9961 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -52,7 +52,7 @@
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
- .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.15.1"));
+ .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.17.0"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
index 2d9499d6e57c..e94f3e48b25e 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -119,7 +119,7 @@ language governing permissions and limitations under the License. -->
elasticsearch7
- 7.17.23
+ 7.17.26