-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIFI-10831 enable JWT realm authentication with Elasticsearch #9605
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -22,6 +22,7 @@ | |||||
import org.apache.nifi.controller.ControllerService; | ||||||
import org.apache.nifi.controller.VerifiableControllerService; | ||||||
import org.apache.nifi.expression.ExpressionLanguageScope; | ||||||
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; | ||||||
import org.apache.nifi.processor.util.StandardValidators; | ||||||
import org.apache.nifi.proxy.ProxyConfiguration; | ||||||
import org.apache.nifi.proxy.ProxySpec; | ||||||
|
@@ -50,6 +51,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl | |||||
.identifiesControllerService(SSLContextProvider.class) | ||||||
.addValidator(Validator.VALID) | ||||||
.build(); | ||||||
|
||||||
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP); | ||||||
|
||||||
PropertyDescriptor AUTHORIZATION_SCHEME = new PropertyDescriptor.Builder() | ||||||
|
@@ -62,6 +64,25 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl | |||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) | ||||||
.build(); | ||||||
|
||||||
PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new PropertyDescriptor.Builder() | ||||||
.name("el-cs-oauth2-token-provider") | ||||||
.displayName("OAuth2 Access Token Provider") | ||||||
.description("The OAuth2 Access Token Provider used to provide JWTs for Bearer Token Authorization with Elasticsearch.") | ||||||
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT) | ||||||
.required(false) | ||||||
.identifiesControllerService(OAuth2AccessTokenProvider.class) | ||||||
.addValidator(Validator.VALID) | ||||||
.build(); | ||||||
|
||||||
PropertyDescriptor RUN_AS_USER = new PropertyDescriptor.Builder() | ||||||
.name("el-cs-run-as-user") | ||||||
.displayName("Run As User") | ||||||
.description("The username to impersonate within Elasticsearch.") | ||||||
.required(false) | ||||||
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) | ||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) | ||||||
.build(); | ||||||
|
||||||
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() | ||||||
.name("el-cs-username") | ||||||
.displayName("Username") | ||||||
|
@@ -103,6 +124,16 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl | |||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) | ||||||
.build(); | ||||||
|
||||||
PropertyDescriptor JWT_SHARED_SECRET = new PropertyDescriptor.Builder() | ||||||
.name("jwt-shared-secret") | ||||||
.displayName("JWT Shared Secret") | ||||||
.description("JWT realm Shared Secret.") | ||||||
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT) | ||||||
.required(false) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be changed to
Suggested change
|
||||||
.sensitive(true) | ||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) | ||||||
.build(); | ||||||
|
||||||
PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder() | ||||||
.name("el-cs-connect-timeout") | ||||||
.displayName("Connect timeout") | ||||||
|
@@ -220,7 +251,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl | |||||
.name("el-cs-sniff-failure") | ||||||
.displayName("Sniff on Failure") | ||||||
.description("Enable sniffing on failure, meaning that after each failure the Elasticsearch nodes list gets updated " + | ||||||
"straightaway rather than at the following ordinary sniffing round") | ||||||
"straight away rather than at the following ordinary sniffing round") | ||||||
.dependsOn(SNIFF_CLUSTER_NODES, "true") | ||||||
.allowableValues("true", "false") | ||||||
.defaultValue("false") | ||||||
|
@@ -370,7 +401,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl | |||||
/** | ||||||
* Perform a search using the JSON DSL. | ||||||
* | ||||||
* @param query A JSON string reprensenting the query. | ||||||
* @param query A JSON string representing the query. | ||||||
* @param index The index to target. Optional. | ||||||
* @param type The type to target. Optional. Will not be used in future versions of Elasticsearch. | ||||||
* @param requestParameters A collection of URL request parameters. Optional. | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,10 @@ | |
<groupId>org.apache.nifi</groupId> | ||
<artifactId>nifi-proxy-configuration-api</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.nifi</groupId> | ||
<artifactId>nifi-oauth2-provider-api</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.nifi</groupId> | ||
<artifactId>nifi-elasticsearch-client-service-api</artifactId> | ||
|
@@ -83,23 +87,11 @@ | |
<artifactId>jackson-annotations</artifactId> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>commons-io</groupId> | ||
<artifactId>commons-io</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.commons</groupId> | ||
<artifactId>commons-compress</artifactId> | ||
</dependency> | ||
Comment on lines
-86
to
-93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these dependencies not used? |
||
<dependency> | ||
<groupId>com.github.stephenc.findbugs</groupId> | ||
<artifactId>findbugs-annotations</artifactId> | ||
<version>1.3.9-1</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.commons</groupId> | ||
<artifactId>commons-lang3</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.opentest4j</groupId> | ||
<artifactId>opentest4j</artifactId> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,6 +45,7 @@ | |
import org.apache.nifi.controller.ConfigurationContext; | ||
import org.apache.nifi.expression.ExpressionLanguageScope; | ||
import org.apache.nifi.logging.ComponentLog; | ||
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; | ||
import org.apache.nifi.processor.exception.ProcessException; | ||
import org.apache.nifi.processor.util.StandardValidators; | ||
import org.apache.nifi.proxy.ProxyConfiguration; | ||
|
@@ -56,6 +57,7 @@ | |
import org.elasticsearch.client.Node; | ||
import org.elasticsearch.client.NodeSelector; | ||
import org.elasticsearch.client.Request; | ||
import org.elasticsearch.client.RequestOptions; | ||
import org.elasticsearch.client.Response; | ||
import org.elasticsearch.client.ResponseException; | ||
import org.elasticsearch.client.RestClient; | ||
|
@@ -102,10 +104,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im | |
|
||
private ObjectMapper mapper; | ||
|
||
private static final List<PropertyDescriptor> properties = List.of(HTTP_HOSTS, PATH_PREFIX, AUTHORIZATION_SCHEME, USERNAME, PASSWORD, API_KEY_ID, API_KEY, | ||
PROP_SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE, CONNECT_TIMEOUT, SOCKET_TIMEOUT, CHARSET, | ||
SUPPRESS_NULLS, COMPRESSION, SEND_META_HEADER, STRICT_DEPRECATION, NODE_SELECTOR, SNIFF_CLUSTER_NODES, | ||
SNIFFER_INTERVAL, SNIFFER_REQUEST_TIMEOUT, SNIFF_ON_FAILURE, SNIFFER_FAILURE_DELAY); | ||
private static final List<PropertyDescriptor> properties = List.of(HTTP_HOSTS, PATH_PREFIX, AUTHORIZATION_SCHEME, USERNAME, PASSWORD, | ||
API_KEY_ID, API_KEY, JWT_SHARED_SECRET, OAUTH2_ACCESS_TOKEN_PROVIDER, RUN_AS_USER, PROP_SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE, | ||
CONNECT_TIMEOUT, SOCKET_TIMEOUT, CHARSET, SUPPRESS_NULLS, COMPRESSION, SEND_META_HEADER, STRICT_DEPRECATION, NODE_SELECTOR, | ||
SNIFF_CLUSTER_NODES, SNIFFER_INTERVAL, SNIFFER_REQUEST_TIMEOUT, SNIFF_ON_FAILURE, SNIFFER_FAILURE_DELAY); | ||
Comment on lines
+107
to
+110
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good opportunity to reformat these properties and list one per line for easier readability. |
||
|
||
private OAuth2AccessTokenProvider oAuth2AccessTokenProvider; | ||
|
||
private RestClient client; | ||
|
||
|
@@ -145,6 +149,9 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va | |
|
||
final SSLContextProvider sslContextProvider = validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class); | ||
|
||
final boolean jwtSharedSecretSet = validationContext.getProperty(JWT_SHARED_SECRET).isSet(); | ||
final OAuth2AccessTokenProvider oAuth2Provider = validationContext.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class); | ||
|
||
if (authorizationScheme == AuthorizationScheme.PKI && (sslContextProvider == null)) { | ||
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName()).valid(false) | ||
.explanation(String.format("if '%s' is '%s' then '%s' must be set and specify a Keystore for mutual TLS encryption.", | ||
|
@@ -153,6 +160,23 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va | |
); | ||
} | ||
|
||
if (authorizationScheme == AuthorizationScheme.JWT) { | ||
if (oAuth2Provider == null) { | ||
results.add(new ValidationResult.Builder().subject(OAUTH2_ACCESS_TOKEN_PROVIDER.getName()).valid(false) | ||
.explanation(String.format("if '%s' is '%s' then '%s' must be set.", | ||
AUTHORIZATION_SCHEME.getDisplayName(), authorizationScheme.getDisplayName(), OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName()) | ||
).build() | ||
); | ||
} | ||
if (!jwtSharedSecretSet) { | ||
results.add(new ValidationResult.Builder().subject(JWT_SHARED_SECRET.getName()).valid(false) | ||
.explanation(String.format("if '%s' is '%s' then '%s' must be set.", | ||
AUTHORIZATION_SCHEME.getDisplayName(), authorizationScheme.getDisplayName(), JWT_SHARED_SECRET.getDisplayName()) | ||
).build() | ||
); | ||
} | ||
} | ||
Comment on lines
+163
to
+178
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be be removed given the use of dependent properties. |
||
|
||
if (usernameSet && !passwordSet) { | ||
addAuthorizationPropertiesValidationIssue(results, USERNAME, PASSWORD); | ||
} else if (passwordSet && !usernameSet) { | ||
|
@@ -177,7 +201,7 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va | |
|
||
private void addAuthorizationPropertiesValidationIssue(final List<ValidationResult> results, final PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) { | ||
results.add(new ValidationResult.Builder().subject(missingProperty.getName()).valid(false) | ||
.explanation(String.format("if '%s' is then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName())) | ||
.explanation(String.format("if '%s' is set, then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName())) | ||
.build() | ||
); | ||
} | ||
|
@@ -187,11 +211,12 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE | |
try { | ||
this.client = setupClient(context); | ||
this.sniffer = setupSniffer(context, this.client); | ||
responseCharset = Charset.forName(context.getProperty(CHARSET).getValue()); | ||
this.responseCharset = Charset.forName(context.getProperty(CHARSET).getValue()); | ||
|
||
this.oAuth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class); | ||
|
||
// re-create the ObjectMapper in case the SUPPRESS_NULLS property has changed - the JsonInclude settings aren't dynamic | ||
createObjectMapper(context); | ||
|
||
} catch (final Exception ex) { | ||
getLogger().error("Could not initialize ElasticSearch client.", ex); | ||
throw new InitializationException(ex); | ||
|
@@ -244,10 +269,11 @@ public List<ConfigVerificationResult> verify(final ConfigurationContext context, | |
clientSetupResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL); | ||
|
||
// try to fetch the Elasticsearch root endpoint (system summary) | ||
verifyRootConnection(verifyClient, connectionResult, warningsResult); | ||
final OAuth2AccessTokenProvider tokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class); | ||
verifyRootConnection(verifyClient, tokenProvider, connectionResult, warningsResult); | ||
|
||
// try sniffing for cluster nodes | ||
verifySniffer(context, verifyClient, snifferResult); | ||
verifySniffer(context, verifyClient, tokenProvider, snifferResult); | ||
} catch (final MalformedURLException mue) { | ||
clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED) | ||
.explanation("Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName()); | ||
|
@@ -283,7 +309,7 @@ public List<ConfigVerificationResult> verify(final ConfigurationContext context, | |
return results; | ||
} | ||
|
||
private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) { | ||
private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider, final ConfigVerificationResult.Builder snifferResult) { | ||
try (final Sniffer verifySniffer = setupSniffer(context, verifyClient)) { | ||
if (verifySniffer != null) { | ||
final List<Node> originalNodes = verifyClient.getNodes(); | ||
|
@@ -297,7 +323,7 @@ private void verifySniffer(final ConfigurationContext context, final RestClient | |
nodes.forEach(n -> { | ||
try { | ||
verifyClient.setNodes(Collections.singletonList(n)); | ||
final List<String> warnings = getElasticsearchRoot(verifyClient); | ||
final List<String> warnings = getElasticsearchRoot(verifyClient, tokenProvider); | ||
successfulInstances.getAndIncrement(); | ||
if (!warnings.isEmpty()) { | ||
warningInstances.getAndIncrement(); | ||
|
@@ -331,17 +357,20 @@ private void verifySniffer(final ConfigurationContext context, final RestClient | |
} | ||
} | ||
|
||
private List<String> getElasticsearchRoot(final RestClient verifyClient) throws IOException { | ||
final Response response = verifyClient.performRequest(new Request("GET", "/")); | ||
private List<String> getElasticsearchRoot(final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider) throws IOException { | ||
final Request request = addJWTAuthorizationHeader(new Request("GET", "/"), tokenProvider); | ||
final Response response = verifyClient.performRequest(request); | ||
final List<String> warnings = parseResponseWarningHeaders(response); | ||
// ensure the response can be parsed without exception | ||
parseResponse(response); | ||
|
||
return warnings; | ||
} | ||
|
||
private void verifyRootConnection(final RestClient verifyClient, final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) { | ||
private void verifyRootConnection(final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider, | ||
final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) { | ||
try { | ||
final List<String> warnings = getElasticsearchRoot(verifyClient); | ||
final List<String> warnings = getElasticsearchRoot(verifyClient, tokenProvider); | ||
|
||
connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL); | ||
if (warnings.isEmpty()) { | ||
|
@@ -419,9 +448,13 @@ private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, fi | |
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); | ||
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); | ||
|
||
final String runAsUser = context.getProperty(RUN_AS_USER).evaluateAttributeExpressions().getValue(); | ||
|
||
final String apiKeyId = context.getProperty(API_KEY_ID).getValue(); | ||
final String apiKey = context.getProperty(API_KEY).getValue(); | ||
|
||
final String jwtSharedSecret = context.getProperty(JWT_SHARED_SECRET).getValue(); | ||
|
||
final SSLContext sslContext = getSSLContext(context); | ||
final ProxyConfigurationService proxyConfigurationService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class); | ||
|
||
|
@@ -439,6 +472,12 @@ private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, fi | |
if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) { | ||
defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId, apiKey)); | ||
} | ||
if (AuthorizationScheme.JWT == authorizationScheme && jwtSharedSecret != null) { | ||
defaultHeaders.add(createSharedSecretHeader(jwtSharedSecret)); | ||
} | ||
if (runAsUser != null) { | ||
defaultHeaders.add(createRunAsUserHeader(runAsUser)); | ||
} | ||
if (!defaultHeaders.isEmpty()) { | ||
builder.setDefaultHeaders(defaultHeaders.toArray(new Header[0])); | ||
} | ||
|
@@ -504,6 +543,23 @@ private BasicHeader createApiKeyAuthorizationHeader(final String apiKeyId, final | |
return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth); | ||
} | ||
|
||
private BasicHeader createSharedSecretHeader(final String jwtSharedSecret) { | ||
return new BasicHeader("ES-Client-Authentication", "sharedsecret " + jwtSharedSecret); | ||
} | ||
|
||
private BasicHeader createRunAsUserHeader(final String runAsUser) { | ||
return new BasicHeader("es-security-runas-user", runAsUser); | ||
} | ||
|
||
private Request addJWTAuthorizationHeader(final Request request, final OAuth2AccessTokenProvider tokenProvider) { | ||
if (tokenProvider != null) { | ||
final RequestOptions.Builder requestOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); | ||
requestOptionsBuilder.addHeader("Authorization", "Bearer " + tokenProvider.getAccessDetails().getAccessToken()); | ||
request.setOptions(requestOptionsBuilder.build()); | ||
} | ||
return request; | ||
} | ||
|
||
private Sniffer setupSniffer(final ConfigurationContext context, final RestClient restClient) { | ||
final boolean sniffClusterNodes = context.getProperty(SNIFF_CLUSTER_NODES).asBoolean(); | ||
final int snifferIntervalMillis = context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); | ||
|
@@ -996,7 +1052,7 @@ public String getTransitUrl(final String index, final String type) { | |
} | ||
|
||
private Response performRequest(final String method, final String endpoint, final Map<String, String> parameters, final HttpEntity entity) throws IOException { | ||
final Request request = new Request(method, endpoint); | ||
final Request request = addJWTAuthorizationHeader(new Request(method, endpoint), oAuth2AccessTokenProvider); | ||
if (parameters != null && !parameters.isEmpty()) { | ||
request.addParameters(parameters); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.