Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aws request interceptors for S3 uploads #1131

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions fs2-aws-s3/src/main/scala/fs2/aws/s3/AwsRequestModifier.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package fs2.aws.s3

import software.amazon.awssdk.services.s3.model.{
AbortMultipartUploadRequest,
CompleteMultipartUploadRequest,
CreateMultipartUploadRequest,
PutObjectRequest,
UploadPartRequest
}

/** Hooks that let callers adjust the AWS SDK request builders used by the S3 upload pipes.
  *
  * Every hook takes a request builder and returns the builder that will subsequently be
  * built and sent. The upload code populates the builder with the fields it controls
  * (bucket, key, upload id, ...) before handing it to the hook.
  */
object AwsRequestModifier {

  /** Customisation points for the four request kinds issued by a multipart upload. */
  trait MultipartUpload {

    /** Adjusts the request that initiates the multipart upload. */
    def createMultipartUpload(
      b: CreateMultipartUploadRequest.Builder
    ): CreateMultipartUploadRequest.Builder

    /** Adjusts the request that uploads a single part. */
    def uploadPart(b: UploadPartRequest.Builder): UploadPartRequest.Builder

    /** Adjusts the request that completes the multipart upload. */
    def completeMultipartUpload(
      b: CompleteMultipartUploadRequest.Builder
    ): CompleteMultipartUploadRequest.Builder

    /** Adjusts the request that aborts the multipart upload. */
    def abortMultipartUpload(
      b: AbortMultipartUploadRequest.Builder
    ): AbortMultipartUploadRequest.Builder
  }

  object MultipartUpload {

    /** Modifier that returns every builder untouched.
      *
      * Kept as a `val`: the instance holds no state, so a single shared value avoids
      * allocating a fresh anonymous class each time it is used as a default argument.
      */
    val identity: MultipartUpload = new MultipartUpload {

      def createMultipartUpload(
        b: CreateMultipartUploadRequest.Builder
      ): CreateMultipartUploadRequest.Builder = b

      def uploadPart(b: UploadPartRequest.Builder): UploadPartRequest.Builder = b

      def completeMultipartUpload(
        b: CompleteMultipartUploadRequest.Builder
      ): CompleteMultipartUploadRequest.Builder = b

      def abortMultipartUpload(
        b: AbortMultipartUploadRequest.Builder
      ): AbortMultipartUploadRequest.Builder = b
    }
  }

  /** Customisation point for the single `PutObject` request of a one-shot upload. */
  trait Upload1 {

    /** Adjusts the `PutObject` request before it is built. */
    def putObject(b: PutObjectRequest.Builder): PutObjectRequest.Builder
  }

  object Upload1 {

    /** Modifier that returns the builder untouched; shared for the same reason as
      * [[MultipartUpload.identity]].
      */
    val identity: Upload1 = new Upload1 {
      def putObject(b: PutObjectRequest.Builder): PutObjectRequest.Builder = b
    }
  }
}
76 changes: 49 additions & 27 deletions fs2-aws-s3/src/main/scala/fs2/aws/s3/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ import scala.jdk.CollectionConverters.*
/* A purely functional abstraction over the S3 API based on fs2.Stream */
trait S3[F[_]] {
def delete(bucket: BucketName, key: FileKey): F[Unit]
def uploadFile(bucket: BucketName, key: FileKey): Pipe[F, Byte, ETag]
def uploadFile(
bucket: BucketName,
key: FileKey,
awsRequestModifier: AwsRequestModifier.Upload1 = AwsRequestModifier.Upload1.identity
): Pipe[F, Byte, ETag]

def uploadFileMultipart(
bucket: BucketName,
key: FileKey,
partSize: PartSizeMB,
uploadEmptyFiles: UploadEmptyFiles = false,
multiPartConcurrency: MultiPartConcurrency = 10
multiPartConcurrency: MultiPartConcurrency = 10,
requestModifier: AwsRequestModifier.MultipartUpload = AwsRequestModifier.MultipartUpload.identity
): Pipe[F, Byte, Option[ETag]]
def readFile(bucket: BucketName, key: FileKey): fs2.Stream[F, Byte]

Expand Down Expand Up @@ -60,13 +65,13 @@ object S3 {
*
* For big files, consider using [[uploadFileMultipart]] instead.
*/
def uploadFile(bucket: BucketName, key: FileKey): Pipe[F, Byte, ETag] =
def uploadFile(bucket: BucketName, key: FileKey, modifier: AwsRequestModifier.Upload1): Pipe[F, Byte, ETag] =
in =>
fs2.Stream.eval {
in.compile.toVector.flatMap { vs =>
val bs = ByteBuffer.wrap(vs.toArray)
s3.putObject(
PutObjectRequest.builder().bucket(bucket.value).key(key.value).build(),
modifier.putObject(PutObjectRequest.builder().bucket(bucket.value).key(key.value)).build(),
AsyncRequestBody.fromByteBuffer(bs)
).map(_.eTag())
}
Expand Down Expand Up @@ -99,25 +104,31 @@ object S3 {
key: FileKey,
partSize: PartSizeMB,
uploadEmptyFiles: UploadEmptyFiles,
multiPartConcurrency: MultiPartConcurrency
multiPartConcurrency: MultiPartConcurrency,
requestModifier: AwsRequestModifier.MultipartUpload
): Pipe[F, Byte, Option[ETag]] = {
val chunkSizeBytes = partSize * 1048576

def initiateMultipartUpload: F[UploadId] =
s3.createMultipartUpload(
CreateMultipartUploadRequest.builder().bucket(bucket.value).key(key.value).build()
requestModifier
.createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket.value).key(key.value))
.build()
).map(_.uploadId())

def uploadPart(uploadId: UploadId): Pipe[F, (Chunk[Byte], Long), (PartETag, PartId)] =
_.parEvalMap(multiPartConcurrency) { case (c, i) =>
s3.uploadPart(
UploadPartRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
.partNumber(i.toInt)
.contentLength(c.size.toLong)
requestModifier
.uploadPart(
UploadPartRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
.partNumber(i.toInt)
.contentLength(c.size.toLong)
)
.build(),
AsyncRequestBody.fromBytes(c.toArray)
).map(_.eTag() -> i.toInt)
Expand All @@ -141,24 +152,30 @@ object S3 {
CompletedPart.builder().partNumber(i).eTag(t).build()
}.asJava
s3.completeMultipartUpload(
CompleteMultipartUploadRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
requestModifier
.completeMultipartUpload(
CompleteMultipartUploadRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
)
.build()
).map(eTag => Option(eTag.eTag()))

}

def cancelUpload(uploadId: UploadId): F[Unit] =
s3.abortMultipartUpload(
AbortMultipartUploadRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
requestModifier
.abortMultipartUpload(
AbortMultipartUploadRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
)
.build()
).void

Expand Down Expand Up @@ -255,17 +272,22 @@ object S3 {
override def delete(bucket: BucketName, key: FileKey): G[Unit] =
fToG(s3.delete(bucket, key))

override def uploadFile(bucket: BucketName, key: FileKey): Pipe[G, Byte, ETag] =
override def uploadFile(
bucket: BucketName,
key: FileKey,
modifier: AwsRequestModifier.Upload1
): Pipe[G, Byte, ETag] =
_.translate(gToF)
.through(s3.uploadFile(bucket, key))
.through(s3.uploadFile(bucket, key, modifier))
.translate(fToG)

override def uploadFileMultipart(
bucket: BucketName,
key: FileKey,
partSize: PartSizeMB,
uploadEmptyFiles: UploadEmptyFiles,
multiPartConcurrency: MultiPartConcurrency = 10
multiPartConcurrency: MultiPartConcurrency = 10,
requestModifier: AwsRequestModifier.MultipartUpload = AwsRequestModifier.MultipartUpload.identity
): Pipe[G, Byte, Option[ETag]] =
_.translate(gToF)
// Forward the caller's requestModifier: leaving it out compiles (the parameter has a default)
// but silently replaces the caller's modifier with the identity one.
.through(
  s3.uploadFileMultipart(bucket, key, partSize, uploadEmptyFiles, multiPartConcurrency, requestModifier)
)
Expand Down
Loading