Remove IO.blocking from array copy
* Also remove the array copy: use `.asByteArrayUnsafe` instead of
`.asByteArray` to return the original byte array without making a copy
of it. FS2 doesn't mutate the byte array as far as I can tell, and this
`readFileMultipart` method returns an `fs2.Stream[F, Byte]`, so the
caller isn't able to mutate those Bytes. (See the first sketch after
this list.)

* Also remove the `Monad.ifM`. `ifM(pure(x))(t, f)` is the same as
`if (x) t else f`. (See the second sketch after this list.)

* Construct the `Chunk`s from byte arrays in a more direct fashion with
`Chunk.array` (also covered by the first sketch below).
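
A minimal, self-contained sketch of the no-copy point in the first and
third bullets, using a plain `Array[Byte]` as a stand-in for the SDK's
`ResponseBytes` (the `ChunkConstruction` name is illustrative, not from
this repo):

    import fs2.Chunk
    import scala.collection.immutable.ArraySeq

    object ChunkConstruction {
      def main(args: Array[String]): Unit = {
        // Stand-in for `asByteArrayUnsafe`: hand out the backing array, no copy.
        val bytes: Array[Byte] = "payload".getBytes("UTF-8")

        // New style: wrap the array in a Chunk directly.
        val direct: Chunk[Byte] = Chunk.array(bytes)

        // Old style: wrap the array in an ArraySeq, then splat it through the
        // varargs `Chunk.apply`.
        val indirect: Chunk[Byte] = Chunk(ArraySeq.unsafeWrapArray(bytes)*)

        // Same contents either way; nothing here mutates `bytes`, which is
        // why sharing the un-copied array is safe.
        println(direct == indirect) // prints: true
      }
    }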
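A minimal sketch of the `ifM` equivalence from the second bullet (the
`viaIfM`/`viaIf` names are illustrative):

    import cats.Monad

    object IfMEquivalence {
      // `ifM` branches on an effectful Boolean. When the Boolean is already
      // pure, lifting it with `pure` adds nothing, and a plain `if`
      // expresses the same computation.
      def viaIfM[F[_]: Monad, A](cond: Boolean, t: F[A], f: F[A]): F[A] =
        Monad[F].ifM(Monad[F].pure(cond))(t, f)

      def viaIf[F[_], A](cond: Boolean, t: F[A], f: F[A]): F[A] =
        if (cond) t else f
    }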
Leif Warner committed Nov 13, 2024
1 parent 110af16 commit 814bbea
Showing 1 changed file with 13 additions and 20 deletions.
fs2-aws-s3/src/main/scala/fs2/aws/s3/S3.scala
@@ -1,7 +1,7 @@
 package fs2.aws.s3
 
-import cats.effect.*
-import cats.implicits.*
+import cats.effect.Concurrent
+import cats.syntax.all.*
 import cats.{Applicative, ApplicativeThrow, ~>}
 import eu.timepit.refined.auto.*
 import fs2.aws.s3.S3.MultipartETagValidation
@@ -13,8 +13,6 @@ import software.amazon.awssdk.core.async.{AsyncRequestBody, AsyncResponseTransformer}
 import software.amazon.awssdk.services.s3.model.*
 
 import java.security.MessageDigest
-import scala.collection.immutable.ArraySeq
-import scala.collection.immutable.ArraySeq.unsafeWrapArray
 import scala.jdk.CollectionConverters.*
 import scala.util.control.NoStackTrace

@@ -94,7 +92,7 @@ object S3 {
   }
 
   /** It creates an instance of the purely functional S3 API. */
-  def create[F[_]: Async](s3: S3AsyncClientOp[F]): S3[F] =
+  def create[F[_]: Concurrent](s3: S3AsyncClientOp[F]): S3[F] =
     new S3[F] {
 
       /** Deletes a file in a single request.
@@ -194,12 +192,10 @@ object S3 {
       ): Pipe[F, List[PartProcessingOutcome], (Option[ETag], Option[Checksum], Option[PartId])] =
         _.evalMap[F, (Option[ETag], Option[Checksum], Option[PartId])] {
           case Nil =>
-            cancelUpload(uploadId) *>
-              Async[F]
-                .ifM(Async[F].pure(uploadEmptyFiles))(
-                  uploadEmptyFile.map(eTag => (Option(eTag.eTag()), Option.empty[Checksum], Option.empty[PartId])),
-                  Async[F].pure((Option.empty[ETag], Option.empty[Checksum], Option.empty[PartId]))
-                )
+            cancelUpload(uploadId) *> (
+              if (uploadEmptyFiles) uploadEmptyFile.map(eTag => Option(eTag.eTag))
+              else Applicative[F].pure(Option.empty[ETag])
+            ).map((_, Option.empty[Checksum], Option.empty[PartId]))
           case tags =>
             val parts = tags.map { case PartProcessingOutcome(t, i, _) =>
               CompletedPart.builder().partNumber(i).eTag(t).build()
@@ -272,7 +268,7 @@ object S3 {
               .through(uploadPart(uploadId))
               .fold[List[PartProcessingOutcome]](List.empty)(_ :+ _)
               .through(completeUpload(uploadId))
-              .handleErrorWith(ex => fs2.Stream.eval(cancelUpload(uploadId) >> Sync[F].raiseError(ex)))
+              .handleErrorWith(ex => fs2.Stream.eval(cancelUpload(uploadId) >> ApplicativeThrow[F].raiseError(ex)))
           _ <- fs2.Stream.eval(validateETag(eTag, maxPartNum, checksum))
         } yield eTag

@@ -294,7 +290,7 @@
             AsyncResponseTransformer.toBytes[GetObjectResponse]
           )
         )
-        .flatMap(r => fs2.Stream.chunk(Chunk(ArraySeq.unsafeWrapArray(r.asByteArray)*)))
+        .flatMap(r => fs2.Stream.chunk(Chunk.array(r.asByteArrayUnsafe)))
 
      /** Reads a file in multiple parts of the specified @partSize per request. Suitable for big files.
        *
@@ -328,15 +324,12 @@ object S3 {
              .last
              .flatMap {
                case Some(resp) =>
-                 Pull.eval {
-                   Async[F].blocking {
-                     val bs = resp.asByteArray()
-                     val len = bs.length
-                     if (len < 0) None else Some(Chunk(unsafeWrapArray(bs)*))
-                   }
+                 Pull.pure {
+                   val bs = resp.asByteArrayUnsafe
+                   Option.when(bs.length >= 0)(Chunk.array(bs))
                  }
                case None =>
-                 Pull.eval(none.pure[F])
+                 Pull.pure(none)
              }
              .flatMap {
                case Some(o) =>
