[#5973] feat(hadoop-catalog): Support credential when using fileset catalog with cloud storage #5974

Merged: 75 commits, Jan 10, 2025
Changes from 64 commits
c424d8e
Support using dynamic credential
yuqi1129 Dec 24, 2024
d7a031c
Merge branch 'main' of github.com:apache/gravitino into issue_5973
yuqi1129 Dec 26, 2024
5b648e8
Fix again.
yuqi1129 Dec 26, 2024
0c61a48
fix
yuqi1129 Dec 27, 2024
44425e9
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Dec 27, 2024
18f6ff6
fix
yuqi1129 Dec 27, 2024
8fc56ae
Fix
yuqi1129 Dec 27, 2024
00fa098
Fix
yuqi1129 Dec 27, 2024
682705d
Fix
yuqi1129 Dec 27, 2024
50a4d15
Fix
yuqi1129 Dec 28, 2024
7f0a99b
Fix
yuqi1129 Dec 29, 2024
fede4a8
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Dec 29, 2024
22bea5c
fix conflict
yuqi1129 Dec 29, 2024
f4f2287
fix
yuqi1129 Dec 30, 2024
20f7ec6
fix
yuqi1129 Dec 30, 2024
e8814b0
Polish code.
yuqi1129 Dec 30, 2024
9b7bf01
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Dec 30, 2024
06b192b
fix
yuqi1129 Dec 31, 2024
440db59
fix
yuqi1129 Dec 31, 2024
9678513
fix
yuqi1129 Jan 2, 2025
c4fb29a
fix
yuqi1129 Jan 2, 2025
4791a64
Merge branch 'main' of github.com:datastrato/graviton into 5472
yuqi1129 Jan 2, 2025
9a90e7a
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Jan 2, 2025
826239d
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Jan 3, 2025
baf42e1
fix
yuqi1129 Jan 3, 2025
d86610b
Merge branch 'main' of github.com:datastrato/graviton into 5472
yuqi1129 Jan 3, 2025
a1aa4d5
fix
yuqi1129 Jan 3, 2025
f67981c
fix typo
yuqi1129 Jan 3, 2025
c0db96b
fix
yuqi1129 Jan 3, 2025
d2ba98b
refactor module and create a new module `filesystem-hadoop3-common`
yuqi1129 Jan 3, 2025
b7eb621
fix
yuqi1129 Jan 3, 2025
b34c526
Rename class of credential providers and optimize expired time.
yuqi1129 Jan 3, 2025
1ecc378
update the docs
yuqi1129 Jan 4, 2025
d232e92
polish document again.
yuqi1129 Jan 6, 2025
fbd57ba
Again
yuqi1129 Jan 6, 2025
c47fd09
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Jan 6, 2025
2d8f4dc
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 6, 2025
193f467
resolve comments.
yuqi1129 Jan 6, 2025
a1f0989
fix
yuqi1129 Jan 6, 2025
d75b67e
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 6, 2025
4fb6e79
fix
yuqi1129 Jan 6, 2025
755a474
fix comments.
yuqi1129 Jan 6, 2025
5251a85
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 6, 2025
3bb9252
fix
yuqi1129 Jan 6, 2025
614464f
Optimize
yuqi1129 Jan 6, 2025
e481c8d
fix
yuqi1129 Jan 6, 2025
0a97fc7
fix
yuqi1129 Jan 6, 2025
9bb28a8
Merge branch '5472' of github.com:yuqi1129/gravitino into issue_5973
yuqi1129 Jan 7, 2025
7e74fc9
optimize again.
yuqi1129 Jan 7, 2025
c853de2
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 7, 2025
571c1e9
The new framework of credential vending in fileset.
yuqi1129 Jan 7, 2025
e27f9cd
Refactor the framework of using credential in fileset
yuqi1129 Jan 7, 2025
37a5735
Merge branch 'main' of github.com:apache/gravitino into issue_5973
yuqi1129 Jan 7, 2025
bad0c87
fix
yuqi1129 Jan 7, 2025
ca51efb
fix
yuqi1129 Jan 7, 2025
bf1290c
fix
yuqi1129 Jan 8, 2025
a1b249f
fix typo
yuqi1129 Jan 8, 2025
a722080
fix the checkstyle problem.
yuqi1129 Jan 8, 2025
03395c8
fix
yuqi1129 Jan 8, 2025
274ec3e
fix
yuqi1129 Jan 8, 2025
3eb9e92
Merge branch 'main' of github.com:apache/gravitino into issue_5973
yuqi1129 Jan 8, 2025
a285dee
Polish the code.
yuqi1129 Jan 8, 2025
ce49fa3
fix
yuqi1129 Jan 8, 2025
6b848c8
fix
yuqi1129 Jan 8, 2025
40422f0
Fix method naming problem and fix improper description in comments.
yuqi1129 Jan 9, 2025
8c228fd
Resolve comments
yuqi1129 Jan 9, 2025
d8ba11e
fix minor
yuqi1129 Jan 9, 2025
f5c3837
Remove unnecessary class
yuqi1129 Jan 9, 2025
46ba6da
fix
yuqi1129 Jan 9, 2025
f646aed
fix
yuqi1129 Jan 9, 2025
ee441f7
fix
yuqi1129 Jan 9, 2025
da98dcf
fix
yuqi1129 Jan 9, 2025
41c6dfe
Fix
yuqi1129 Jan 10, 2025
f3f5676
Fix
yuqi1129 Jan 10, 2025
57db471
Fix
yuqi1129 Jan 10, 2025
@@ -74,6 +74,7 @@ public long expireTimeInMs() {
public Map<String, String> credentialInfo() {
return (new ImmutableMap.Builder<String, String>())
.put(GRAVITINO_ADLS_SAS_TOKEN, sasToken)
.put(GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, accountName)
.build();
}

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.oss.fs;

import com.aliyun.oss.common.auth.BasicCredentials;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import java.net.URI;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.OSSSecretKeyCredential;
import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.hadoop.conf.Configuration;

public class OSSCredentialsProvider implements CredentialsProvider {

private GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider;
private Credentials basicCredentials;
private long expirationTime = Long.MAX_VALUE;
private static final double EXPIRATION_TIME_FACTOR = 0.9D;

public OSSCredentialsProvider(URI uri, Configuration conf) {
this.gravitinoFileSystemCredentialsProvider = FileSystemUtils.getGvfsCredentialProvider(conf);
}

@Override
public void setCredentials(Credentials credentials) {}

@Override
public Credentials getCredentials() {
if (basicCredentials == null || System.currentTimeMillis() >= expirationTime) {
synchronized (this) {
refresh();
}
}

return basicCredentials;
}

private void refresh() {
Credential[] gravitinoCredentials = gravitinoFileSystemCredentialsProvider.getCredentials();
Credential credential = getSuitableCredential(gravitinoCredentials);
if (credential == null) {
throw new RuntimeException("No suitable credential for OSS found...");
}

if (credential instanceof OSSSecretKeyCredential) {
OSSSecretKeyCredential ossSecretKeyCredential = (OSSSecretKeyCredential) credential;
basicCredentials =
new DefaultCredentials(
ossSecretKeyCredential.accessKeyId(), ossSecretKeyCredential.secretAccessKey());
} else if (credential instanceof OSSTokenCredential) {
OSSTokenCredential ossTokenCredential = (OSSTokenCredential) credential;
basicCredentials =
new BasicCredentials(
ossTokenCredential.accessKeyId(),
ossTokenCredential.secretAccessKey(),
ossTokenCredential.securityToken());
}

if (credential.expireTimeInMs() > 0) {
expirationTime =
System.currentTimeMillis()
+ (long)
((credential.expireTimeInMs() - System.currentTimeMillis())
* EXPIRATION_TIME_FACTOR);
}
}

/**
* Get the credential from the credential array. Using dynamic credential first, if not found,
* uses static credential.
*
* @param credentials The credential array.
* @return A credential. Null if not found.
*/
private Credential getSuitableCredential(Credential[] credentials) {
// Use dynamic credential if found.
for (Credential credential : credentials) {
if (credential instanceof OSSTokenCredential) {
return credential;
}
}

// If dynamic credential not found, use the static one
for (Credential credential : credentials) {
if (credential instanceof OSSSecretKeyCredential) {
return credential;
}
}

return null;
}
}
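
Note on `getCredentials()` above: the expiry check runs outside the `synchronized` block and is not repeated inside it, so two threads that both observe an expired credential would each call `refresh()`, and the cached fields are not `volatile`. One common alternative is a double-checked refresh, sketched below under stated assumptions. `CachedValue`, its loader, the injected clock, and the fixed `ttlMs` are hypothetical names used only for illustration; none of them are part of this PR.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of the PR.
final class CachedValue<T> {
  private final Supplier<T> loader;
  private final LongSupplier clockMs;
  private final long ttlMs;

  // volatile so that readers outside the lock see a fully published value.
  private volatile T value;
  private volatile long refreshAtMs;

  CachedValue(Supplier<T> loader, LongSupplier clockMs, long ttlMs) {
    this.loader = loader;
    this.clockMs = clockMs;
    this.ttlMs = ttlMs;
  }

  T get() {
    if (isStale()) {
      synchronized (this) {
        // Re-check under the lock: another thread may have refreshed already.
        if (isStale()) {
          value = loader.get();
          refreshAtMs = clockMs.getAsLong() + ttlMs;
        }
      }
    }
    return value;
  }

  private boolean isStale() {
    return value == null || clockMs.getAsLong() >= refreshAtMs;
  }
}
```

With this shape the loader runs at most once per expiry window, however many threads call `get()` concurrently.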
@@ -18,6 +18,8 @@
*/
package org.apache.gravitino.oss.fs;

import static org.apache.gravitino.catalog.hadoop.fs.CredentialUtils.enableGravitinoCredentialVending;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
@@ -60,7 +62,13 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
hadoopConfMap.put(OSS_FILESYSTEM_IMPL, AliyunOSSFileSystem.class.getCanonicalName());
}

if (enableGravitinoCredentialVending(config)) {
hadoopConfMap.put(
Constants.CREDENTIALS_PROVIDER_KEY, OSSCredentialsProvider.class.getCanonicalName());
}

hadoopConfMap.forEach(configuration::set);

return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
}

@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.s3.fs;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import java.net.URI;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.S3SecretKeyCredential;
import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.hadoop.conf.Configuration;

public class S3CredentialsProvider implements AWSCredentialsProvider {
private GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider;

private AWSCredentials basicSessionCredentials;
private long expirationTime = Long.MAX_VALUE;
private static final double EXPIRATION_TIME_FACTOR = 0.9D;

public S3CredentialsProvider(final URI uri, final Configuration conf) {
this.gravitinoFileSystemCredentialsProvider = FileSystemUtils.getGvfsCredentialProvider(conf);
}

@Override
public AWSCredentials getCredentials() {
// Refresh credentials if they are null or about to expire.
if (basicSessionCredentials == null || System.currentTimeMillis() >= expirationTime) {
synchronized (this) {
refresh();
}
}

return basicSessionCredentials;
}

@Override
public void refresh() {
Credential[] gravitinoCredentials = gravitinoFileSystemCredentialsProvider.getCredentials();
Credential credential = getSuitableCredential(gravitinoCredentials);

if (credential == null) {
throw new RuntimeException("No suitable credential for S3 found...");
}

if (credential instanceof S3SecretKeyCredential) {
S3SecretKeyCredential s3SecretKeyCredential = (S3SecretKeyCredential) credential;
basicSessionCredentials =
new BasicAWSCredentials(
s3SecretKeyCredential.accessKeyId(), s3SecretKeyCredential.secretAccessKey());
} else if (credential instanceof S3TokenCredential) {
S3TokenCredential s3TokenCredential = (S3TokenCredential) credential;
basicSessionCredentials =
new BasicSessionCredentials(
s3TokenCredential.accessKeyId(),
s3TokenCredential.secretAccessKey(),
s3TokenCredential.sessionToken());
}

if (credential.expireTimeInMs() > 0) {
expirationTime =
System.currentTimeMillis()
+ (long)
((credential.expireTimeInMs() - System.currentTimeMillis())
* EXPIRATION_TIME_FACTOR);
}
}

/**
* Get the credential from the credential array. Using dynamic credential first, if not found,
* uses static credential.
*
* @param credentials The credential array.
* @return A credential. Null if not found.
*/
private Credential getSuitableCredential(Credential[] credentials) {
// Use dynamic credential if found.
for (Credential credential : credentials) {
if (credential instanceof S3TokenCredential) {
return credential;
}
}

// If dynamic credential not found, use the static one
for (Credential credential : credentials) {
if (credential instanceof S3SecretKeyCredential) {
return credential;
}
}
return null;
}
}
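
Note on the expiry arithmetic at the end of `refresh()`: it schedules the next refresh once 90% of the credential's remaining lifetime has elapsed, which leaves a safety margin before the real expiry. The sketch below isolates that calculation. `RefreshDeadline` and `compute` are hypothetical names, and returning `Long.MAX_VALUE` for a non-positive expiry is an assumption of this sketch; the diff instead leaves the previous `expirationTime` untouched in that case.

```java
// Hypothetical helper; not part of the PR.
final class RefreshDeadline {
  static final double EXPIRATION_TIME_FACTOR = 0.9D;

  /**
   * Returns the wall-clock time in milliseconds at which a cached credential
   * should be refreshed, given the current time and the credential's expiry.
   */
  static long compute(long nowMs, long expireTimeInMs) {
    if (expireTimeInMs <= 0) {
      // Assumption: a non-positive expiry means the credential does not expire.
      return Long.MAX_VALUE;
    }
    long remainingMs = expireTimeInMs - nowMs;
    // Refresh after 90% of the remaining lifetime has passed.
    return nowMs + (long) (remainingMs * EXPIRATION_TIME_FACTOR);
  }
}
```

If the credential is already expired when it arrives, `remainingMs` is negative and the deadline falls before `nowMs`, so the next `getCredentials()` call refreshes immediately.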
@@ -19,6 +19,8 @@

package org.apache.gravitino.s3.fs;

import static org.apache.gravitino.catalog.hadoop.fs.CredentialUtils.enableGravitinoCredentialVending;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -41,7 +43,7 @@

public class S3FileSystemProvider implements FileSystemProvider {

- private static final Logger LOGGER = LoggerFactory.getLogger(S3FileSystemProvider.class);
+ private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemProvider.class);

@VisibleForTesting
public static final Map<String, String> GRAVITINO_KEY_TO_S3_HADOOP_KEY =
@@ -61,11 +63,15 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_S3_HADOOP_KEY);

+ hadoopConfMap.forEach(configuration::set);
if (!hadoopConfMap.containsKey(S3_CREDENTIAL_KEY)) {
- hadoopConfMap.put(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
+ configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
}

- hadoopConfMap.forEach(configuration::set);
+ if (enableGravitinoCredentialVending(config)) {
+ configuration.set(
+ Constants.AWS_CREDENTIALS_PROVIDER, S3CredentialsProvider.class.getCanonicalName());
+ }

// Hadoop-aws 2 does not support IAMInstanceCredentialsProvider
checkAndSetCredentialProvider(configuration);
@@ -91,12 +97,12 @@ private void checkAndSetCredentialProvider(Configuration configuration) {
if (AWSCredentialsProvider.class.isAssignableFrom(c)) {
validProviders.add(provider);
} else {
- LOGGER.warn(
+ LOG.warn(
"Credential provider {} is not a subclass of AWSCredentialsProvider, skipping",
provider);
}
} catch (Exception e) {
- LOGGER.warn(
+ LOG.warn(
"Credential provider {} not found in the Hadoop runtime, falling back to default",
provider);
configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
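
Note on `checkAndSetCredentialProvider` above: the hunk keeps a configured provider only if its class loads and is assignable to `AWSCredentialsProvider`. The same pattern can be sketched with JDK types alone. `ProviderFilter` and `filterAssignable` are hypothetical names, and unlike the diff this sketch simply skips classes that cannot be loaded instead of resetting the configuration to a default.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of the PR.
final class ProviderFilter {

  /** Keeps the class names that can be loaded and are assignable to {@code required}. */
  static List<String> filterAssignable(String commaSeparated, Class<?> required) {
    List<String> valid = new ArrayList<>();
    for (String raw : commaSeparated.split(",")) {
      String name = raw.trim();
      if (name.isEmpty()) {
        continue;
      }
      try {
        Class<?> candidate = Class.forName(name);
        if (required.isAssignableFrom(candidate)) {
          valid.add(name);
        }
      } catch (ClassNotFoundException e) {
        // Not on the classpath: skip it (the diff logs a warning at this point).
      }
    }
    return valid;
  }
}
```

For example, filtering `"java.util.ArrayList, java.lang.String, no.such.Provider"` against `java.util.List` keeps only `java.util.ArrayList`.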
1 change: 0 additions & 1 deletion bundles/azure/build.gradle.kts
@@ -28,7 +28,6 @@ dependencies {
compileOnly(project(":api"))
compileOnly(project(":catalogs:catalog-hadoop"))
compileOnly(project(":core"))

compileOnly(libs.hadoop3.abs)
compileOnly(libs.hadoop3.client.api)
compileOnly(libs.hadoop3.client.runtime)
@@ -19,6 +19,11 @@

package org.apache.gravitino.abs.fs;

import static org.apache.gravitino.catalog.hadoop.fs.CredentialUtils.enableGravitinoCredentialVending;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
@@ -30,6 +35,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AuthType;

public class AzureFileSystemProvider implements FileSystemProvider {

@@ -58,12 +64,42 @@ public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String>
config.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY));
}

- if (!config.containsKey(ABFS_IMPL_KEY)) {
+ if (!hadoopConfMap.containsKey(ABFS_IMPL_KEY)) {
configuration.set(ABFS_IMPL_KEY, ABFS_IMPL);
}

hadoopConfMap.forEach(configuration::set);

+ if (enableGravitinoCredentialVending(hadoopConfMap)) {
+ try {
+ AzureSasCredentialsProvider azureSasCredentialsProvider = new AzureSasCredentialsProvider();
+ azureSasCredentialsProvider.initialize(configuration, null);
+ String sas = azureSasCredentialsProvider.getSASToken(null, null, null, null);
+ if (sas != null) {
+ String accountName =
+ String.format(
+ "%s.dfs.core.windows.net",
+ config.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME));
+
+ configuration.set(
+ FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + "." + accountName, AuthType.SAS.name());
+ configuration.set(
+ FS_AZURE_SAS_TOKEN_PROVIDER_TYPE + "." + accountName,
+ AzureSasCredentialsProvider.class.getName());
+ configuration.set(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, "true");
+ } else if (azureSasCredentialsProvider.getAzureStorageAccountKey() != null
+ && azureSasCredentialsProvider.getAzureStorageAccountName() != null) {
+ configuration.set(
+ String.format(
+ "fs.azure.account.key.%s.dfs.core.windows.net",
+ azureSasCredentialsProvider.getAzureStorageAccountName()),
+ azureSasCredentialsProvider.getAzureStorageAccountKey());
+ }
+ } catch (Exception e) {
+ throw new IOException("Failed to get SAS token from AzureSasCredentialsProvider", e);
+ }
+ }
+
return FileSystem.get(path.toUri(), configuration);
}
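
Note on the Azure hunk above: it scopes each setting to a single storage account by appending the account's `dfs.core.windows.net` host to the base key. The sketch below reproduces only that key composition on a plain `Map`. `AbfsConfigSketch` and its methods are hypothetical, and the two base key strings are assumed to be the values behind the Hadoop `ConfigurationKeys` constants imported in the diff; verify them against the Hadoop version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of the PR.
final class AbfsConfigSketch {
  // Assumed values of FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME and
  // FS_AZURE_SAS_TOKEN_PROVIDER_TYPE in Hadoop's ABFS ConfigurationKeys.
  static final String AUTH_TYPE_KEY = "fs.azure.account.auth.type";
  static final String SAS_PROVIDER_KEY = "fs.azure.sas.token.provider.type";

  /** Account-scoped settings for SAS authentication. */
  static Map<String, String> sasConfig(String storageAccount, String providerClassName) {
    String host = String.format("%s.dfs.core.windows.net", storageAccount);
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put(AUTH_TYPE_KEY + "." + host, "SAS");
    conf.put(SAS_PROVIDER_KEY + "." + host, providerClassName);
    return conf;
  }

  /** Account-scoped setting for the static account-key fallback. */
  static Map<String, String> accountKeyConfig(String storageAccount, String accountKey) {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put(
        String.format("fs.azure.account.key.%s.dfs.core.windows.net", storageAccount),
        accountKey);
    return conf;
  }
}
```

Scoping the keys per account lets filesets backed by different storage accounts use different credentials within the same Hadoop `Configuration`.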
