Skip to content

Commit

Permalink
fixes transfer manager checksum issues
Browse files Browse the repository at this point in the history
  • Loading branch information
sbiscigl committed Dec 8, 2023
1 parent a380685 commit 748c664
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ namespace Aws
mutable Aws::UnorderedSet<std::shared_ptr<TransferHandle>> m_tasks;
mutable std::condition_variable m_tasksSignal;
mutable std::mutex m_tasksMutex;
mutable std::mutex m_createPutObjectRequestMutex;
mutable std::mutex m_createMpuRequstMutex;

protected:
static bool IsWithinParentDirectory(Aws::String parentDirectory, Aws::String filePath);
Expand Down
31 changes: 12 additions & 19 deletions src/aws-cpp-sdk-transfer/source/transfer/TransferManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,16 +361,15 @@ namespace Aws
if (!isRetry)
{
Aws::S3::Model::CreateMultipartUploadRequest createMultipartRequest = m_transferConfig.createMultipartUploadTemplate
.WithChecksumAlgorithm(m_transferConfig.computeContentMD5
? S3::Model::ChecksumAlgorithm::NOT_SET
: m_transferConfig.checksumAlgorithm)
.WithCustomizedAccessLogTag(m_transferConfig.customizedAccessLogTag)
.WithBucket(handle->GetBucketName())
.WithContentType(handle->GetContentType())
.WithKey(handle->GetKey())
.WithMetadata(handle->GetMetadata());

if (!m_transferConfig.computeContentMD5) {
createMultipartRequest.SetChecksumAlgorithm(m_transferConfig.checksumAlgorithm);
}

auto createMultipartResponse = m_transferConfig.s3Client->CreateMultipartUpload(createMultipartRequest);
if (createMultipartResponse.IsSuccess())
{
Expand Down Expand Up @@ -434,6 +433,7 @@ namespace Aws
auto buffer = m_bufferManager.Acquire();
if(handle->ShouldContinue())
{
std::unique_lock<std::mutex> partLock(m_createMpuRequstMutex);
auto lengthToWrite = partsIter->second->GetSizeInBytes();
streamToPut->seekg((partsIter->first - 1) * m_transferConfig.bufferSize);
streamToPut->read(reinterpret_cast<char*>(buffer), lengthToWrite);
Expand All @@ -444,17 +444,16 @@ namespace Aws
auto self = shared_from_this(); // keep transfer manager alive until all callbacks are finished.
PartPointer partPtr = partsIter->second;
Aws::S3::Model::UploadPartRequest uploadPartRequest = m_transferConfig.uploadPartTemplate
.WithChecksumAlgorithm(m_transferConfig.computeContentMD5
? S3::Model::ChecksumAlgorithm::NOT_SET
: m_transferConfig.checksumAlgorithm)
.WithCustomizedAccessLogTag(m_transferConfig.customizedAccessLogTag)
.WithBucket(handle->GetBucketName())
.WithContentLength(static_cast<long long>(lengthToWrite))
.WithKey(handle->GetKey())
.WithPartNumber(partsIter->first)
.WithUploadId(handle->GetMultiPartId());

if (!m_transferConfig.computeContentMD5) {
uploadPartRequest.SetChecksumAlgorithm(m_transferConfig.checksumAlgorithm);
}

uploadPartRequest.SetContinueRequestHandler([handle](const Aws::Http::HttpRequest*) { return handle->ShouldContinue(); });
uploadPartRequest.SetDataSentEventHandler([self, handle, partPtr](const Aws::Http::HttpRequest*, long long amount){ partPtr->OnDataTransferred(amount, handle); self->TriggerUploadProgressCallback(handle); });
uploadPartRequest.SetRequestRetryHandler([partPtr](const AmazonWebServiceRequest&){ partPtr->Reset(); });
Expand All @@ -463,9 +462,6 @@ namespace Aws

uploadPartRequest.SetBody(preallocatedStreamReader);
uploadPartRequest.SetContentType(handle->GetContentType());
if (m_transferConfig.computeContentMD5) {
uploadPartRequest.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(Aws::Utils::HashingUtils::CalculateMD5(*preallocatedStreamReader)));
}

auto asyncContext = Aws::MakeShared<TransferHandleAsyncContext>(CLASS_TAG);
asyncContext->handle = handle;
Expand Down Expand Up @@ -518,6 +514,7 @@ namespace Aws

void TransferManager::DoSinglePartUpload(const std::shared_ptr<Aws::IOStream>& streamToPut, const std::shared_ptr<TransferHandle>& handle)
{
std::unique_lock<std::mutex> singlePartLock(m_createPutObjectRequestMutex);
auto partState = Aws::MakeShared<PartState>(CLASS_TAG, 1, 0, handle->GetBytesTotalSize(), true);

handle->UpdateStatus(TransferStatus::IN_PROGRESS);
Expand All @@ -526,7 +523,9 @@ namespace Aws
TriggerTransferStatusUpdatedCallback(handle);

auto putObjectRequest = m_transferConfig.putObjectTemplate
.WithChecksumAlgorithm(m_transferConfig.checksumAlgorithm)
.WithChecksumAlgorithm(m_transferConfig.computeContentMD5
? S3::Model::ChecksumAlgorithm::NOT_SET
: m_transferConfig.checksumAlgorithm)
.WithBucket(handle->GetBucketName())
.WithKey(handle->GetKey())
.WithContentLength(static_cast<long long>(handle->GetBytesTotalSize()))
Expand All @@ -545,9 +544,6 @@ namespace Aws
auto preallocatedStreamReader = Aws::MakeShared<Aws::IOStream>(CLASS_TAG, streamBuf);

putObjectRequest.SetBody(preallocatedStreamReader);
if (m_transferConfig.computeContentMD5) {
putObjectRequest.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(Aws::Utils::HashingUtils::CalculateMD5(*preallocatedStreamReader)));
}

auto self = shared_from_this(); // keep transfer manager alive until all callbacks are finished.
auto uploadProgressCallback = [self, partState, handle](const Aws::Http::HttpRequest*, long long progress)
Expand Down Expand Up @@ -666,10 +662,7 @@ namespace Aws
.WithPartNumber(part.first)
.WithETag(part.second->GetETag());

if(!m_transferConfig.computeContentMD5) {
SetChecksumForAlgorithm(part.second, completedPart);
}

SetChecksumForAlgorithm(part.second, completedPart);
completedUpload.AddParts(completedPart);
}

Expand Down

0 comments on commit 748c664

Please sign in to comment.