diff --git a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSCredentialsProvider.java b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSCredentialsProvider.java new file mode 100644 index 00000000000..ef4afe434a8 --- /dev/null +++ b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSCredentialsProvider.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.oss.fs; + +import com.aliyun.oss.common.auth.BasicCredentials; +import com.aliyun.oss.common.auth.Credentials; +import com.aliyun.oss.common.auth.CredentialsProvider; +import com.aliyun.oss.common.auth.DefaultCredentials; +import java.net.URI; +import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider; +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.OSSSecretKeyCredential; +import org.apache.gravitino.credential.OSSTokenCredential; +import org.apache.hadoop.conf.Configuration; + +public class OSSCredentialsProvider implements CredentialsProvider { + + private GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider; + private Credentials basicCredentials; + private long expirationTime = Long.MAX_VALUE; + private static final double EXPIRATION_TIME_FACTOR = 0.5D; + + public OSSCredentialsProvider(URI uri, Configuration conf) { + this.gravitinoFileSystemCredentialsProvider = FileSystemUtils.getGvfsCredentialProvider(conf); + } + + @Override + public void setCredentials(Credentials credentials) {} + + @Override + public Credentials getCredentials() { + if (basicCredentials == null || System.currentTimeMillis() >= expirationTime) { + synchronized (this) { + refresh(); + } + } + + return basicCredentials; + } + + private void refresh() { + Credential[] gravitinoCredentials = gravitinoFileSystemCredentialsProvider.getCredentials(); + Credential credential = OSSUtils.getSuitableCredential(gravitinoCredentials); + if (credential == null) { + throw new RuntimeException("No suitable credential for OSS found..."); + } + + if (credential instanceof OSSSecretKeyCredential) { + OSSSecretKeyCredential ossSecretKeyCredential = (OSSSecretKeyCredential) credential; + basicCredentials = + new DefaultCredentials( + ossSecretKeyCredential.accessKeyId(), ossSecretKeyCredential.secretAccessKey()); + } else if (credential instanceof OSSTokenCredential) { + OSSTokenCredential ossTokenCredential = (OSSTokenCredential) credential; + basicCredentials = + new BasicCredentials( + ossTokenCredential.accessKeyId(), + ossTokenCredential.secretAccessKey(), + ossTokenCredential.securityToken()); + } + + if (credential.expireTimeInMs() > 0) { + expirationTime = + System.currentTimeMillis() + + (long) + ((credential.expireTimeInMs() - System.currentTimeMillis()) + * EXPIRATION_TIME_FACTOR); + } + } +} diff --git a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java index b47d25335cd..358e3a08c76 100644 --- a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java +++ b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSFileSystemProvider.java @@ -20,10 +20,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider; import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending; +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.OSSSecretKeyCredential; +import org.apache.gravitino.credential.OSSTokenCredential; import org.apache.gravitino.storage.OSSProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -31,7 +36,7 @@ import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem; import org.apache.hadoop.fs.aliyun.oss.Constants; -public class OSSFileSystemProvider implements FileSystemProvider { +public class OSSFileSystemProvider implements FileSystemProvider, SupportsCredentialVending { private static final String OSS_FILESYSTEM_IMPL = "fs.oss.impl"; @@ -61,9 +66,22 @@ public FileSystem getFileSystem(Path path, Map config) throws IO } hadoopConfMap.forEach(configuration::set); + return AliyunOSSFileSystem.newInstance(path.toUri(), configuration); } + @Override + public Map getFileSystemCredentialConf(Credential[] credentials) { + Credential credential = OSSUtils.getSuitableCredential(credentials); + Map result = Maps.newHashMap(); + if (credential instanceof OSSSecretKeyCredential || credential instanceof OSSTokenCredential) { + result.put( + Constants.CREDENTIALS_PROVIDER_KEY, OSSCredentialsProvider.class.getCanonicalName()); + } + + return result; + } + @Override public String scheme() { return "oss"; diff --git a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSUtils.java b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSUtils.java new file mode 100644 index 00000000000..87c71de377b --- /dev/null +++ b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/fs/OSSUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.oss.fs; + +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.OSSSecretKeyCredential; +import org.apache.gravitino.credential.OSSTokenCredential; + +public class OSSUtils { + + /** + * Get the credential from the credential array. Using dynamic credential first, if not found, + * uses static credential. + * + * @param credentials The credential array. + * @return A credential. Null if not found. + */ + static Credential getSuitableCredential(Credential[] credentials) { + // Use dynamic credential if found. + for (Credential credential : credentials) { + if (credential instanceof OSSTokenCredential) { + return credential; + } + } + + // If dynamic credential not found, use the static one + for (Credential credential : credentials) { + if (credential instanceof OSSSecretKeyCredential) { + return credential; + } + } + + return null; + } +} diff --git a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java new file mode 100644 index 00000000000..2fc14588959 --- /dev/null +++ b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3CredentialsProvider.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.s3.fs; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.BasicSessionCredentials; +import java.net.URI; +import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider; +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.S3SecretKeyCredential; +import org.apache.gravitino.credential.S3TokenCredential; +import org.apache.hadoop.conf.Configuration; + +public class S3CredentialsProvider implements AWSCredentialsProvider { + private GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider; + + private AWSCredentials basicSessionCredentials; + private long expirationTime = Long.MAX_VALUE; + private static final double EXPIRATION_TIME_FACTOR = 0.5D; + + public S3CredentialsProvider(final URI uri, final Configuration conf) { + this.gravitinoFileSystemCredentialsProvider = FileSystemUtils.getGvfsCredentialProvider(conf); + } + + @Override + public AWSCredentials getCredentials() { + // Refresh credentials if they are null or about to expire. + if (basicSessionCredentials == null || System.currentTimeMillis() >= expirationTime) { + synchronized (this) { + refresh(); + } + } + + return basicSessionCredentials; + } + + @Override + public void refresh() { + Credential[] gravitinoCredentials = gravitinoFileSystemCredentialsProvider.getCredentials(); + Credential credential = S3Utils.getSuitableCredential(gravitinoCredentials); + + if (credential == null) { + throw new RuntimeException("No suitable credential for S3 found..."); + } + + if (credential instanceof S3SecretKeyCredential) { + S3SecretKeyCredential s3SecretKeyCredential = (S3SecretKeyCredential) credential; + basicSessionCredentials = + new BasicAWSCredentials( + s3SecretKeyCredential.accessKeyId(), s3SecretKeyCredential.secretAccessKey()); + } else if (credential instanceof S3TokenCredential) { + S3TokenCredential s3TokenCredential = (S3TokenCredential) credential; + basicSessionCredentials = + new BasicSessionCredentials( + s3TokenCredential.accessKeyId(), + s3TokenCredential.secretAccessKey(), + s3TokenCredential.sessionToken()); + } + + if (credential.expireTimeInMs() > 0) { + expirationTime = + System.currentTimeMillis() + + (long) + ((credential.expireTimeInMs() - System.currentTimeMillis()) + * EXPIRATION_TIME_FACTOR); + } + } +} diff --git a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java index b7cd569bbf6..cbe133ed778 100644 --- a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java +++ b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java @@ -25,11 +25,16 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider; import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending; +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.S3SecretKeyCredential; +import org.apache.gravitino.credential.S3TokenCredential; import org.apache.gravitino.storage.S3Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -39,9 +44,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3FileSystemProvider implements FileSystemProvider { +public class S3FileSystemProvider implements FileSystemProvider, SupportsCredentialVending { - private static final Logger LOGGER = LoggerFactory.getLogger(S3FileSystemProvider.class); + private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemProvider.class); @VisibleForTesting public static final Map GRAVITINO_KEY_TO_S3_HADOOP_KEY = @@ -61,18 +66,29 @@ public FileSystem getFileSystem(Path path, Map config) throws IO Map hadoopConfMap = FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_S3_HADOOP_KEY); + hadoopConfMap.forEach(configuration::set); if (!hadoopConfMap.containsKey(S3_CREDENTIAL_KEY)) { - hadoopConfMap.put(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL); + configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL); } - hadoopConfMap.forEach(configuration::set); - // Hadoop-aws 2 does not support IAMInstanceCredentialsProvider checkAndSetCredentialProvider(configuration); return S3AFileSystem.newInstance(path.toUri(), configuration); } + @Override + public Map getFileSystemCredentialConf(Credential[] credentials) { + Credential credential = S3Utils.getSuitableCredential(credentials); + Map result = Maps.newHashMap(); + if (credential instanceof S3SecretKeyCredential || credential instanceof S3TokenCredential) { + result.put( + Constants.AWS_CREDENTIALS_PROVIDER, S3CredentialsProvider.class.getCanonicalName()); + } + + return result; + } + private void checkAndSetCredentialProvider(Configuration configuration) { String provides = configuration.get(S3_CREDENTIAL_KEY); if (provides == null) { @@ -91,12 +107,12 @@ private void checkAndSetCredentialProvider(Configuration configuration) { if (AWSCredentialsProvider.class.isAssignableFrom(c)) { validProviders.add(provider); } else { - LOGGER.warn( + LOG.warn( "Credential provider {} is not a subclass of AWSCredentialsProvider, skipping", provider); } } catch (Exception e) { - LOGGER.warn( + LOG.warn( "Credential provider {} not found in the Hadoop runtime, falling back to default", provider); configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL); diff --git a/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java new file mode 100644 index 00000000000..078a1180ba4 --- /dev/null +++ b/bundles/aws/src/main/java/org/apache/gravitino/s3/fs/S3Utils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.s3.fs; + +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.S3SecretKeyCredential; +import org.apache.gravitino.credential.S3TokenCredential; + +public class S3Utils { + + /** + * Get the credential from the credential array. Using dynamic credential first, if not found, + * uses static credential. + * + * @param credentials The credential array. + * @return A credential. Null if not found. + */ + static Credential getSuitableCredential(Credential[] credentials) { + // Use dynamic credential if found. + for (Credential credential : credentials) { + if (credential instanceof S3TokenCredential) { + return credential; + } + } + + // If dynamic credential not found, use the static one + for (Credential credential : credentials) { + if (credential instanceof S3SecretKeyCredential) { + return credential; + } + } + return null; + } +} diff --git a/bundles/azure/build.gradle.kts b/bundles/azure/build.gradle.kts index 8dbd6ed489e..1cbe4856af5 100644 --- a/bundles/azure/build.gradle.kts +++ b/bundles/azure/build.gradle.kts @@ -28,7 +28,6 @@ dependencies { compileOnly(project(":api")) compileOnly(project(":catalogs:catalog-hadoop")) compileOnly(project(":core")) - compileOnly(libs.hadoop3.abs) compileOnly(libs.hadoop3.client.api) compileOnly(libs.hadoop3.client.runtime) diff --git a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java index f8924044176..3dcbb502f62 100644 --- a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java +++ b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java @@ -19,19 +19,29 @@ package org.apache.gravitino.abs.fs; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; import javax.annotation.Nonnull; import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider; import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending; +import org.apache.gravitino.credential.ADLSTokenCredential; +import org.apache.gravitino.credential.AzureAccountKeyCredential; +import org.apache.gravitino.credential.Credential; import org.apache.gravitino.storage.AzureProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AuthType; -public class AzureFileSystemProvider implements FileSystemProvider { +public class AzureFileSystemProvider implements FileSystemProvider, SupportsCredentialVending { @VisibleForTesting public static final String ABS_PROVIDER_SCHEME = "abfss"; @@ -58,13 +68,39 @@ public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map config.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY)); } - if (!config.containsKey(ABFS_IMPL_KEY)) { + if (!hadoopConfMap.containsKey(ABFS_IMPL_KEY)) { configuration.set(ABFS_IMPL_KEY, ABFS_IMPL); } hadoopConfMap.forEach(configuration::set); - return FileSystem.get(path.toUri(), configuration); + return FileSystem.newInstance(path.toUri(), configuration); + } + + @Override + public Map getFileSystemCredentialConf(Credential[] credentials) { + Credential credential = AzureStorageUtils.getSuitableCredential(credentials); + Map result = Maps.newHashMap(); + if (credential instanceof ADLSTokenCredential) { + ADLSTokenCredential adlsTokenCredential = (ADLSTokenCredential) credential; + + String accountName = + String.format("%s.dfs.core.windows.net", adlsTokenCredential.accountName()); + result.put(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + "." + accountName, AuthType.SAS.name()); + result.put( + FS_AZURE_SAS_TOKEN_PROVIDER_TYPE + "." + accountName, + AzureSasCredentialsProvider.class.getName()); + result.put(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, "true"); + } else if (credential instanceof AzureAccountKeyCredential) { + AzureAccountKeyCredential azureAccountKeyCredential = (AzureAccountKeyCredential) credential; + result.put( + String.format( + "fs.azure.account.key.%s.dfs.core.windows.net", + azureAccountKeyCredential.accountName()), + azureAccountKeyCredential.accountKey()); + } + + return result; } @Override diff --git a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureSasCredentialsProvider.java b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureSasCredentialsProvider.java new file mode 100644 index 00000000000..85793d3d973 --- /dev/null +++ b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureSasCredentialsProvider.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.abs.fs; + +import java.io.IOException; +import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider; +import org.apache.gravitino.credential.ADLSTokenCredential; +import org.apache.gravitino.credential.Credential; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; + +public class AzureSasCredentialsProvider implements SASTokenProvider, Configurable { + + private Configuration configuration; + private String sasToken; + + private GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider; + private long expirationTime = Long.MAX_VALUE; + private static final double EXPIRATION_TIME_FACTOR = 0.5D; + + @Override + public void setConf(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public Configuration getConf() { + return configuration; + } + + @Override + public void initialize(Configuration conf, String accountName) throws IOException { + this.configuration = conf; + this.gravitinoFileSystemCredentialsProvider = FileSystemUtils.getGvfsCredentialProvider(conf); + } + + @Override + public String getSASToken(String account, String fileSystem, String path, String operation) { + // Refresh credentials if they are null or about to expire. + if (sasToken == null || System.currentTimeMillis() >= expirationTime) { + synchronized (this) { + refresh(); + } + } + return sasToken; + } + + private void refresh() { + Credential[] gravitinoCredentials = gravitinoFileSystemCredentialsProvider.getCredentials(); + Credential credential = AzureStorageUtils.getADLSTokenCredential(gravitinoCredentials); + if (credential == null) { + throw new RuntimeException("No token credential for OSS found..."); + } + + if (credential instanceof ADLSTokenCredential) { + ADLSTokenCredential adlsTokenCredential = (ADLSTokenCredential) credential; + sasToken = adlsTokenCredential.sasToken(); + + if (credential.expireTimeInMs() > 0) { + expirationTime = + System.currentTimeMillis() + + (long) + ((credential.expireTimeInMs() - System.currentTimeMillis()) + * EXPIRATION_TIME_FACTOR); + } + } + } +} diff --git a/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureStorageUtils.java b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureStorageUtils.java new file mode 100644 index 00000000000..873f61930ee --- /dev/null +++ b/bundles/azure/src/main/java/org/apache/gravitino/abs/fs/AzureStorageUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.abs.fs; + +import org.apache.gravitino.credential.ADLSTokenCredential; +import org.apache.gravitino.credential.AzureAccountKeyCredential; +import org.apache.gravitino.credential.Credential; + +public class AzureStorageUtils { + + /** + * Get the ADLS credential from the credential array. Use the account and secret if dynamic token + * is not found, null if both are not found. + * + * @param credentials The credential array. + * @return A credential. Null if not found. + */ + static Credential getSuitableCredential(Credential[] credentials) { + for (Credential credential : credentials) { + if (credential instanceof ADLSTokenCredential) { + return credential; + } + } + + for (Credential credential : credentials) { + if (credential instanceof AzureAccountKeyCredential) { + return credential; + } + } + + return null; + } + + /** + * Get the ADLS token credential from the credential array. Null if not found. + * + * @param credentials The credential array. + * @return A credential. Null if not found. + */ + static Credential getADLSTokenCredential(Credential[] credentials) { + for (Credential credential : credentials) { + if (credential instanceof ADLSTokenCredential) { + return credential; + } + } + + return null; + } +} diff --git a/bundles/gcp/build.gradle.kts b/bundles/gcp/build.gradle.kts index 95907f8a3bd..7d46fde9e98 100644 --- a/bundles/gcp/build.gradle.kts +++ b/bundles/gcp/build.gradle.kts @@ -32,6 +32,7 @@ dependencies { compileOnly(libs.hadoop3.client.api) compileOnly(libs.hadoop3.client.runtime) + compileOnly(libs.hadoop3.gcs) implementation(project(":catalogs:catalog-common")) { exclude("*") diff --git a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSCredentialsProvider.java b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSCredentialsProvider.java new file mode 100644 index 00000000000..c4eefeeebe0 --- /dev/null +++ b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSCredentialsProvider.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.gcs.fs; + +import com.google.cloud.hadoop.util.AccessTokenProvider; +import java.io.IOException; +import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider; +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.GCSTokenCredential; +import org.apache.hadoop.conf.Configuration; + +public class GCSCredentialsProvider implements AccessTokenProvider { + private Configuration configuration; + private GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider; + + private AccessToken accessToken; + private long expirationTime = Long.MAX_VALUE; + private static final double EXPIRATION_TIME_FACTOR = 0.5D; + + @Override + public AccessToken getAccessToken() { + if (accessToken == null || System.currentTimeMillis() >= expirationTime) { + try { + refresh(); + } catch (IOException e) { + throw new RuntimeException("Failed to refresh access token", e); + } + } + return accessToken; + } + + @Override + public void refresh() throws IOException { + Credential[] gravitinoCredentials = gravitinoFileSystemCredentialsProvider.getCredentials(); + + Credential credential = GCSUtils.getGCSTokenCredential(gravitinoCredentials); + if (credential == null) { + throw new RuntimeException("No suitable credential for OSS found..."); + } + + if (credential instanceof GCSTokenCredential) { + GCSTokenCredential gcsTokenCredential = (GCSTokenCredential) credential; + accessToken = new AccessToken(gcsTokenCredential.token(), credential.expireTimeInMs()); + + if (credential.expireTimeInMs() > 0) { + expirationTime = + System.currentTimeMillis() + + (long) + ((credential.expireTimeInMs() - System.currentTimeMillis()) + * EXPIRATION_TIME_FACTOR); + } + } + } + + @Override + public void setConf(Configuration configuration) { + this.configuration = configuration; + this.gravitinoFileSystemCredentialsProvider = + FileSystemUtils.getGvfsCredentialProvider(configuration); + } + + @Override + public Configuration getConf() { + return configuration; + } +} diff --git a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java index b79b58ef48d..7ab38b2d7a9 100644 --- a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java +++ b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSFileSystemProvider.java @@ -20,18 +20,23 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider; import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending; +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.GCSTokenCredential; import org.apache.gravitino.storage.GCSProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -public class GCSFileSystemProvider implements FileSystemProvider { +public class GCSFileSystemProvider implements FileSystemProvider, SupportsCredentialVending { private static final String GCS_SERVICE_ACCOUNT_JSON_FILE = "fs.gs.auth.service.account.json.keyfile"; + private static final String GCS_TOKEN_PROVIDER_IMPL = "fs.gs.auth.access.token.provider.impl"; @VisibleForTesting public static final Map GRAVITINO_KEY_TO_GCS_HADOOP_KEY = @@ -43,9 +48,21 @@ public FileSystem getFileSystem(Path path, Map config) throws IO Configuration configuration = new Configuration(); FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_GCS_HADOOP_KEY) .forEach(configuration::set); + return FileSystem.newInstance(path.toUri(), configuration); } + @Override + public Map getFileSystemCredentialConf(Credential[] credentials) { + Credential credential = GCSUtils.getGCSTokenCredential(credentials); + Map result = Maps.newHashMap(); + if (credential instanceof GCSTokenCredential) { + result.put(GCS_TOKEN_PROVIDER_IMPL, GCSCredentialsProvider.class.getName()); + } + + return result; + } + @Override public String scheme() { return "gs"; diff --git a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSUtils.java b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSUtils.java new file mode 100644 index 00000000000..f8fbfd6351b --- /dev/null +++ b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/fs/GCSUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.gcs.fs; + +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.GCSTokenCredential; + +public class GCSUtils { + /** + * Get the credential from the credential array. If the dynamic credential is not found, return + * null. + * + * @param credentials The credential array. + * @return An credential. + */ + static Credential getGCSTokenCredential(Credential[] credentials) { + for (Credential credential : credentials) { + if (credential instanceof GCSTokenCredential) { + return credential; + } + } + + return null; + } +} diff --git a/catalogs/hadoop-common/build.gradle.kts b/catalogs/hadoop-common/build.gradle.kts index 566ce5986e3..09fd9f80170 100644 --- a/catalogs/hadoop-common/build.gradle.kts +++ b/catalogs/hadoop-common/build.gradle.kts @@ -23,6 +23,7 @@ plugins { // try to avoid adding extra dependencies because it is used by catalogs and connectors. dependencies { + implementation(project(":api")) implementation(project(":catalogs:catalog-common")) implementation(libs.commons.lang3) implementation(libs.hadoop3.client.api) diff --git a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java index a1434e85c3e..11ecd1ee9c3 100644 --- a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java +++ b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java @@ -31,6 +31,7 @@ import java.util.ServiceLoader; import java.util.Set; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; public class FileSystemUtils { @@ -160,4 +161,26 @@ public static Map toHadoopConfigMap( return result; } + + /** + * Get the GravitinoFileSystemCredentialProvider from the configuration. + * + * @param conf Configuration + * @return GravitinoFileSystemCredentialProvider + */ + public static GravitinoFileSystemCredentialsProvider getGvfsCredentialProvider( + Configuration conf) { + try { + GravitinoFileSystemCredentialsProvider gravitinoFileSystemCredentialsProvider = + (GravitinoFileSystemCredentialsProvider) + Class.forName( + conf.get(GravitinoFileSystemCredentialsProvider.GVFS_CREDENTIAL_PROVIDER)) + .getDeclaredConstructor() + .newInstance(); + gravitinoFileSystemCredentialsProvider.setConf(conf); + return gravitinoFileSystemCredentialsProvider; + } catch (Exception e) { + throw new RuntimeException("Failed to create GravitinoFileSystemCredentialProvider", e); + } + } } diff --git a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/GravitinoFileSystemCredentialsProvider.java b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/GravitinoFileSystemCredentialsProvider.java new file mode 100644 index 00000000000..40c0492c7f2 --- /dev/null +++ b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/GravitinoFileSystemCredentialsProvider.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.catalog.hadoop.fs; + +import org.apache.gravitino.credential.Credential; +import org.apache.hadoop.conf.Configurable; + +/** Interface for providing credentials for Gravitino Virtual File System. */ +public interface GravitinoFileSystemCredentialsProvider extends Configurable { + + String GVFS_CREDENTIAL_PROVIDER = "fs.gvfs.credential.provider"; + + String GVFS_NAME_IDENTIFIER = "fs.gvfs.name.identifier"; + + /** + * Get credentials for Gravitino Virtual File System. + * + * @return credentials for Gravitino Virtual File System + */ + Credential[] getCredentials(); +} diff --git a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/SupportsCredentialVending.java b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/SupportsCredentialVending.java new file mode 100644 index 00000000000..a9c0b688d0c --- /dev/null +++ b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/SupportsCredentialVending.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.catalog.hadoop.fs; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.credential.Credential; + +/** Interface for file systems that support credential vending. */ +public interface SupportsCredentialVending { + /** + * Get the configuration needed for the file system credential based on the credentials. + * + * @param credentials the credentials to be used for the file system + * @return the configuration for the file system credential + */ + default Map getFileSystemCredentialConf(Credential[] credentials) { + return ImmutableMap.of(); + } +} diff --git a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/DefaultGravitinoFileSystemCredentialsProvider.java b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/DefaultGravitinoFileSystemCredentialsProvider.java new file mode 100644 index 00000000000..2f3278f8744 --- /dev/null +++ b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/DefaultGravitinoFileSystemCredentialsProvider.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.filesystem.hadoop; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider; +import org.apache.gravitino.client.GravitinoClient; +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.file.Fileset; +import org.apache.gravitino.file.FilesetCatalog; +import org.apache.hadoop.conf.Configuration; + +/** + * Default implementation of {@link GravitinoFileSystemCredentialsProvider} which provides + * credentials for Gravitino Virtual File System. + */ +public class DefaultGravitinoFileSystemCredentialsProvider + implements GravitinoFileSystemCredentialsProvider { + + private Configuration configuration; + + @Override + public void setConf(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public Configuration getConf() { + return configuration; + } + + @Override + public Credential[] getCredentials() { + // The format of name identifier is `metalake.catalog.schema.fileset` + String nameIdentifier = configuration.get(GVFS_NAME_IDENTIFIER); + String[] idents = nameIdentifier.split("\\."); + try (GravitinoClient client = GravitinoVirtualFileSystemUtils.createClient(configuration)) { + FilesetCatalog filesetCatalog = client.loadCatalog(idents[1]).asFilesetCatalog(); + Fileset fileset = filesetCatalog.loadFileset(NameIdentifier.of(idents[2], idents[3])); + return fileset.supportsCredentials().getCredentials(); + } + } +} diff --git a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java index a9c40e55840..26d248736a9 100644 --- a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java +++ b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java @@ -23,35 +23,44 @@ import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.collect.Streams; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.io.File; import java.io.IOException; import java.net.URI; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.ServiceLoader; +import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.audit.CallerContext; import org.apache.gravitino.audit.FilesetAuditConstants; import org.apache.gravitino.audit.FilesetDataOperation; import org.apache.gravitino.audit.InternalClientType; import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider; -import org.apache.gravitino.client.DefaultOAuth2TokenProvider; +import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider; +import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending; import org.apache.gravitino.client.GravitinoClient; -import org.apache.gravitino.client.KerberosTokenProvider; +import org.apache.gravitino.credential.Credential; import org.apache.gravitino.exceptions.GravitinoRuntimeException; +import org.apache.gravitino.file.Fileset; import org.apache.gravitino.file.FilesetCatalog; +import org.apache.gravitino.storage.AzureProperties; +import org.apache.gravitino.storage.OSSProperties; +import org.apache.gravitino.storage.S3Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -79,7 +88,9 @@ public class GravitinoVirtualFileSystem extends FileSystem { private String metalakeName; private Cache catalogCache; private ScheduledThreadPoolExecutor catalogCleanScheduler; - private Cache internalFileSystemCache; + // Fileset name identifier and its corresponding FileSystem cache, the name identifier has + // four levels, the first level is metalake name. + private Cache internalFileSystemCache; private ScheduledThreadPoolExecutor internalFileSystemCleanScheduler; // The pattern is used to match gvfs path. The scheme prefix (gvfs://fileset) is optional. @@ -91,6 +102,14 @@ public class GravitinoVirtualFileSystem extends FileSystem { private static final String SLASH = "/"; private final Map fileSystemProvidersMap = Maps.newHashMap(); + private static final Set CATALOG_NECESSARY_PROPERTIES_TO_KEEP = + Sets.newHashSet( + OSSProperties.GRAVITINO_OSS_ENDPOINT, + OSSProperties.GRAVITINO_OSS_REGION, + S3Properties.GRAVITINO_S3_ENDPOINT, + S3Properties.GRAVITINO_S3_REGION, + AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME); + @Override public void initialize(URI name, Configuration configuration) throws IOException { if (!name.toString().startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)) { @@ -132,8 +151,7 @@ public void initialize(URI name, Configuration configuration) throws IOException "'%s' is not set in the configuration", GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_METALAKE_KEY); - initializeClient(configuration); - + this.client = GravitinoVirtualFileSystemUtils.createClient(configuration); // Register the default local and HDFS FileSystemProvider fileSystemProvidersMap.putAll(getFileSystemProviders()); @@ -145,7 +163,7 @@ public void initialize(URI name, Configuration configuration) throws IOException } @VisibleForTesting - Cache internalFileSystemCache() { + Cache internalFileSystemCache() { return internalFileSystemCache; } @@ -193,116 +211,6 @@ private ThreadFactory newDaemonThreadFactory(String name) { return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name + "-%d").build(); } - private void initializeClient(Configuration configuration) { - // initialize the Gravitino client - String serverUri = - configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY); - Preconditions.checkArgument( - StringUtils.isNotBlank(serverUri), - "'%s' is not set in the configuration", - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY); - - String authType = - configuration.get( - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY, - GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE); - if (authType.equalsIgnoreCase(GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE)) { - this.client = - GravitinoClient.builder(serverUri).withMetalake(metalakeName).withSimpleAuth().build(); - } else if (authType.equalsIgnoreCase( - GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE)) { - String authServerUri = - configuration.get( - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY); - checkAuthConfig( - GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE, - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY, - authServerUri); - - String credential = - configuration.get( - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY); - checkAuthConfig( - GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE, - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY, - credential); - - String path = - configuration.get( - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY); - checkAuthConfig( - GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE, - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY, - path); - - String scope = - configuration.get( - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY); - checkAuthConfig( - GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE, - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY, - scope); - - DefaultOAuth2TokenProvider authDataProvider = - DefaultOAuth2TokenProvider.builder() - .withUri(authServerUri) - .withCredential(credential) - .withPath(path) - .withScope(scope) - .build(); - - this.client = - GravitinoClient.builder(serverUri) - .withMetalake(metalakeName) - .withOAuth(authDataProvider) - .build(); - } else if (authType.equalsIgnoreCase( - GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE)) { - String principal = - configuration.get( - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY); - checkAuthConfig( - GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE, - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY, - principal); - String keytabFilePath = - configuration.get( - GravitinoVirtualFileSystemConfiguration - .FS_GRAVITINO_CLIENT_KERBEROS_KEYTAB_FILE_PATH_KEY); - KerberosTokenProvider authDataProvider; - if (StringUtils.isNotBlank(keytabFilePath)) { - // Using principal and keytab to create auth provider - authDataProvider = - KerberosTokenProvider.builder() - .withClientPrincipal(principal) - .withKeyTabFile(new File(keytabFilePath)) - .build(); - } else { - // Using ticket cache to create auth provider - authDataProvider = KerberosTokenProvider.builder().withClientPrincipal(principal).build(); - } - this.client = - GravitinoClient.builder(serverUri) - .withMetalake(metalakeName) - .withKerberosAuth(authDataProvider) - .build(); - } else { - throw new IllegalArgumentException( - String.format( - "Unsupported authentication type: %s for %s.", - authType, GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY)); - } - } - - private void checkAuthConfig(String authType, String configKey, String configValue) { - Preconditions.checkArgument( - StringUtils.isNotBlank(configValue), - "%s should not be null if %s is set to %s.", - configKey, - GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY, - authType); - } - private String getVirtualLocation(NameIdentifier identifier, boolean withScheme) { return String.format( "%s/%s/%s/%s", @@ -360,6 +268,7 @@ private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperat FilesetCatalog filesetCatalog = catalogCache.get( catalogIdent, ident -> client.loadCatalog(catalogIdent.name()).asFilesetCatalog()); + Catalog catalog = (Catalog) filesetCatalog; Preconditions.checkArgument( filesetCatalog != null, String.format("Loaded fileset catalog: %s is null.", catalogIdent)); @@ -383,8 +292,8 @@ private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperat StringUtils.isNotBlank(scheme), "Scheme of the actual file location cannot be null."); FileSystem fs = internalFileSystemCache.get( - scheme, - str -> { + identifier, + ident -> { try { FileSystemProvider provider = fileSystemProvidersMap.get(scheme); if (provider == null) { @@ -398,8 +307,19 @@ private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperat // https://github.com/apache/gravitino/issues/5609 resetFileSystemServiceLoader(scheme); - Map maps = getConfigMap(getConf()); - return provider.getFileSystem(filePath, maps); + Map necessaryPropertyFromCatalog = + catalog.properties().entrySet().stream() + .filter( + property -> + CATALOG_NECESSARY_PROPERTIES_TO_KEEP.contains(property.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Map totalProperty = Maps.newHashMap(necessaryPropertyFromCatalog); + totalProperty.putAll(getConfigMap(getConf())); + + totalProperty.putAll(getCredentialProperties(provider, catalog, identifier)); + + return provider.getFileSystem(filePath, totalProperty); } catch (IOException ioe) { throw new GravitinoRuntimeException( "Exception occurs when create new FileSystem for actual uri: %s, msg: %s", @@ -410,6 +330,41 @@ private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperat return new FilesetContextPair(new Path(actualFileLocation), fs); } + private Map getCredentialProperties( + FileSystemProvider fileSystemProvider, Catalog catalog, NameIdentifier filesetIdentifier) { + // Do not support credential vending, we do not need to add any credential properties. + if (!(fileSystemProvider instanceof SupportsCredentialVending)) { + return ImmutableMap.of(); + } + + ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); + try { + Fileset fileset = + catalog + .asFilesetCatalog() + .loadFileset( + NameIdentifier.of( + filesetIdentifier.namespace().level(2), filesetIdentifier.name())); + Credential[] credentials = fileset.supportsCredentials().getCredentials(); + if (credentials.length > 0) { + mapBuilder.put( + GravitinoFileSystemCredentialsProvider.GVFS_CREDENTIAL_PROVIDER, + DefaultGravitinoFileSystemCredentialsProvider.class.getCanonicalName()); + mapBuilder.put( + GravitinoFileSystemCredentialsProvider.GVFS_NAME_IDENTIFIER, + filesetIdentifier.toString()); + + SupportsCredentialVending supportsCredentialVending = + (SupportsCredentialVending) fileSystemProvider; + mapBuilder.putAll(supportsCredentialVending.getFileSystemCredentialConf(credentials)); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + return mapBuilder.build(); + } + private void resetFileSystemServiceLoader(String fsScheme) { try { Map> serviceFileSystems = diff --git a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java new file mode 100644 index 00000000000..8a0d1d87433 --- /dev/null +++ b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.filesystem.hadoop; + +import com.google.common.base.Preconditions; +import java.io.File; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.client.DefaultOAuth2TokenProvider; +import org.apache.gravitino.client.GravitinoClient; +import org.apache.gravitino.client.KerberosTokenProvider; +import org.apache.hadoop.conf.Configuration; + +/** Utility class for Gravitino Virtual File System. */ +public class GravitinoVirtualFileSystemUtils { + + /** + * Get Gravitino client by the configuration. + * + * @param configuration The configuration for the Gravitino client. + * @return The Gravitino client. + */ + public static GravitinoClient createClient(Configuration configuration) { + // initialize the Gravitino client + String serverUri = + configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY); + String metalakeValue = + configuration.get(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_METALAKE_KEY); + Preconditions.checkArgument( + StringUtils.isNotBlank(serverUri), + "'%s' is not set in the configuration", + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY); + + String authType = + configuration.get( + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY, + GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE); + if (authType.equalsIgnoreCase(GravitinoVirtualFileSystemConfiguration.SIMPLE_AUTH_TYPE)) { + return GravitinoClient.builder(serverUri) + .withMetalake(metalakeValue) + .withSimpleAuth() + .build(); + } else if (authType.equalsIgnoreCase( + GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE)) { + String authServerUri = + configuration.get( + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY); + checkAuthConfig( + GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE, + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SERVER_URI_KEY, + authServerUri); + + String credential = + configuration.get( + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY); + checkAuthConfig( + GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE, + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_CREDENTIAL_KEY, + credential); + + String path = + configuration.get( + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY); + checkAuthConfig( + GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE, + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_PATH_KEY, + path); + + String scope = + configuration.get( + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY); + checkAuthConfig( + GravitinoVirtualFileSystemConfiguration.OAUTH2_AUTH_TYPE, + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_OAUTH2_SCOPE_KEY, + scope); + + DefaultOAuth2TokenProvider authDataProvider = + DefaultOAuth2TokenProvider.builder() + .withUri(authServerUri) + .withCredential(credential) + .withPath(path) + .withScope(scope) + .build(); + + return GravitinoClient.builder(serverUri) + .withMetalake(metalakeValue) + .withOAuth(authDataProvider) + .build(); + } else if (authType.equalsIgnoreCase( + GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE)) { + String principal = + configuration.get( + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY); + checkAuthConfig( + GravitinoVirtualFileSystemConfiguration.KERBEROS_AUTH_TYPE, + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_KERBEROS_PRINCIPAL_KEY, + principal); + String keytabFilePath = + configuration.get( + GravitinoVirtualFileSystemConfiguration + .FS_GRAVITINO_CLIENT_KERBEROS_KEYTAB_FILE_PATH_KEY); + KerberosTokenProvider authDataProvider; + if (StringUtils.isNotBlank(keytabFilePath)) { + // Using principal and keytab to create auth provider + authDataProvider = + KerberosTokenProvider.builder() + .withClientPrincipal(principal) + .withKeyTabFile(new File(keytabFilePath)) + .build(); + } else { + // Using ticket cache to create auth provider + authDataProvider = KerberosTokenProvider.builder().withClientPrincipal(principal).build(); + } + + return GravitinoClient.builder(serverUri) + .withMetalake(metalakeValue) + .withKerberosAuth(authDataProvider) + .build(); + } else { + throw new IllegalArgumentException( + String.format( + "Unsupported authentication type: %s for %s.", + authType, GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY)); + } + } + + private static void checkAuthConfig(String authType, String configKey, String configValue) { + Preconditions.checkArgument( + StringUtils.isNotBlank(configValue), + "%s should not be null if %s is set to %s.", + configKey, + GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY, + authType); + } +} diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java index e7e3b7857f5..be30e42a4b8 100644 --- a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java @@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -40,7 +41,13 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.dto.AuditDTO; +import org.apache.gravitino.dto.credential.CredentialDTO; +import org.apache.gravitino.dto.file.FilesetDTO; +import org.apache.gravitino.dto.responses.CredentialResponse; import org.apache.gravitino.dto.responses.FileLocationResponse; +import org.apache.gravitino.dto.responses.FilesetResponse; +import org.apache.gravitino.file.Fileset; import org.apache.gravitino.rest.RESTUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -140,6 +147,7 @@ public void testFSCache() throws IOException { queryParams.put("sub_path", RESTUtils.encodeString("")); try { buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + buildMockResourceForCredential(filesetName, localPath.toString()); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -149,7 +157,8 @@ public void testFSCache() throws IOException { Objects.requireNonNull( ((GravitinoVirtualFileSystem) gravitinoFileSystem) .internalFileSystemCache() - .getIfPresent("file")); + .getIfPresent( + NameIdentifier.of(metalakeName, catalogName, schemaName, "testFSCache"))); String anotherFilesetName = "test_new_fs"; Path diffLocalPath = @@ -184,6 +193,7 @@ public void testInternalCache() throws IOException { try { buildMockResource( Method.GET, locationPath1, queryParams, null, fileLocationResponse, SC_OK); + buildMockResourceForCredential("fileset1", localPath1.toString()); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -199,7 +209,10 @@ public void testInternalCache() throws IOException { 0, ((GravitinoVirtualFileSystem) fs).internalFileSystemCache().asMap().size())); - assertNull(((GravitinoVirtualFileSystem) fs).internalFileSystemCache().getIfPresent("file")); + assertNull( + ((GravitinoVirtualFileSystem) fs) + .internalFileSystemCache() + .getIfPresent(NameIdentifier.of("file"))); } } @@ -224,6 +237,7 @@ public void testCreate(boolean withScheme) throws IOException { queryParams.put("sub_path", RESTUtils.encodeString("/test.txt")); try { buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + buildMockResourceForCredential(filesetName, localPath + "/test.txt"); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -276,6 +290,7 @@ public void testAppend(boolean withScheme) throws IOException { queryParams.put("sub_path", RESTUtils.encodeString("/test.txt")); try { buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + buildMockResourceForCredential(filesetName, localPath + "/test.txt"); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -309,6 +324,32 @@ public void testAppend(boolean withScheme) throws IOException { } } + private void buildMockResourceForCredential(String filesetName, String filesetLocation) + throws JsonProcessingException { + String filesetPath = + String.format( + "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s", + metalakeName, catalogName, schemaName, filesetName); + String credentialsPath = + String.format( + "/api/metalakes/%s/objects/fileset/%s.%s.%s/credentials", + metalakeName, catalogName, schemaName, filesetName); + FilesetResponse filesetResponse = + new FilesetResponse( + FilesetDTO.builder() + .name(filesetName) + .comment("comment") + .type(Fileset.Type.MANAGED) + .audit(AuditDTO.builder().build()) + .storageLocation(filesetLocation.toString()) + .build()); + CredentialResponse credentialResponse = new CredentialResponse(new CredentialDTO[] {}); + + buildMockResource(Method.GET, filesetPath, ImmutableMap.of(), null, filesetResponse, SC_OK); + buildMockResource( + Method.GET, credentialsPath, ImmutableMap.of(), null, credentialResponse, SC_OK); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testRename(boolean withScheme) throws IOException { @@ -343,6 +384,7 @@ public void testRename(boolean withScheme) throws IOException { try { buildMockResource( Method.GET, locationPath, queryParams1, null, fileLocationResponse1, SC_OK); + buildMockResourceForCredential(filesetName, localPath + "/rename_dst2"); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -409,6 +451,7 @@ public void testDelete(boolean withScheme) throws IOException { queryParams.put("sub_path", RESTUtils.encodeString("/test_delete")); try { buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + buildMockResourceForCredential(filesetName, localPath + "/test_delete"); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -455,6 +498,7 @@ public void testGetStatus() throws IOException { queryParams.put("sub_path", RESTUtils.encodeString("")); try { buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + buildMockResourceForCredential(filesetName, localPath.toString()); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -499,6 +543,7 @@ public void testListStatus() throws IOException { queryParams.put("sub_path", RESTUtils.encodeString("")); try { buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + buildMockResourceForCredential(filesetName, localPath.toString()); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -549,6 +594,7 @@ public void testMkdirs() throws IOException { queryParams.put("sub_path", RESTUtils.encodeString("/test_mkdirs")); try { buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + buildMockResourceForCredential(filesetName, localPath + "/test_mkdirs"); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -667,6 +713,7 @@ public void testGetDefaultReplications() throws IOException { queryParams.put("sub_path", RESTUtils.encodeString("")); try { buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + buildMockResourceForCredential(filesetName, localPath.toString()); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -693,6 +740,7 @@ public void testGetDefaultBlockSize() throws IOException { queryParams.put("sub_path", RESTUtils.encodeString("")); try { buildMockResource(Method.GET, locationPath, queryParams, null, fileLocationResponse, SC_OK); + buildMockResourceForCredential(filesetName, localPath.toString()); } catch (JsonProcessingException e) { throw new RuntimeException(e); } diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSCredentialIT.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSCredentialIT.java new file mode 100644 index 00000000000..2f79332e8b3 --- /dev/null +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSCredentialIT.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.filesystem.hadoop.integration.test; + +import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.abs.fs.AzureFileSystemProvider; +import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.credential.CredentialConstants; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.storage.AzureProperties; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.condition.EnabledIf; +import org.junit.platform.commons.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@EnabledIf("absIsConfigured") +public class GravitinoVirtualFileSystemABSCredentialIT extends GravitinoVirtualFileSystemIT { + private static final Logger LOG = + LoggerFactory.getLogger(GravitinoVirtualFileSystemABSCredentialIT.class); + + public static final String ABS_ACCOUNT_NAME = System.getenv("ABS_ACCOUNT_NAME_FOR_CREDENTIAL"); + public static final String ABS_ACCOUNT_KEY = System.getenv("ABS_ACCOUNT_KEY_FOR_CREDENTIAL"); + public static final String ABS_CONTAINER_NAME = + System.getenv("ABS_CONTAINER_NAME_FOR_CREDENTIAL"); + public static final String ABS_TENANT_ID = System.getenv("ABS_TENANT_ID_FOR_CREDENTIAL"); + public static final String ABS_CLIENT_ID = System.getenv("ABS_CLIENT_ID_FOR_CREDENTIAL"); + public static final String ABS_CLIENT_SECRET = System.getenv("ABS_CLIENT_SECRET_FOR_CREDENTIAL"); + + @BeforeAll + public void startIntegrationTest() { + // Do nothing + } + + @BeforeAll + public void startUp() throws Exception { + // Copy the Azure jars to the gravitino server if in deploy mode. + copyBundleJarsToHadoop("azure-bundle"); + // Need to download jars to gravitino server + super.startIntegrationTest(); + + // This value can be by tune by the user, please change it accordingly. + defaultBlockSize = 32 * 1024 * 1024; + + // This value is 1 for ABS, 3 for GCS, and 1 for S3A. + defaultReplication = 1; + + metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake"); + catalogName = GravitinoITUtils.genRandomName("catalog"); + schemaName = GravitinoITUtils.genRandomName("schema"); + + Assertions.assertFalse(client.metalakeExists(metalakeName)); + metalake = client.createMetalake(metalakeName, "metalake comment", Collections.emptyMap()); + Assertions.assertTrue(client.metalakeExists(metalakeName)); + + Map properties = Maps.newHashMap(); + + properties.put(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, ABS_ACCOUNT_NAME); + properties.put(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY, ABS_ACCOUNT_KEY); + properties.put(AzureProperties.GRAVITINO_AZURE_CLIENT_ID, ABS_CLIENT_ID); + properties.put(AzureProperties.GRAVITINO_AZURE_CLIENT_SECRET, ABS_CLIENT_SECRET); + properties.put(AzureProperties.GRAVITINO_AZURE_TENANT_ID, ABS_TENANT_ID); + properties.put(CredentialConstants.CREDENTIAL_PROVIDERS, "adls-token"); + + properties.put(FILESYSTEM_PROVIDERS, AzureFileSystemProvider.ABS_PROVIDER_NAME); + + Catalog catalog = + metalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", properties); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + catalog.asSchemas().createSchema(schemaName, "schema comment", properties); + Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName)); + + conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem"); + conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs"); + conf.set("fs.gvfs.impl.disable.cache", "true"); + conf.set("fs.gravitino.server.uri", serverUri); + conf.set("fs.gravitino.client.metalake", metalakeName); + + // Pass this configuration to the real file system + conf.set(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, ABS_ACCOUNT_NAME); + conf.set(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY, ABS_ACCOUNT_KEY); + conf.set("fs.abfss.impl", "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem"); + + conf.set("fs.gravitino.client.useCloudStoreCredential", "true"); + } + + @AfterAll + public void tearDown() throws IOException { + Catalog catalog = metalake.loadCatalog(catalogName); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName, true); + client.dropMetalake(metalakeName, true); + + if (client != null) { + client.close(); + client = null; + } + + try { + closer.close(); + } catch (Exception e) { + LOG.error("Exception in closing CloseableGroup", e); + } + } + + /** + * Remove the `gravitino.bypass` prefix from the configuration and pass it to the real file system + * This method corresponds to the method org.apache.gravitino.filesystem.hadoop + * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original code. + */ + protected Configuration convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) { + Configuration absConf = new Configuration(); + Map map = Maps.newHashMap(); + + gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue())); + + Map hadoopConfMap = FileSystemUtils.toHadoopConfigMap(map, ImmutableMap.of()); + + if (gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME) != null + && gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY) != null) { + hadoopConfMap.put( + String.format( + "fs.azure.account.key.%s.dfs.core.windows.net", + gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME)), + gvfsConf.get(AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY)); + } + + hadoopConfMap.forEach(absConf::set); + + return absConf; + } + + protected String genStorageLocation(String fileset) { + return String.format( + "%s://%s@%s.dfs.core.windows.net/%s", + AzureFileSystemProvider.ABS_PROVIDER_SCHEME, ABS_CONTAINER_NAME, ABS_ACCOUNT_NAME, fileset); + } + + @Disabled("java.lang.UnsupportedOperationException: Append Support not enabled") + public void testAppend() throws IOException {} + + private static boolean absIsConfigured() { + return StringUtils.isNotBlank(System.getenv("ABS_ACCOUNT_NAME_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("ABS_ACCOUNT_KEY_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("ABS_CONTAINER_NAME_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("ABS_TENANT_ID_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("ABS_CLIENT_ID_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("ABS_CLIENT_SECRET_FOR_CREDENTIAL")); + } +} diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java new file mode 100644 index 00000000000..81b352fa55c --- /dev/null +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemGCSCredentialIT.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.filesystem.hadoop.integration.test; + +import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.credential.CredentialConstants; +import org.apache.gravitino.gcs.fs.GCSFileSystemProvider; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.storage.GCSProperties; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.condition.EnabledIf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@EnabledIf(value = "isGCPConfigured", disabledReason = "GCP is not configured") +public class GravitinoVirtualFileSystemGCSCredentialIT extends GravitinoVirtualFileSystemIT { + private static final Logger LOG = + LoggerFactory.getLogger(GravitinoVirtualFileSystemGCSCredentialIT.class); + + public static final String BUCKET_NAME = System.getenv("GCS_BUCKET_NAME_FOR_CREDENTIAL"); + public static final String SERVICE_ACCOUNT_FILE = + System.getenv("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL"); + + @BeforeAll + public void startIntegrationTest() { + // Do nothing + } + + @BeforeAll + public void startUp() throws Exception { + // Copy the GCP jars to the gravitino server if in deploy mode. + copyBundleJarsToHadoop("gcp-bundle"); + // Need to download jars to gravitino server + super.startIntegrationTest(); + + // This value can be by tune by the user, please change it accordingly. + defaultBlockSize = 64 * 1024 * 1024; + + metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake"); + catalogName = GravitinoITUtils.genRandomName("catalog"); + schemaName = GravitinoITUtils.genRandomName("schema"); + + Assertions.assertFalse(client.metalakeExists(metalakeName)); + metalake = client.createMetalake(metalakeName, "metalake comment", Collections.emptyMap()); + Assertions.assertTrue(client.metalakeExists(metalakeName)); + + Map properties = Maps.newHashMap(); + properties.put(FILESYSTEM_PROVIDERS, "gcs"); + properties.put(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE, SERVICE_ACCOUNT_FILE); + properties.put(CredentialConstants.CREDENTIAL_PROVIDERS, "gcs-token"); + + Catalog catalog = + metalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", properties); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + catalog.asSchemas().createSchema(schemaName, "schema comment", properties); + Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName)); + + conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem"); + conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs"); + conf.set("fs.gvfs.impl.disable.cache", "true"); + conf.set("fs.gravitino.server.uri", serverUri); + conf.set("fs.gravitino.client.metalake", metalakeName); + + // Pass this configuration to the real file system + conf.set(GCSProperties.GRAVITINO_GCS_SERVICE_ACCOUNT_FILE, SERVICE_ACCOUNT_FILE); + } + + @AfterAll + public void tearDown() throws IOException { + Catalog catalog = metalake.loadCatalog(catalogName); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName, true); + client.dropMetalake(metalakeName, true); + + if (client != null) { + client.close(); + client = null; + } + + try { + closer.close(); + } catch (Exception e) { + LOG.error("Exception in closing CloseableGroup", e); + } + } + + /** + * Remove the `gravitino.bypass` prefix from the configuration and pass it to the real file system + * This method corresponds to the method org.apache.gravitino.filesystem.hadoop + * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original code. + */ + protected Configuration convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) { + Configuration gcsConf = new Configuration(); + Map map = Maps.newHashMap(); + + gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue())); + + Map hadoopConfMap = + FileSystemUtils.toHadoopConfigMap( + map, GCSFileSystemProvider.GRAVITINO_KEY_TO_GCS_HADOOP_KEY); + + hadoopConfMap.forEach(gcsConf::set); + + return gcsConf; + } + + protected String genStorageLocation(String fileset) { + return String.format("gs://%s/dir1/dir2/%s/", BUCKET_NAME, fileset); + } + + @Disabled( + "GCS does not support append, java.io.IOException: The append operation is not supported") + public void testAppend() throws IOException {} + + private static boolean isGCPConfigured() { + return StringUtils.isNotBlank(System.getenv("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("GCS_BUCKET_NAME_FOR_CREDENTIAL")); + } +} diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSCredentialIT.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSCredentialIT.java new file mode 100644 index 00000000000..662e8f6e464 --- /dev/null +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSCredentialIT.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.filesystem.hadoop.integration.test; + +import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.credential.CredentialConstants; +import org.apache.gravitino.credential.OSSTokenCredential; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.oss.fs.OSSFileSystemProvider; +import org.apache.gravitino.storage.OSSProperties; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.condition.EnabledIf; +import org.junit.platform.commons.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@EnabledIf(value = "ossIsConfigured", disabledReason = "OSS is not prepared") +public class GravitinoVirtualFileSystemOSSCredentialIT extends GravitinoVirtualFileSystemIT { + private static final Logger LOG = + LoggerFactory.getLogger(GravitinoVirtualFileSystemOSSCredentialIT.class); + + public static final String BUCKET_NAME = System.getenv("OSS_BUCKET_NAME_FOR_CREDENTIAL"); + public static final String OSS_ACCESS_KEY = System.getenv("OSS_ACCESS_KEY_ID_FOR_CREDENTIAL"); + public static final String OSS_SECRET_KEY = System.getenv("OSS_SECRET_ACCESS_KEY_FOR_CREDENTIAL"); + public static final String OSS_ENDPOINT = System.getenv("OSS_ENDPOINT_FOR_CREDENTIAL"); + public static final String OSS_REGION = System.getenv("OSS_REGION_FOR_CREDENTIAL"); + public static final String OSS_ROLE_ARN = System.getenv("OSS_ROLE_ARN_FOR_CREDENTIAL"); + + @BeforeAll + public void startIntegrationTest() { + // Do nothing + } + + @BeforeAll + public void startUp() throws Exception { + copyBundleJarsToHadoop("aliyun-bundle"); + // Need to download jars to gravitino server + super.startIntegrationTest(); + + // This value can be by tune by the user, please change it accordingly. + defaultBlockSize = 64 * 1024 * 1024; + + // The default replication factor is 1. + defaultReplication = 1; + + metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake"); + catalogName = GravitinoITUtils.genRandomName("catalog"); + schemaName = GravitinoITUtils.genRandomName("schema"); + + Assertions.assertFalse(client.metalakeExists(metalakeName)); + metalake = client.createMetalake(metalakeName, "metalake comment", Collections.emptyMap()); + Assertions.assertTrue(client.metalakeExists(metalakeName)); + + Map properties = Maps.newHashMap(); + properties.put(FILESYSTEM_PROVIDERS, "oss"); + properties.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY); + properties.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, OSS_SECRET_KEY); + properties.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, OSS_ENDPOINT); + properties.put(OSSProperties.GRAVITINO_OSS_REGION, OSS_REGION); + properties.put(OSSProperties.GRAVITINO_OSS_ROLE_ARN, OSS_ROLE_ARN); + properties.put( + CredentialConstants.CREDENTIAL_PROVIDERS, OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE); + + Catalog catalog = + metalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", properties); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + catalog.asSchemas().createSchema(schemaName, "schema comment", properties); + Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName)); + + conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem"); + conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs"); + conf.set("fs.gvfs.impl.disable.cache", "true"); + conf.set("fs.gravitino.server.uri", serverUri); + conf.set("fs.gravitino.client.metalake", metalakeName); + + // Pass this configuration to the real file system + conf.set(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY); + conf.set(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, OSS_SECRET_KEY); + conf.set(OSSProperties.GRAVITINO_OSS_ENDPOINT, OSS_ENDPOINT); + conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"); + } + + @AfterAll + public void tearDown() throws IOException { + Catalog catalog = metalake.loadCatalog(catalogName); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName, true); + client.dropMetalake(metalakeName, true); + + if (client != null) { + client.close(); + client = null; + } + + try { + closer.close(); + } catch (Exception e) { + LOG.error("Exception in closing CloseableGroup", e); + } + } + + /** + * Remove the `gravitino.bypass` prefix from the configuration and pass it to the real file system + * This method corresponds to the method org.apache.gravitino.filesystem.hadoop + * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original code. + */ + protected Configuration convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) { + Configuration ossConf = new Configuration(); + Map map = Maps.newHashMap(); + + gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue())); + + Map hadoopConfMap = + FileSystemUtils.toHadoopConfigMap( + map, OSSFileSystemProvider.GRAVITINO_KEY_TO_OSS_HADOOP_KEY); + + hadoopConfMap.forEach(ossConf::set); + + return ossConf; + } + + protected String genStorageLocation(String fileset) { + return String.format("oss://%s/%s", BUCKET_NAME, fileset); + } + + @Disabled( + "OSS does not support append, java.io.IOException: The append operation is not supported") + public void testAppend() throws IOException {} + + protected static boolean ossIsConfigured() { + return StringUtils.isNotBlank(System.getenv("OSS_ACCESS_KEY_ID_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("OSS_SECRET_ACCESS_KEY_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("OSS_ENDPOINT_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("OSS_BUCKET_NAME_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("OSS_REGION_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("OSS_ROLE_ARN_FOR_CREDENTIAL")); + } +} diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialIT.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialIT.java new file mode 100644 index 00000000000..12d5309675d --- /dev/null +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3CredentialIT.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.filesystem.hadoop.integration.test; + +import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.credential.CredentialConstants; +import org.apache.gravitino.credential.S3TokenCredential; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.s3.fs.S3FileSystemProvider; +import org.apache.gravitino.storage.S3Properties; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.condition.EnabledIf; +import org.junit.platform.commons.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@EnabledIf(value = "s3IsConfigured", disabledReason = "s3 with credential is not prepared") +public class GravitinoVirtualFileSystemS3CredentialIT extends GravitinoVirtualFileSystemIT { + private static final Logger LOG = + LoggerFactory.getLogger(GravitinoVirtualFileSystemS3CredentialIT.class); + + public static final String BUCKET_NAME = System.getenv("S3_BUCKET_NAME_FOR_CREDENTIAL"); + public static final String S3_ACCESS_KEY = System.getenv("S3_ACCESS_KEY_ID_FOR_CREDENTIAL"); + public static final String S3_SECRET_KEY = System.getenv("S3_SECRET_ACCESS_KEY_FOR_CREDENTIAL"); + public static final String S3_ENDPOINT = System.getenv("S3_ENDPOINT_FOR_CREDENTIAL"); + public static final String S3_REGION = System.getenv("S3_REGION_FOR_CREDENTIAL"); + public static final String S3_ROLE_ARN = System.getenv("S3_ROLE_ARN_FOR_CREDENTIAL"); + + @BeforeAll + public void startIntegrationTest() { + // Do nothing + } + + @BeforeAll + public void startUp() throws Exception { + copyBundleJarsToHadoop("aws-bundle"); + + // Need to download jars to gravitino server + super.startIntegrationTest(); + + // This value can be by tune by the user, please change it accordingly. + defaultBlockSize = 32 * 1024 * 1024; + + // The value is 1 for S3 + defaultReplication = 1; + + metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake"); + catalogName = GravitinoITUtils.genRandomName("catalog"); + schemaName = GravitinoITUtils.genRandomName("schema"); + + Assertions.assertFalse(client.metalakeExists(metalakeName)); + metalake = client.createMetalake(metalakeName, "metalake comment", Collections.emptyMap()); + Assertions.assertTrue(client.metalakeExists(metalakeName)); + + Map properties = Maps.newHashMap(); + properties.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY); + properties.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY); + properties.put(S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT); + properties.put( + "gravitino.bypass.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); + properties.put(FILESYSTEM_PROVIDERS, "s3"); + + properties.put(S3Properties.GRAVITINO_S3_REGION, S3_REGION); + properties.put(S3Properties.GRAVITINO_S3_ROLE_ARN, S3_ROLE_ARN); + properties.put( + CredentialConstants.CREDENTIAL_PROVIDERS, S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE); + + Catalog catalog = + metalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", properties); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + catalog.asSchemas().createSchema(schemaName, "schema comment", properties); + Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName)); + + conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem"); + conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs"); + conf.set("fs.gvfs.impl.disable.cache", "true"); + conf.set("fs.gravitino.server.uri", serverUri); + conf.set("fs.gravitino.client.metalake", metalakeName); + + // Pass this configuration to the real file system + conf.set(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, S3_SECRET_KEY); + conf.set(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, S3_ACCESS_KEY); + conf.set(S3Properties.GRAVITINO_S3_ENDPOINT, S3_ENDPOINT); + conf.set(S3Properties.GRAVITINO_S3_REGION, S3_REGION); + conf.set(S3Properties.GRAVITINO_S3_ROLE_ARN, S3_ROLE_ARN); + } + + @AfterAll + public void tearDown() throws IOException { + Catalog catalog = metalake.loadCatalog(catalogName); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName, true); + client.dropMetalake(metalakeName, true); + + if (client != null) { + client.close(); + client = null; + } + + try { + closer.close(); + } catch (Exception e) { + LOG.error("Exception in closing CloseableGroup", e); + } + } + + /** + * Remove the `gravitino.bypass` prefix from the configuration and pass it to the real file system + * This method corresponds to the method org.apache.gravitino.filesystem.hadoop + * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original code. + */ + protected Configuration convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) { + Configuration s3Conf = new Configuration(); + Map map = Maps.newHashMap(); + + gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue())); + + Map hadoopConfMap = + FileSystemUtils.toHadoopConfigMap(map, S3FileSystemProvider.GRAVITINO_KEY_TO_S3_HADOOP_KEY); + + hadoopConfMap.forEach(s3Conf::set); + + return s3Conf; + } + + protected String genStorageLocation(String fileset) { + return String.format("s3a://%s/%s", BUCKET_NAME, fileset); + } + + @Disabled( + "GCS does not support append, java.io.IOException: The append operation is not supported") + public void testAppend() throws IOException {} + + protected static boolean s3IsConfigured() { + return StringUtils.isNotBlank(System.getenv("S3_ACCESS_KEY_ID_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("S3_SECRET_ACCESS_KEY_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("S3_ENDPOINT_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("S3_BUCKET_NAME_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("S3_REGION_FOR_CREDENTIAL")) + && StringUtils.isNotBlank(System.getenv("S3_ROLE_ARN_FOR_CREDENTIAL")); + } +}