diff --git a/docs/integration/simple-s3.md b/docs/integration/simple-s3.md
index 70a68fe70..bdc8993bd 100644
--- a/docs/integration/simple-s3.md
+++ b/docs/integration/simple-s3.md
@@ -32,6 +32,9 @@ $resource = \fopen('/path/to/cat/image.jpg', 'r');
 $s3->upload('my-image-bucket', 'photos/cat_2.jpg', $resource);
 $s3->upload('my-image-bucket', 'photos/cat_2.txt', 'I like this cat');
 
+// Copy objects between buckets
+$s3->copy('source-bucket', 'source-key', 'destination-bucket', 'destination-key');
+
 // Check if a file exists
 $s3->has('my-image-bucket', 'photos/cat_2.jpg'); // true
 
diff --git a/src/Integration/Aws/SimpleS3/CHANGELOG.md b/src/Integration/Aws/SimpleS3/CHANGELOG.md
index 1f219e08e..9e2f1c79b 100644
--- a/src/Integration/Aws/SimpleS3/CHANGELOG.md
+++ b/src/Integration/Aws/SimpleS3/CHANGELOG.md
@@ -8,6 +8,10 @@
 
 - Upgrade to `async-aws/s3` 2.0
 
+### Added
+
+- Added `SimpleS3Client::copy()` method
+
 ## 1.1.1
 
 ### Changed
diff --git a/src/Integration/Aws/SimpleS3/composer.json b/src/Integration/Aws/SimpleS3/composer.json
index a56241c2c..aa6f063b5 100644
--- a/src/Integration/Aws/SimpleS3/composer.json
+++ b/src/Integration/Aws/SimpleS3/composer.json
@@ -13,7 +13,7 @@
     "require": {
         "php": "^7.2.5 || ^8.0",
         "ext-json": "*",
-        "async-aws/s3": "^2.0"
+        "async-aws/s3": "^2.1"
     },
     "autoload": {
         "psr-4": {
diff --git a/src/Integration/Aws/SimpleS3/src/SimpleS3Client.php b/src/Integration/Aws/SimpleS3/src/SimpleS3Client.php
index 2b62227fb..9f21abf0d 100644
--- a/src/Integration/Aws/SimpleS3/src/SimpleS3Client.php
+++ b/src/Integration/Aws/SimpleS3/src/SimpleS3Client.php
@@ -4,13 +4,21 @@
 
 namespace AsyncAws\SimpleS3;
 
+use AsyncAws\Core\Exception\UnexpectedValue;
 use AsyncAws\Core\Stream\FixedSizeStream;
 use AsyncAws\Core\Stream\ResultStream;
 use AsyncAws\Core\Stream\StreamFactory;
+use AsyncAws\S3\Input\AbortMultipartUploadRequest;
+use AsyncAws\S3\Input\CompleteMultipartUploadRequest;
+use AsyncAws\S3\Input\CopyObjectRequest;
+use AsyncAws\S3\Input\CreateMultipartUploadRequest;
 use AsyncAws\S3\Input\GetObjectRequest;
+use AsyncAws\S3\Input\UploadPartCopyRequest;
+use AsyncAws\S3\Result\UploadPartCopyOutput;
 use AsyncAws\S3\S3Client;
 use AsyncAws\S3\ValueObject\CompletedMultipartUpload;
 use AsyncAws\S3\ValueObject\CompletedPart;
+use AsyncAws\S3\ValueObject\CopyPartResult;
 
 /**
  * A simplified S3 client that hides some of the complexity of working with S3.
@@ -47,6 +55,114 @@ public function has(string $bucket, string $key): bool
         return $this->objectExists(['Bucket' => $bucket, 'Key' => $key])->isSuccess();
     }
 
+    /**
+     * @param array{
+     *   ACL?: \AsyncAws\S3\Enum\ObjectCannedACL::*,
+     *   CacheControl?: string,
+     *   Metadata?: array<string, string>,
+     *   PartSize?: positive-int,
+     *   Concurrency?: positive-int,
+     *   mupThreshold?: positive-int,
+     * } $options
+     */
+    public function copy(string $srcBucket, string $srcKey, string $destBucket, string $destKey, array $options = []): void
+    {
+        $megabyte = 1024 * 1024;
+        $sourceHead = $this->headObject(['Bucket' => $srcBucket, 'Key' => $srcKey]);
+        $contentLength = (int) $sourceHead->getContentLength();
+        $options['ContentType'] = $sourceHead->getContentType();
+        $concurrency = (int) ($options['Concurrency'] ?? 10);
+        $mupThreshold = ((int) ($options['mupThreshold'] ?? 2 * 1024)) * $megabyte;
+        unset($options['Concurrency'], $options['mupThreshold']);
+        /*
+         * The maximum number of parts is 10,000 and the part size must be a power
+         * of 2. We default to 64 MB per part, so we only support copying files
+         * smaller than 64 MB * 10,000 = 640 GB. To copy larger files, set
+         * PartSize to a higher number, such as 128, 256 or 512 (max 4096).
+         */
+        $partSize = ($options['PartSize'] ?? 64) * $megabyte;
+        unset($options['PartSize']);
+
+        // If the file is smaller than the multipart upload threshold, use a single atomic copy
+        if ($contentLength < $mupThreshold) {
+            $this->copyObject(
+                CopyObjectRequest::create(
+                    array_merge($options, ['Bucket' => $destBucket, 'Key' => $destKey, 'CopySource' => "{$srcBucket}/{$srcKey}"])
+                )
+            );
+
+            return;
+        }
+
+        $uploadId = $this->createMultipartUpload(
+            CreateMultipartUploadRequest::create(
+                array_merge($options, ['Bucket' => $destBucket, 'Key' => $destKey])
+            )
+        )->getUploadId();
+        if (!$uploadId) {
+            throw new UnexpectedValue('UploadId cannot be obtained');
+        }
+
+        $partNumber = 1;
+        $startByte = 0;
+        $parts = [];
+        while ($startByte < $contentLength) {
+            $parallelChunks = $concurrency;
+            /** @var UploadPartCopyOutput[] $responses */
+            $responses = [];
+            while ($startByte < $contentLength && $parallelChunks > 0) {
+                $endByte = min($startByte + $partSize, $contentLength) - 1;
+                $responses[$partNumber] = $this->uploadPartCopy(
+                    UploadPartCopyRequest::create([
+                        'Bucket' => $destBucket,
+                        'Key' => $destKey,
+                        'UploadId' => $uploadId,
+                        'CopySource' => "{$srcBucket}/{$srcKey}",
+                        'CopySourceRange' => "bytes={$startByte}-{$endByte}",
+                        'PartNumber' => $partNumber,
+                    ])
+                );
+
+                $startByte += $partSize;
+                ++$partNumber;
+                --$parallelChunks;
+            }
+            $error = null;
+            foreach ($responses as $idx => $response) {
+                try {
+                    /** @var CopyPartResult $copyPartResult */
+                    $copyPartResult = $response->getCopyPartResult();
+                    $parts[] = new CompletedPart(['ETag' => $copyPartResult->getEtag(), 'PartNumber' => $idx]);
+                } catch (\Throwable $e) {
+                    $error = $e;
+
+                    break;
+                }
+            }
+            if ($error) {
+                foreach ($responses as $response) {
+                    try {
+                        $response->cancel();
+                    } catch (\Throwable $e) {
+                        continue;
+                    }
+                }
+                $this->abortMultipartUpload(AbortMultipartUploadRequest::create(['Bucket' => $destBucket, 'Key' => $destKey, 'UploadId' => $uploadId]));
+
+                throw $error;
+            }
+        }
+
+        $this->completeMultipartUpload(
+            CompleteMultipartUploadRequest::create([
+                'Bucket' => $destBucket,
+                'Key' => $destKey,
+                'UploadId' => $uploadId,
+                'MultipartUpload' => new CompletedMultipartUpload(['Parts' => $parts]),
+            ])
+        );
+    }
+
     /**
      * @param string|resource|(callable(int): string)|iterable $object
      * @param array{
diff --git a/src/Integration/Aws/SimpleS3/tests/Unit/SimpleS3ClientTest.php b/src/Integration/Aws/SimpleS3/tests/Unit/SimpleS3ClientTest.php
index 32c00a92a..56afe57fe 100644
--- a/src/Integration/Aws/SimpleS3/tests/Unit/SimpleS3ClientTest.php
+++ b/src/Integration/Aws/SimpleS3/tests/Unit/SimpleS3ClientTest.php
@@ -6,7 +6,11 @@
 
 use AsyncAws\Core\Credentials\NullProvider;
 use AsyncAws\Core\Test\ResultMockFactory;
+use AsyncAws\S3\Input\CompleteMultipartUploadRequest;
 use AsyncAws\S3\Result\CreateMultipartUploadOutput;
+use AsyncAws\S3\Result\HeadObjectOutput;
+use AsyncAws\S3\Result\UploadPartCopyOutput;
+use AsyncAws\S3\ValueObject\CopyPartResult;
 use AsyncAws\SimpleS3\SimpleS3Client;
 use PHPUnit\Framework\TestCase;
 use Symfony\Component\HttpClient\MockHttpClient;
@@ -137,6 +141,60 @@ public function testUploadSmallFileEmptyClosure()
         });
     }
 
+    public function testCopySmallFile()
+    {
+        $megabyte = 1024 * 1024;
+        $s3 = $this->getMockBuilder(SimpleS3Client::class)
+            ->disableOriginalConstructor()
+            ->onlyMethods(['createMultipartUpload', 'abortMultipartUpload', 'copyObject', 'completeMultipartUpload', 'headObject'])
+            ->getMock();
+
+        $s3->expects(self::never())->method('createMultipartUpload');
+        $s3->expects(self::never())->method('abortMultipartUpload');
+        $s3->expects(self::never())->method('completeMultipartUpload');
+        $s3->expects(self::once())->method('copyObject');
+        $s3->expects(self::once())->method('headObject')
+            ->willReturn(ResultMockFactory::create(HeadObjectOutput::class, ['ContentLength' => 50 * $megabyte]));
+
+        $s3->copy('bucket', 'robots.txt', 'bucket', 'copy-robots.txt');
+    }
+
+    public function testCopyLargeFile()
+    {
+        $megabyte = 1024 * 1024;
+        $uploadedParts = 0;
+        $completedParts = 0;
+
+        $s3 = $this->getMockBuilder(SimpleS3Client::class)
+            ->disableOriginalConstructor()
+            ->onlyMethods(['createMultipartUpload', 'abortMultipartUpload', 'copyObject', 'completeMultipartUpload', 'uploadPartCopy', 'headObject'])
+            ->getMock();
+
+        $s3->expects(self::once())->method('createMultipartUpload')
+            ->willReturn(ResultMockFactory::create(CreateMultipartUploadOutput::class, ['UploadId' => '4711']));
+        $s3->expects(self::never())->method('abortMultipartUpload');
+        $s3->expects(self::never())->method('copyObject');
+        $s3->expects(self::any())->method('uploadPartCopy')
+            ->with(self::callback(function () use (&$uploadedParts) {
+                ++$uploadedParts;
+
+                return true;
+            }))
+            ->willReturn(ResultMockFactory::create(UploadPartCopyOutput::class, ['copyPartResult' => new CopyPartResult(['ETag' => 'etag-4711'])]));
+        $s3->expects(self::once())->method('completeMultipartUpload')->with(self::callback(function (CompleteMultipartUploadRequest $request) use (&$completedParts) {
+            $completedParts = \count($request->getMultipartUpload()->getParts());
+
+            return true;
+        }));
+        $s3->expects(self::once())
+            ->method('headObject')
+            ->willReturn(ResultMockFactory::create(HeadObjectOutput::class, ['ContentLength' => 6144 * $megabyte]));
+
+        $s3->copy('bucket', 'robots.txt', 'bucket', 'copy-robots.txt');
+
+        self::assertEquals($completedParts, $uploadedParts);
+    }
+
     private function assertSmallFileUpload(\Closure $callback, string $bucket, string $file, $object): void
    {
         $s3 = $this->getMockBuilder(SimpleS3Client::class)
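
For context, a usage sketch of the new `copy()` method added above. The bucket and key names and option values are hypothetical; the knobs (`PartSize`, `Concurrency`, `mupThreshold`) and their defaults come straight from the docblock and implementation in this diff.

```php
<?php

use AsyncAws\SimpleS3\SimpleS3Client;

$s3 = new SimpleS3Client();

// Sources below the default 2 GB mupThreshold are copied with a single
// atomic CopyObject request; no options needed.
$s3->copy('my-image-bucket', 'photos/cat_2.jpg', 'my-backup-bucket', 'photos/cat_2.jpg');

// For very large objects, tune the multipart behaviour (hypothetical values):
$s3->copy('my-archive', 'raw/footage.mov', 'my-archive', 'backup/footage.mov', [
    'PartSize' => 128,      // MB per part (default 64); raises the size ceiling to ~1.28 TB
    'Concurrency' => 20,    // UploadPartCopy requests issued per batch (default 10)
    'mupThreshold' => 4096, // MB above which the multipart path is used (default 2048)
]);
```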
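Worked numbers for `testCopyLargeFile`: with the default 64 MB `PartSize`, the mocked 6144 MB (6 GB) source produces 6144 / 64 = 96 `uploadPartCopy` calls, issued in batches of 10 (the default `Concurrency`); the final `CompleteMultipartUploadRequest` then carries the same 96 parts, which is what the closing `assertEquals($completedParts, $uploadedParts)` verifies.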