From 3f2502259d6ae3668ce16bd877284bb119566cec Mon Sep 17 00:00:00 2001 From: Wenhai Zhu Date: Thu, 30 Jun 2022 11:12:14 -0700 Subject: [PATCH] CloudFileUploader - GCS (#249) Summary: Pull Request resolved: https://github.com/facebookresearch/fbpcf/pull/249 Diff for supporting multipart upload in GCP. Interestingly, the GCP Client Library doesn't support multipart upload (source: https://cloud.google.com/storage/docs/uploads-downloads#support_per_tool). An answer on Stack Overflow (https://stackoverflow.com/a/71698884) mentions that "You can do Multipart upload to GCS using the S3 library, as the GCS XML API is compatible with this. There is a guide to using the S3 library with GCS here." So I'm following that approach here. Differential Revision: D37490665 fbshipit-source-id: 032b1b0f073a126b56764ed46deb497a4a4dd453 --- fbpcf/aws/S3Util.cpp | 6 + fbpcf/aws/S3Util.h | 2 + fbpcf/gcp/GCSUtil.cpp | 6 +- fbpcf/gcp/test/GCSUtilTest.cpp | 16 +++ fbpcf/io/cloud_util/CloudFileUtil.cpp | 29 +++- fbpcf/io/cloud_util/GCSFileUploader.cpp | 125 ++++++++++++++++++ fbpcf/io/cloud_util/GCSFileUploader.h | 37 ++++++ .../io/cloud_util/test/CloudFileUtilTest.cpp | 10 +- 8 files changed, 223 insertions(+), 8 deletions(-) create mode 100644 fbpcf/io/cloud_util/GCSFileUploader.cpp create mode 100644 fbpcf/io/cloud_util/GCSFileUploader.h diff --git a/fbpcf/aws/S3Util.cpp b/fbpcf/aws/S3Util.cpp index f3dba330..ae6f7b2e 100644 --- a/fbpcf/aws/S3Util.cpp +++ b/fbpcf/aws/S3Util.cpp @@ -84,6 +84,12 @@ std::unique_ptr createS3Client( const S3ClientOption& option) { Aws::Client::ClientConfiguration config; + if (option.endpointOverride.has_value()) { + config.endpointOverride = option.endpointOverride.value(); + } else if (std::getenv("AWS_ENDPOINT_OVERRIDE")) { + config.endpointOverride = std::getenv("AWS_ENDPOINT_OVERRIDE"); + } + if (option.region.has_value()) { config.region = option.region.value(); } else if (std::getenv("AWS_DEFAULT_REGION")) { diff --git a/fbpcf/aws/S3Util.h
b/fbpcf/aws/S3Util.h index 133e2f12..a3a751af 100644 --- a/fbpcf/aws/S3Util.h +++ b/fbpcf/aws/S3Util.h @@ -17,6 +17,8 @@ namespace fbpcf::aws { // referencee of environment variables: // https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html struct S3ClientOption { + // AWS_ENDPOINT_OVERRIDE + std::optional endpointOverride; // AWS_DEFAULT_REGION std::optional region; // AWS_ACCESS_KEY_ID diff --git a/fbpcf/gcp/GCSUtil.cpp b/fbpcf/gcp/GCSUtil.cpp index dbc94111..6f17306d 100644 --- a/fbpcf/gcp/GCSUtil.cpp +++ b/fbpcf/gcp/GCSUtil.cpp @@ -18,7 +18,9 @@ namespace fbpcf::gcp { // Format: // 1. https://storage.cloud.google.com/bucket-name/key-name -// 2. gs://bucket-name/key-name +// 2. https://bucket-name.storage.googleapis.com/key-name +// 3. https://storage.googleapis.com/bucket-name/key-name +// 4. gs://bucket-name/key-name GCSObjectReference uriToObjectReference(std::string url) { std::string bucket; std::string key; @@ -37,6 +39,8 @@ GCSObjectReference uriToObjectReference(std::string url) { if (boost::iequals(scheme, "gs")) { bucket = host; + } else if (host.find(".storage.googleapis.com") != std::string::npos) { + bucket = host.substr(0, host.find_first_of(".")); } else { // Remove the first character '/' in path path = path.substr(1); diff --git a/fbpcf/gcp/test/GCSUtilTest.cpp b/fbpcf/gcp/test/GCSUtilTest.cpp index 8213800a..8faa9e4f 100644 --- a/fbpcf/gcp/test/GCSUtilTest.cpp +++ b/fbpcf/gcp/test/GCSUtilTest.cpp @@ -23,6 +23,22 @@ TEST(GCSUtil, uriToObjectReference) { EXPECT_EQ("key", ref.key); } +TEST(GCSUtil, uriToObjectReference_virtualHostStyle) { + auto uri = "https://bucket-name.storage.googleapis.com/key-name"; + auto ref = fbpcf::gcp::uriToObjectReference(uri); + + EXPECT_EQ("bucket-name", ref.bucket); + EXPECT_EQ("key-name", ref.key); +} + +TEST(GCSUtil, uriToObjectReference_pathStyle) { + auto uri = "https://storage.googleapis.com/bucket-name/key-name"; + auto ref = fbpcf::gcp::uriToObjectReference(uri); + + 
EXPECT_EQ("bucket-name", ref.bucket); + EXPECT_EQ("key-name", ref.key); +} + TEST(GCSUtil, uriToObjectReference_Subfolder) { auto uri = "https://storage.cloud.google.com/bucket/folder/key"; auto ref = fbpcf::gcp::uriToObjectReference(uri); diff --git a/fbpcf/io/cloud_util/CloudFileUtil.cpp b/fbpcf/io/cloud_util/CloudFileUtil.cpp index 0c3f6e78..2747f4af 100644 --- a/fbpcf/io/cloud_util/CloudFileUtil.cpp +++ b/fbpcf/io/cloud_util/CloudFileUtil.cpp @@ -6,9 +6,12 @@ */ #include "fbpcf/io/cloud_util/CloudFileUtil.h" +#include #include #include "fbpcf/aws/S3Util.h" #include "fbpcf/exception/PcfException.h" +#include "fbpcf/gcp/GCSUtil.h" +#include "fbpcf/io/cloud_util/GCSFileUploader.h" #include "fbpcf/io/cloud_util/S3Client.h" #include "fbpcf/io/cloud_util/S3FileReader.h" #include "fbpcf/io/cloud_util/S3FileUploader.h" @@ -17,12 +20,14 @@ namespace fbpcf::cloudio { CloudFileType getCloudFileType(const std::string& filePath) { // S3 file format: - // 1. https://bucket-name.s3.Region.amazonaws.com/key-name - // 2. https://bucket-name.s3-Region.amazonaws.com/key-name + // 1. https://bucket-name.s3.region.amazonaws.com/key-name + // 2. https://bucket-name.s3-region.amazonaws.com/key-name // 3. s3://bucket-name/key-name // GCS file format: // 1. https://storage.cloud.google.com/bucket-name/key-name - // 2. gs://bucket-name/key-name + // 2. https://bucket-name.storage.googleapis.com/key-name + // 3. https://storage.googleapis.com/bucket-name/key-name + // 4. 
gs://bucket-name/key-name static const re2::RE2 s3Regex1( "https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+"); static const re2::RE2 s3Regex2( @@ -34,9 +39,14 @@ CloudFileType getCloudFileType(const std::string& filePath) { return CloudFileType::S3; } - static const re2::RE2 gcsRegex("https://storage.cloud.google.com/.*"); - bool isGCSFile = - re2::RE2::FullMatch(filePath, gcsRegex) || filePath.find("gs://", 0) == 0; + static const re2::RE2 gcsRegex1("https://storage.cloud.google.com/.*"); + static const re2::RE2 gcsRegex2( + "https://[a-z0-9.-]+.storage.googleapis.com/.+"); + static const re2::RE2 gcsRegex3("https://storage.googleapis.com/.*"); + bool isGCSFile = re2::RE2::FullMatch(filePath, gcsRegex1) || + re2::RE2::FullMatch(filePath, gcsRegex2) || + re2::RE2::FullMatch(filePath, gcsRegex3) || + filePath.find("gs://", 0) == 0; if (isGCSFile) { return CloudFileType::GCS; } @@ -64,6 +74,13 @@ std::unique_ptr getCloudFileUploader( fbpcf::aws::S3ClientOption{.region = ref.region}) .getS3Client(), filePath); + } else if (fileType == CloudFileType::GCS) { + return std::make_unique( + fbpcf::cloudio::S3Client::getInstance( + fbpcf::aws::S3ClientOption{ + .endpointOverride = "https://storage.googleapis.com/"}) + .getS3Client(), + filePath); } else { throw fbpcf::PcfException("Not supported yet."); } diff --git a/fbpcf/io/cloud_util/GCSFileUploader.cpp b/fbpcf/io/cloud_util/GCSFileUploader.cpp new file mode 100644 index 00000000..d227ba2b --- /dev/null +++ b/fbpcf/io/cloud_util/GCSFileUploader.cpp @@ -0,0 +1,125 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#include "fbpcf/io/cloud_util/GCSFileUploader.h" +#include +#include +#include +#include +#include +#include +#include "fbpcf/aws/S3Util.h" +#include "fbpcf/exception/AwsException.h" +#include "fbpcf/gcp/GCSUtil.h" + +namespace fbpcf::cloudio { + +static const std::string FILE_TYPE = "text/csv"; +static const int MAX_RETRY_COUNT = 3; + +void GCSFileUploader::init() { + XLOG(INFO) << "Start multipart upload initialization. "; + const auto& ref = fbpcf::gcp::uriToObjectReference(filePath_); + bucket_ = ref.bucket; + key_ = ref.key; + Aws::S3::Model::CreateMultipartUploadRequest request; + request.SetBucket(bucket_); + request.SetKey(key_); + request.SetContentType(FILE_TYPE); + + XLOG(INFO) << "Bucket: " << bucket_ << ", Key: " << key_; + + auto createMultipartUploadOutcome = + gcsClient_->CreateMultipartUpload(request); + + if (createMultipartUploadOutcome.IsSuccess()) { + uploadId_ = createMultipartUploadOutcome.GetResult().GetUploadId(); + XLOG(INFO) << "Multipart upload initialization succeed. 
Upload id is: " + << uploadId_; + } else { + XLOG(ERR) << createMultipartUploadOutcome.GetError(); + throw AwsException{ + "Multipart upload initialization failed: " + + createMultipartUploadOutcome.GetError().GetMessage()}; + } +} + +int GCSFileUploader::upload(std::vector& buf) { + XLOG(INFO) << "Start uploading part:" + << "Part number: " << partNumber_ << "\nBucket: " << bucket_ + << "\nKey: " << key_; + Aws::S3::Model::UploadPartRequest request; + request.SetBucket(bucket_); + request.SetKey(key_); + request.SetUploadId(uploadId_); + request.SetPartNumber(partNumber_); + request.SetContentLength(buf.size()); + + Aws::String str(buf.begin(), buf.end()); + auto inputData = Aws::MakeShared("UploadPartStream", str); + request.SetBody(inputData); + XLOG(INFO) << "Upload stream size: " << str.size(); + + auto uploadPartResult = gcsClient_->UploadPart(request); + int retryCount = 0; + while (!uploadPartResult.IsSuccess() && retryCount < MAX_RETRY_COUNT) { + XLOG(INFO) << "Upload part " << partNumber_ << " failed. Retrying..."; + uploadPartResult = gcsClient_->UploadPart(request); + retryCount++; + } + + if (uploadPartResult.IsSuccess()) { + XLOG(INFO) << "Upload part " << partNumber_ << " succeeed."; + Aws::S3::Model::CompletedPart part; + part.SetPartNumber(request.GetPartNumber()); + part.SetETag(uploadPartResult.GetResult().GetETag()); + completedParts_.push_back(part); + partNumber_++; + return str.size(); + } else { + XLOG(INFO) << "Upload part " << partNumber_ << " failed. 
Aborting..."; + abortUpload(); + return 0; + } +} + +int GCSFileUploader::complete() { + Aws::S3::Model::CompleteMultipartUploadRequest request; + request.SetBucket(bucket_); + request.SetKey(key_); + request.SetUploadId(uploadId_); + request.SetMultipartUpload( + Aws::S3::Model::CompletedMultipartUpload().WithParts(completedParts_)); + + auto completeMultipartUploadResult = + gcsClient_->CompleteMultipartUpload(request); + if (completeMultipartUploadResult.IsSuccess()) { + XLOG(INFO) << "File " << filePath_ << " uploaded successfully."; + return 0; + } else { + XLOG(ERR) << "File " << filePath_ << " failed to upload."; + XLOG(ERR) << "Error: " << completeMultipartUploadResult.GetError(); + abortUpload(); + return -1; + } +} + +void GCSFileUploader::abortUpload() { + Aws::S3::Model::AbortMultipartUploadRequest abortRequest; + abortRequest.SetBucket(bucket_); + abortRequest.SetKey(key_); + abortRequest.SetUploadId(uploadId_); + auto abortMultipartUploadResult = + gcsClient_->AbortMultipartUpload(abortRequest); + if (abortMultipartUploadResult.IsSuccess()) { + XLOG(INFO) << "Abort upload successed. "; + } else { + XLOG(ERR) << "Abort upload failed. Upload ID: " + uploadId_; + } +} + +} // namespace fbpcf::cloudio diff --git a/fbpcf/io/cloud_util/GCSFileUploader.h b/fbpcf/io/cloud_util/GCSFileUploader.h new file mode 100644 index 00000000..4183a3cf --- /dev/null +++ b/fbpcf/io/cloud_util/GCSFileUploader.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +#pragma once +#include +#include +#include +#include "fbpcf/io/cloud_util/IFileUploader.h" + +namespace fbpcf::cloudio { +class GCSFileUploader : public IFileUploader { + public: + explicit GCSFileUploader( + std::shared_ptr client, + const std::string& filePath) + : gcsClient_{std::move(client)}, filePath_{filePath} { + init(); + } + int upload(std::vector& buf) override; + int complete() override; + + private: + void init() override; + void abortUpload(); + std::shared_ptr gcsClient_; + const std::string filePath_; + std::string bucket_; + std::string key_; + std::string uploadId_; + std::size_t partNumber_ = 1; + Aws::Vector completedParts_; +}; +} // namespace fbpcf::cloudio diff --git a/fbpcf/io/cloud_util/test/CloudFileUtilTest.cpp b/fbpcf/io/cloud_util/test/CloudFileUtilTest.cpp index 83fa8d51..6be0db31 100644 --- a/fbpcf/io/cloud_util/test/CloudFileUtilTest.cpp +++ b/fbpcf/io/cloud_util/test/CloudFileUtilTest.cpp @@ -26,9 +26,17 @@ TEST(FileManagerUtilTest, TestGetCloudFileType) { getCloudFileType("https://storage.cloud.google.com/bucket-name/key-name"); EXPECT_EQ(CloudFileType::GCS, gcsType1); - auto gcsType2 = getCloudFileType("gs://bucket-name/key-name"); + auto gcsType2 = + getCloudFileType("https://bucket-name.storage.googleapis.com/key-name"); EXPECT_EQ(CloudFileType::GCS, gcsType2); + auto gcsType3 = + getCloudFileType("https://storage.googleapis.com/bucket-name/key-name"); + EXPECT_EQ(CloudFileType::GCS, gcsType3); + + auto gcsType4 = getCloudFileType("gs://bucket-name/key-name"); + EXPECT_EQ(CloudFileType::GCS, gcsType4); + auto unkonwnType = getCloudFileType("https://storage.test.com/bucket-name/key-name"); EXPECT_EQ(CloudFileType::UNKNOWN, unkonwnType);