Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CloudFileUploader - GCS #249

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions fbpcf/aws/S3Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ std::unique_ptr<Aws::S3::S3Client> createS3Client(
const S3ClientOption& option) {
Aws::Client::ClientConfiguration config;

if (option.endpointOverride.has_value()) {
config.endpointOverride = option.endpointOverride.value();
} else if (std::getenv("AWS_ENDPOINT_OVERRIDE")) {
config.endpointOverride = std::getenv("AWS_ENDPOINT_OVERRIDE");
}

if (option.region.has_value()) {
config.region = option.region.value();
} else if (std::getenv("AWS_DEFAULT_REGION")) {
Expand Down
2 changes: 2 additions & 0 deletions fbpcf/aws/S3Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace fbpcf::aws {
// reference of environment variables:
// https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html
struct S3ClientOption {
// AWS_ENDPOINT_OVERRIDE
std::optional<std::string> endpointOverride;
// AWS_DEFAULT_REGION
std::optional<std::string> region;
// AWS_ACCESS_KEY_ID
Expand Down
6 changes: 5 additions & 1 deletion fbpcf/gcp/GCSUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
namespace fbpcf::gcp {
// Format:
// 1. https://storage.cloud.google.com/bucket-name/key-name
// 2. gs://bucket-name/key-name
// 2. https://bucket-name.storage.googleapis.com/key-name
// 3. https://storage.googleapis.com/bucket-name/key-name
// 4. gs://bucket-name/key-name
GCSObjectReference uriToObjectReference(std::string url) {
std::string bucket;
std::string key;
Expand All @@ -37,6 +39,8 @@ GCSObjectReference uriToObjectReference(std::string url) {

if (boost::iequals(scheme, "gs")) {
bucket = host;
} else if (host.find(".storage.googleapis.com") != std::string::npos) {
bucket = host.substr(0, host.find_first_of("."));
} else {
// Remove the first character '/' in path
path = path.substr(1);
Expand Down
16 changes: 16 additions & 0 deletions fbpcf/gcp/test/GCSUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ TEST(GCSUtil, uriToObjectReference) {
EXPECT_EQ("key", ref.key);
}

// Virtual-hosted-style URL: the bucket name is the leading host label.
TEST(GCSUtil, uriToObjectReference_virtualHostStyle) {
  const std::string url = "https://bucket-name.storage.googleapis.com/key-name";
  const auto objectRef = fbpcf::gcp::uriToObjectReference(url);

  EXPECT_EQ("bucket-name", objectRef.bucket);
  EXPECT_EQ("key-name", objectRef.key);
}

// Path-style URL: the bucket name is the first path segment.
TEST(GCSUtil, uriToObjectReference_pathStyle) {
  const std::string url = "https://storage.googleapis.com/bucket-name/key-name";
  const auto objectRef = fbpcf::gcp::uriToObjectReference(url);

  EXPECT_EQ("bucket-name", objectRef.bucket);
  EXPECT_EQ("key-name", objectRef.key);
}

TEST(GCSUtil, uriToObjectReference_Subfolder) {
auto uri = "https://storage.cloud.google.com/bucket/folder/key";
auto ref = fbpcf::gcp::uriToObjectReference(uri);
Expand Down
29 changes: 23 additions & 6 deletions fbpcf/io/cloud_util/CloudFileUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
*/

#include "fbpcf/io/cloud_util/CloudFileUtil.h"
#include <aws/s3/S3Client.h>
#include <re2/re2.h>
#include "fbpcf/aws/S3Util.h"
#include "fbpcf/exception/PcfException.h"
#include "fbpcf/gcp/GCSUtil.h"
#include "fbpcf/io/cloud_util/GCSFileUploader.h"
#include "fbpcf/io/cloud_util/S3Client.h"
#include "fbpcf/io/cloud_util/S3FileReader.h"
#include "fbpcf/io/cloud_util/S3FileUploader.h"
Expand All @@ -17,12 +20,14 @@ namespace fbpcf::cloudio {

CloudFileType getCloudFileType(const std::string& filePath) {
// S3 file format:
// 1. https://bucket-name.s3.Region.amazonaws.com/key-name
// 2. https://bucket-name.s3-Region.amazonaws.com/key-name
// 1. https://bucket-name.s3.region.amazonaws.com/key-name
// 2. https://bucket-name.s3-region.amazonaws.com/key-name
// 3. s3://bucket-name/key-name
// GCS file format:
// 1. https://storage.cloud.google.com/bucket-name/key-name
// 2. gs://bucket-name/key-name
// 2. https://bucket-name.storage.googleapis.com/key-name
// 3. https://storage.googleapis.com/bucket-name/key-name
// 4. gs://bucket-name/key-name
static const re2::RE2 s3Regex1(
"https://[a-z0-9.-]+.s3.[a-z0-9-]+.amazonaws.com/.+");
static const re2::RE2 s3Regex2(
Expand All @@ -34,9 +39,14 @@ CloudFileType getCloudFileType(const std::string& filePath) {
return CloudFileType::S3;
}

static const re2::RE2 gcsRegex("https://storage.cloud.google.com/.*");
bool isGCSFile =
re2::RE2::FullMatch(filePath, gcsRegex) || filePath.find("gs://", 0) == 0;
static const re2::RE2 gcsRegex1("https://storage.cloud.google.com/.*");
static const re2::RE2 gcsRegex2(
"https://[a-z0-9.-]+.storage.googleapis.com/.+");
static const re2::RE2 gcsRegex3("https://storage.googleapis.com/.*");
bool isGCSFile = re2::RE2::FullMatch(filePath, gcsRegex1) ||
re2::RE2::FullMatch(filePath, gcsRegex2) ||
re2::RE2::FullMatch(filePath, gcsRegex3) ||
filePath.find("gs://", 0) == 0;
if (isGCSFile) {
return CloudFileType::GCS;
}
Expand Down Expand Up @@ -64,6 +74,13 @@ std::unique_ptr<IFileUploader> getCloudFileUploader(
fbpcf::aws::S3ClientOption{.region = ref.region})
.getS3Client(),
filePath);
} else if (fileType == CloudFileType::GCS) {
return std::make_unique<GCSFileUploader>(
fbpcf::cloudio::S3Client::getInstance(
fbpcf::aws::S3ClientOption{
.endpointOverride = "https://storage.googleapis.com/"})
.getS3Client(),
filePath);
} else {
throw fbpcf::PcfException("Not supported yet.");
}
Expand Down
125 changes: 125 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "fbpcf/io/cloud_util/GCSFileUploader.h"
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <folly/logging/xlog.h>
#include "fbpcf/aws/S3Util.h"
#include "fbpcf/exception/AwsException.h"
#include "fbpcf/gcp/GCSUtil.h"

namespace fbpcf::cloudio {

static const std::string FILE_TYPE = "text/csv";
static const int MAX_RETRY_COUNT = 3;

void GCSFileUploader::init() {
XLOG(INFO) << "Start multipart upload initialization. ";
const auto& ref = fbpcf::gcp::uriToObjectReference(filePath_);
bucket_ = ref.bucket;
key_ = ref.key;
Aws::S3::Model::CreateMultipartUploadRequest request;
request.SetBucket(bucket_);
request.SetKey(key_);
request.SetContentType(FILE_TYPE);

XLOG(INFO) << "Bucket: " << bucket_ << ", Key: " << key_;

auto createMultipartUploadOutcome =
gcsClient_->CreateMultipartUpload(request);

if (createMultipartUploadOutcome.IsSuccess()) {
uploadId_ = createMultipartUploadOutcome.GetResult().GetUploadId();
XLOG(INFO) << "Multipart upload initialization succeed. Upload id is: "
<< uploadId_;
} else {
XLOG(ERR) << createMultipartUploadOutcome.GetError();
throw AwsException{
"Multipart upload initialization failed: " +
createMultipartUploadOutcome.GetError().GetMessage()};
}
}

// Uploads one part of the multipart upload from buf.
// A failed part is retried up to MAX_RETRY_COUNT times; on persistent failure
// the whole multipart upload is aborted.
// Returns the number of bytes uploaded, or 0 on failure.
int GCSFileUploader::upload(std::vector<char>& buf) {
  XLOG(INFO) << "Start uploading part:"
             << "Part number: " << partNumber_ << "\nBucket: " << bucket_
             << "\nKey: " << key_;
  Aws::S3::Model::UploadPartRequest request;
  request.SetBucket(bucket_);
  request.SetKey(key_);
  request.SetUploadId(uploadId_);
  request.SetPartNumber(partNumber_);
  request.SetContentLength(buf.size());

  Aws::String str(buf.begin(), buf.end());
  auto inputData = Aws::MakeShared<Aws::StringStream>("UploadPartStream", str);
  request.SetBody(inputData);
  XLOG(INFO) << "Upload stream size: " << str.size();

  auto uploadPartResult = gcsClient_->UploadPart(request);
  int retryCount = 0;
  while (!uploadPartResult.IsSuccess() && retryCount < MAX_RETRY_COUNT) {
    XLOG(INFO) << "Upload part " << partNumber_ << " failed. Retrying...";
    // The failed attempt consumed (or partially consumed) the body stream;
    // rewind it so the retry re-sends the full part instead of an empty body.
    inputData->clear();
    inputData->seekg(0, std::ios::beg);
    uploadPartResult = gcsClient_->UploadPart(request);
    retryCount++;
  }

  if (!uploadPartResult.IsSuccess()) {
    XLOG(INFO) << "Upload part " << partNumber_ << " failed. Aborting...";
    abortUpload();
    return 0;
  }

  XLOG(INFO) << "Upload part " << partNumber_ << " succeeded.";
  // Remember the part's ETag: CompleteMultipartUpload needs the full
  // (partNumber, eTag) list to stitch the object together.
  Aws::S3::Model::CompletedPart part;
  part.SetPartNumber(request.GetPartNumber());
  part.SetETag(uploadPartResult.GetResult().GetETag());
  completedParts_.push_back(part);
  partNumber_++;
  return static_cast<int>(str.size());
}

int GCSFileUploader::complete() {
Aws::S3::Model::CompleteMultipartUploadRequest request;
request.SetBucket(bucket_);
request.SetKey(key_);
request.SetUploadId(uploadId_);
request.SetMultipartUpload(
Aws::S3::Model::CompletedMultipartUpload().WithParts(completedParts_));

auto completeMultipartUploadResult =
gcsClient_->CompleteMultipartUpload(request);
if (completeMultipartUploadResult.IsSuccess()) {
XLOG(INFO) << "File " << filePath_ << " uploaded successfully.";
return 0;
} else {
XLOG(ERR) << "File " << filePath_ << " failed to upload.";
XLOG(ERR) << "Error: " << completeMultipartUploadResult.GetError();
abortUpload();
return -1;
}
}

// Aborts the in-flight multipart upload (best effort: a failure is only
// logged) so the service can discard any parts already uploaded.
void GCSFileUploader::abortUpload() {
  Aws::S3::Model::AbortMultipartUploadRequest abortRequest;
  abortRequest.SetBucket(bucket_);
  abortRequest.SetKey(key_);
  abortRequest.SetUploadId(uploadId_);
  auto abortMultipartUploadResult =
      gcsClient_->AbortMultipartUpload(abortRequest);
  if (abortMultipartUploadResult.IsSuccess()) {
    // Fixed log typo: "successed" -> "succeeded".
    XLOG(INFO) << "Abort upload succeeded. ";
  } else {
    XLOG(ERR) << "Abort upload failed. Upload ID: " + uploadId_;
  }
}

} // namespace fbpcf::cloudio
37 changes: 37 additions & 0 deletions fbpcf/io/cloud_util/GCSFileUploader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CompletedPart.h>
#include <memory>
#include "fbpcf/io/cloud_util/IFileUploader.h"

namespace fbpcf::cloudio {
// Uploads a file to Google Cloud Storage as a multipart upload, driven
// through an AWS S3 client (the caller points the client at a GCS endpoint
// via endpointOverride — see CloudFileUtil's getCloudFileUploader).
class GCSFileUploader : public IFileUploader {
 public:
  // Immediately opens a multipart upload session for filePath (init());
  // init() throws on failure, so construction can throw.
  explicit GCSFileUploader(
      std::shared_ptr<Aws::S3::S3Client> client,
      const std::string& filePath)
      : gcsClient_{std::move(client)}, filePath_{filePath} {
    init();
  }
  // Uploads one part from buf; returns bytes uploaded, or 0 on failure.
  int upload(std::vector<char>& buf) override;
  // Finalizes the upload; returns 0 on success, -1 on failure.
  int complete() override;

 private:
  // Creates the multipart upload session and stores uploadId_.
  void init() override;
  // Best-effort abort of the in-flight multipart upload.
  void abortUpload();
  std::shared_ptr<Aws::S3::S3Client> gcsClient_;
  const std::string filePath_;
  std::string bucket_;   // parsed from filePath_ in init()
  std::string key_;      // parsed from filePath_ in init()
  std::string uploadId_; // assigned by CreateMultipartUpload in init()
  std::size_t partNumber_ = 1; // parts are 1-indexed in the multipart API
  Aws::Vector<Aws::S3::Model::CompletedPart> completedParts_;
};
} // namespace fbpcf::cloudio
10 changes: 9 additions & 1 deletion fbpcf/io/cloud_util/test/CloudFileUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@ TEST(FileManagerUtilTest, TestGetCloudFileType) {
getCloudFileType("https://storage.cloud.google.com/bucket-name/key-name");
EXPECT_EQ(CloudFileType::GCS, gcsType1);

auto gcsType2 = getCloudFileType("gs://bucket-name/key-name");
auto gcsType2 =
getCloudFileType("https://bucket-name.storage.googleapis.com/key-name");
EXPECT_EQ(CloudFileType::GCS, gcsType2);

auto gcsType3 =
getCloudFileType("https://storage.googleapis.com/bucket-name/key-name");
EXPECT_EQ(CloudFileType::GCS, gcsType3);

auto gcsType4 = getCloudFileType("gs://bucket-name/key-name");
EXPECT_EQ(CloudFileType::GCS, gcsType4);

auto unkonwnType =
getCloudFileType("https://storage.test.com/bucket-name/key-name");
EXPECT_EQ(CloudFileType::UNKNOWN, unkonwnType);
Expand Down