Skip to content

Commit

Permalink
remove scanamo dependency from ddb integration (#1131)
Browse files Browse the repository at this point in the history
Co-authored-by: dmytro.obodowsky <[email protected]>
  • Loading branch information
semenodm and dmytro.obodowsky authored Nov 29, 2023
1 parent 7b5180e commit 5e11493
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 27 deletions.
56 changes: 56 additions & 0 deletions fs2-aws-s3/src/main/scala/fs2/aws/s3/AwsRequestModifier.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package fs2.aws.s3

import software.amazon.awssdk.services.s3.model.{
AbortMultipartUploadRequest,
CompleteMultipartUploadRequest,
CreateMultipartUploadRequest,
PutObjectRequest,
UploadPartRequest
}

object AwsRequestModifier {
trait MultipartUpload {
def createMultipartUpload(
b: CreateMultipartUploadRequest.Builder
): CreateMultipartUploadRequest.Builder

def uploadPart(b: UploadPartRequest.Builder): UploadPartRequest.Builder

def completeMultipartUpload(
b: CompleteMultipartUploadRequest.Builder
): CompleteMultipartUploadRequest.Builder

def abortMultipartUpload(
b: AbortMultipartUploadRequest.Builder
): AbortMultipartUploadRequest.Builder
}

object MultipartUpload {
def identity: MultipartUpload = new MultipartUpload {

def createMultipartUpload(
b: CreateMultipartUploadRequest.Builder
): CreateMultipartUploadRequest.Builder = b

def uploadPart(b: UploadPartRequest.Builder): UploadPartRequest.Builder = b

def completeMultipartUpload(
b: CompleteMultipartUploadRequest.Builder
): CompleteMultipartUploadRequest.Builder = b

def abortMultipartUpload(
b: AbortMultipartUploadRequest.Builder
): AbortMultipartUploadRequest.Builder = b
}
}

trait Upload1 {
def putObject(b: PutObjectRequest.Builder): PutObjectRequest.Builder
}

object Upload1 {
def identity: Upload1 = new Upload1 {
def putObject(b: PutObjectRequest.Builder): PutObjectRequest.Builder = b
}
}
}
76 changes: 49 additions & 27 deletions fs2-aws-s3/src/main/scala/fs2/aws/s3/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ import scala.jdk.CollectionConverters.*
/* A purely functional abstraction over the S3 API based on fs2.Stream */
trait S3[F[_]] {
def delete(bucket: BucketName, key: FileKey): F[Unit]
def uploadFile(bucket: BucketName, key: FileKey): Pipe[F, Byte, ETag]
def uploadFile(
bucket: BucketName,
key: FileKey,
awsRequestModifier: AwsRequestModifier.Upload1 = AwsRequestModifier.Upload1.identity
): Pipe[F, Byte, ETag]

def uploadFileMultipart(
bucket: BucketName,
key: FileKey,
partSize: PartSizeMB,
uploadEmptyFiles: UploadEmptyFiles = false,
multiPartConcurrency: MultiPartConcurrency = 10
multiPartConcurrency: MultiPartConcurrency = 10,
requestModifier: AwsRequestModifier.MultipartUpload = AwsRequestModifier.MultipartUpload.identity
): Pipe[F, Byte, Option[ETag]]
def readFile(bucket: BucketName, key: FileKey): fs2.Stream[F, Byte]

Expand Down Expand Up @@ -60,13 +65,13 @@ object S3 {
*
* For big files, consider using [[uploadFileMultipart]] instead.
*/
def uploadFile(bucket: BucketName, key: FileKey): Pipe[F, Byte, ETag] =
def uploadFile(bucket: BucketName, key: FileKey, modifier: AwsRequestModifier.Upload1): Pipe[F, Byte, ETag] =
in =>
fs2.Stream.eval {
in.compile.toVector.flatMap { vs =>
val bs = ByteBuffer.wrap(vs.toArray)
s3.putObject(
PutObjectRequest.builder().bucket(bucket.value).key(key.value).build(),
modifier.putObject(PutObjectRequest.builder().bucket(bucket.value).key(key.value)).build(),
AsyncRequestBody.fromByteBuffer(bs)
).map(_.eTag())
}
Expand Down Expand Up @@ -99,25 +104,31 @@ object S3 {
key: FileKey,
partSize: PartSizeMB,
uploadEmptyFiles: UploadEmptyFiles,
multiPartConcurrency: MultiPartConcurrency
multiPartConcurrency: MultiPartConcurrency,
requestModifier: AwsRequestModifier.MultipartUpload
): Pipe[F, Byte, Option[ETag]] = {
val chunkSizeBytes = partSize * 1048576

def initiateMultipartUpload: F[UploadId] =
s3.createMultipartUpload(
CreateMultipartUploadRequest.builder().bucket(bucket.value).key(key.value).build()
requestModifier
.createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket.value).key(key.value))
.build()
).map(_.uploadId())

def uploadPart(uploadId: UploadId): Pipe[F, (Chunk[Byte], Long), (PartETag, PartId)] =
_.parEvalMap(multiPartConcurrency) { case (c, i) =>
s3.uploadPart(
UploadPartRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
.partNumber(i.toInt)
.contentLength(c.size.toLong)
requestModifier
.uploadPart(
UploadPartRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
.partNumber(i.toInt)
.contentLength(c.size.toLong)
)
.build(),
AsyncRequestBody.fromBytes(c.toArray)
).map(_.eTag() -> i.toInt)
Expand All @@ -141,24 +152,30 @@ object S3 {
CompletedPart.builder().partNumber(i).eTag(t).build()
}.asJava
s3.completeMultipartUpload(
CompleteMultipartUploadRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
requestModifier
.completeMultipartUpload(
CompleteMultipartUploadRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
)
.build()
).map(eTag => Option(eTag.eTag()))

}

def cancelUpload(uploadId: UploadId): F[Unit] =
s3.abortMultipartUpload(
AbortMultipartUploadRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
requestModifier
.abortMultipartUpload(
AbortMultipartUploadRequest
.builder()
.bucket(bucket.value)
.key(key.value)
.uploadId(uploadId)
)
.build()
).void

Expand Down Expand Up @@ -255,17 +272,22 @@ object S3 {
override def delete(bucket: BucketName, key: FileKey): G[Unit] =
fToG(s3.delete(bucket, key))

override def uploadFile(bucket: BucketName, key: FileKey): Pipe[G, Byte, ETag] =
override def uploadFile(
bucket: BucketName,
key: FileKey,
modifier: AwsRequestModifier.Upload1
): Pipe[G, Byte, ETag] =
_.translate(gToF)
.through(s3.uploadFile(bucket, key))
.through(s3.uploadFile(bucket, key, modifier))
.translate(fToG)

override def uploadFileMultipart(
bucket: BucketName,
key: FileKey,
partSize: PartSizeMB,
uploadEmptyFiles: UploadEmptyFiles,
multiPartConcurrency: MultiPartConcurrency = 10
multiPartConcurrency: MultiPartConcurrency = 10,
requestModifier: AwsRequestModifier.MultipartUpload = AwsRequestModifier.MultipartUpload.identity
): Pipe[G, Byte, Option[ETag]] =
_.translate(gToF)
.through(s3.uploadFileMultipart(bucket, key, partSize, uploadEmptyFiles, multiPartConcurrency))
Expand Down

0 comments on commit 5e11493

Please sign in to comment.