-
-
Notifications
You must be signed in to change notification settings - Fork 134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add SimpleS3Client::copy #1592
base: master
Are you sure you want to change the base?
Add SimpleS3Client::copy #1592
Changes from all commits
d8e70c1
c66b0f9
d5d8e83
7587b8e
90121f1
83ac70a
132c58e
1575060
fd8f7fb
54091e1
57666cb
d67d689
91447f5
6bf1c47
6a065bd
a350cb5
b1e9178
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,13 +4,21 @@ | |||||
|
||||||
namespace AsyncAws\SimpleS3; | ||||||
|
||||||
use AsyncAws\Core\Exception\UnexpectedValue; | ||||||
use AsyncAws\Core\Stream\FixedSizeStream; | ||||||
use AsyncAws\Core\Stream\ResultStream; | ||||||
use AsyncAws\Core\Stream\StreamFactory; | ||||||
use AsyncAws\S3\Input\AbortMultipartUploadRequest; | ||||||
use AsyncAws\S3\Input\CompleteMultipartUploadRequest; | ||||||
use AsyncAws\S3\Input\CopyObjectRequest; | ||||||
use AsyncAws\S3\Input\CreateMultipartUploadRequest; | ||||||
use AsyncAws\S3\Input\GetObjectRequest; | ||||||
use AsyncAws\S3\Input\UploadPartCopyRequest; | ||||||
use AsyncAws\S3\Result\UploadPartCopyOutput; | ||||||
use AsyncAws\S3\S3Client; | ||||||
use AsyncAws\S3\ValueObject\CompletedMultipartUpload; | ||||||
use AsyncAws\S3\ValueObject\CompletedPart; | ||||||
use AsyncAws\S3\ValueObject\CopyPartResult; | ||||||
|
||||||
/** | ||||||
* A simplified S3 client that hides some of the complexity of working with S3. | ||||||
|
@@ -47,6 +55,114 @@ public function has(string $bucket, string $key): bool | |||||
return $this->objectExists(['Bucket' => $bucket, 'Key' => $key])->isSuccess(); | ||||||
} | ||||||
|
||||||
/** | ||||||
* @param array{ | ||||||
* ACL?: \AsyncAws\S3\Enum\ObjectCannedACL::*, | ||||||
* CacheControl?: string, | ||||||
* Metadata?: array<string, string>, | ||||||
* PartSize?: positive-int, | ||||||
* Concurrency?: positive-int, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use |
||||||
* mupThreshold?: positive-int, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For consistency
Suggested change
|
||||||
* } $options | ||||||
*/ | ||||||
public function copy(string $srcBucket, string $srcKey, string $destBucket, string $destKey, array $options = []): void | ||||||
{ | ||||||
$megabyte = 1024 * 1024; | ||||||
$sourceHead = $this->headObject(['Bucket' => $srcBucket, 'Key' => $srcKey]); | ||||||
$contentLength = (int) $sourceHead->getContentLength(); | ||||||
$options['ContentType'] = $sourceHead->getContentType(); | ||||||
$concurrency = (int) ($options['Concurrency'] ?? 10); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need to cast, as it's an int by contract. Let people deal with PHP runtime errors if they misuse the method. |
||||||
$mupThreshold = ((int) ($options['mupThreshold'] ?? 2 * 1024)) * $megabyte; | ||||||
unset($options['Concurrency'], $options['mupThreshold']); | ||||||
/* | ||||||
* The maximum number of parts is 10.000. The partSize must be a power of 2. | ||||||
* We default this to 64MB per part. That means that we only support to copy | ||||||
* files smaller than 64 * 10 000 = 640GB. If you are coping larger files, | ||||||
* please set PartSize to a higher number, like 128, 256 or 512. (Max 4096). | ||||||
*/ | ||||||
$partSize = ($options['PartSize'] ?? 64) * $megabyte; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If PartSize is not defined, I'd rather default to |
||||||
unset($options['PartSize']); | ||||||
|
||||||
// If file is less than multipart upload threshold, use normal atomic copy | ||||||
if ($contentLength < $mupThreshold) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we really need a parameter for this? |
||||||
$this->copyObject( | ||||||
CopyObjectRequest::create( | ||||||
array_merge($options, ['Bucket' => $destBucket, 'Key' => $destKey, 'CopySource' => "{$srcBucket}/{$srcKey}"]) | ||||||
) | ||||||
); | ||||||
|
||||||
return; | ||||||
} | ||||||
|
||||||
$uploadId = $this->createMultipartUpload( | ||||||
CreateMultipartUploadRequest::create( | ||||||
array_merge($options, ['Bucket' => $destBucket, 'Key' => $destKey]) | ||||||
) | ||||||
)->getUploadId(); | ||||||
if (!$uploadId) { | ||||||
throw new UnexpectedValue('UploadId can not be obtained'); | ||||||
} | ||||||
|
||||||
$partNumber = 1; | ||||||
$startByte = 0; | ||||||
$parts = []; | ||||||
while ($startByte < $contentLength) { | ||||||
$parallelChunks = $concurrency; | ||||||
/** @var UploadPartCopyOutput[] $responses */ | ||||||
$responses = []; | ||||||
while ($startByte < $contentLength && $parallelChunks > 0) { | ||||||
$endByte = min($startByte + $partSize, $contentLength) - 1; | ||||||
$responses[$partNumber] = $this->uploadPartCopy( | ||||||
UploadPartCopyRequest::create([ | ||||||
stof marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
'Bucket' => $destBucket, | ||||||
'Key' => $destKey, | ||||||
'UploadId' => $uploadId, | ||||||
'CopySource' => "{$srcBucket}/{$srcKey}", | ||||||
'CopySourceRange' => "bytes={$startByte}-{$endByte}", | ||||||
'PartNumber' => $partNumber, | ||||||
]) | ||||||
); | ||||||
|
||||||
$startByte += $partSize; | ||||||
++$partNumber; | ||||||
--$parallelChunks; | ||||||
} | ||||||
$error = null; | ||||||
foreach ($responses as $idx => $response) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not efficient. We define 10 concurents upload, but in fact, we are uploading 10, waiting for the 10 to finish, and uploading 10 more, ...
A Beter implementation would be to use a pool (or buffer), to fill the pool with 10 responses, and once 1 response is over, add a new one before checking the other responses. Better: leverage async processing, by not waiting for the end of a response, but checking if the response is over, if not check the next one.
|
||||||
try { | ||||||
/** @var CopyPartResult $copyPartResult */ | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is totally useless. The getter already define its return type properly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the static analysis or the IDE does not properly detect the type of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here is the same situation, |
||||||
$copyPartResult = $response->getCopyPartResult(); | ||||||
$parts[] = new CompletedPart(['ETag' => $copyPartResult->getEtag(), 'PartNumber' => $idx]); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The weird thing about the PossiblyNullReference error reported by Psalm here is that https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html#API_UploadPartCopy_ResponseElements documents CopyPartResult as required in the output. So it is weird that the operation was generated with a nullable type. Maybe the AWS SDK has metadata that does not match the documentation there. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems there are no "required" fields for outputs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest reporting this to AWS as it looks like a mismatch between the human-readable documentation and the machine-readable data used to generate the SDKs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what should I do? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @stof any updates on this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @stof Considering the delay in fixing the manifests, I'd like to propose a potential solution. Would it be feasible to remove the annotations related to this issue from the code and handle the error suppression in the Psalm baseline instead? I've come across a few similar issues in the baseline that could potentially benefit from this approach. I believe this adjustment could provide a temporary workaround while we await a resolution from the AWS side. However, I'm open to discussing alternative solutions or any concerns you may have regarding this proposal. |
||||||
} catch (\Throwable $e) { | ||||||
$error = $e; | ||||||
|
||||||
break; | ||||||
} | ||||||
Comment on lines
+136
to
+140
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you could move the try/catch outside the loop for more readability, and removing the error variable.
|
||||||
} | ||||||
if ($error) { | ||||||
foreach ($responses as $response) { | ||||||
try { | ||||||
$response->cancel(); | ||||||
} catch (\Throwable $e) { | ||||||
continue; | ||||||
} | ||||||
} | ||||||
$this->abortMultipartUpload(AbortMultipartUploadRequest::create(['Bucket' => $destBucket, 'Key' => $destKey, 'UploadId' => $uploadId])); | ||||||
|
||||||
throw $error; | ||||||
} | ||||||
} | ||||||
|
||||||
$this->completeMultipartUpload( | ||||||
CompleteMultipartUploadRequest::create([ | ||||||
'Bucket' => $destBucket, | ||||||
'Key' => $destKey, | ||||||
'UploadId' => $uploadId, | ||||||
'MultipartUpload' => new CompletedMultipartUpload(['Parts' => $parts]), | ||||||
]) | ||||||
); | ||||||
} | ||||||
|
||||||
/** | ||||||
* @param string|resource|(callable(int): string)|iterable<string> $object | ||||||
* @param array{ | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Options parameters deserve a small documentation IMHO