
Amazonka-2.0: Better streaming of parts? #26

Open

endgame opened this issue Oct 27, 2021 · 3 comments

Comments

endgame (Collaborator) commented Oct 27, 2021

At a high level, amazonka-s3-streaming draws ByteStrings from a conduit and assembles them into the parts of an S3 Multi-Part Upload:

streamUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m)
=> Maybe ChunkSize -- ^ Optional chunk size
-> CreateMultipartUpload -- ^ Upload location
-> ConduitT ByteString Void m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)

At the moment, chunked uploads are broken on the Hackage 1.6.1 release of amazonka (see e.g. https://github.com/brendanhay/amazonka/issues/596 and https://github.com/brendanhay/amazonka/issues/547). Once amazonka-2.0 comes out, better streaming becomes possible in principle, but there are some frustrating limitations:

  • We want to split the input conduit across multiple UploadPart operations without buffering each part in memory or unnecessarily rechunking the stream
  • A sensible type for the operation we'd like to perform could be something like:
    inParts ::
      MonadIO m =>
      -- | Minimum part size
      Integer ->
      -- | Maximum part size
      Integer ->
      ConduitT ByteString ByteString m () ->
      ConduitT ByteString (ConduitT () ByteString m ()) m ()
    But I can't work out how to await on the outer conduit. Other streaming libraries seem to handle this differently (streaming looks nice, pipes-group looks completely unusable, streamly ... does something, I guess?) but I doubt I could ram through a change to such a core part of the library without causing disproportionate amounts of pain.
  • Conduit has connect-and-resume support, but to use it you need a SealedConduitT, which we can't provide to amazonka as a chunkedBody. Even if we could, there's no way for the request to know how much to take, nor any way to return the leftover stream (see the Haddocks for e.g. ($$++)).
  • We could feed the input conduit into a TBQueue, along with end-of-part/end-of-stream markers. stm-conduit has been unmaintained for years, so maybe we need to write something like:
    data PartItem = Part !ByteString | EndPart | EndStream
    -- | Break up the stream into parts between the given minimum and maximum sizes.
    -- Insert 'EndPart' after each part, and 'EndStream' after the last part.
    -- Only break up a 'ByteString' if including it in a part would take us over the maximum part size.
    inParts :: MonadIO m => Integer -> Integer -> ConduitT ByteString PartItem m ()
    toTBQueue :: MonadIO m => TBQueue a -> ConduitT a Void m ()
    -- | Read from the TBQueue until 'EndStream'. Each time an 'EndPart' is seen, call the provided function with a new conduit.
    fromTBQueueParts :: MonadIO m => TBQueue PartItem -> (ConduitT () a m () -> m ()) -> m ()
    This feels like the best option to me - we should mostly end up passing ByteStrings around without copying them, and with a reasonable size on the TBQueue we'd start preparing the next upload before the first one finishes.
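To make the shape of that queue-based plumbing concrete, here is a minimal sketch of toTBQueue and fromTBQueueParts (inParts is omitted, since its chunking policy is already specified above). It assumes the producer terminates every part, including the last, with EndPart before writing EndStream, and that the callback drains each per-part conduit it is handed:

{-# LANGUAGE LambdaCase #-}

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBQueue (TBQueue, readTBQueue, writeTBQueue)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString)
import Data.Conduit (ConduitT, await, yield)
import Data.Void (Void)

data PartItem = Part !ByteString | EndPart | EndStream

-- | Write every stream element to the queue.
toTBQueue :: MonadIO m => TBQueue a -> ConduitT a Void m ()
toTBQueue q = loop
  where
    loop = await >>= \case
      Nothing -> pure ()
      Just a  -> liftIO (atomically (writeTBQueue q a)) >> loop

-- | Read from the queue until 'EndStream'. Each time a new part starts,
-- call the provided function with a conduit yielding that part's bytes
-- (up to its 'EndPart').
fromTBQueueParts
  :: MonadIO m
  => TBQueue PartItem
  -> (ConduitT () ByteString m () -> m ())
  -> m ()
fromTBQueueParts q withPart = go
  where
    go = liftIO (atomically (readTBQueue q)) >>= \case
      EndStream -> pure ()
      EndPart   -> go                               -- empty part: skip it
      Part b    -> withPart (yield b *> restOfPart) >> go

    -- Yield bytes until the current part's 'EndPart' marker.
    restOfPart = liftIO (atomically (readTBQueue q)) >>= \case
      Part b -> yield b >> restOfPart
      _      -> pure ()

With something like this, one thread could run the source into toTBQueue while another runs fromTBQueueParts and hands each part conduit to an UploadPart call, so the bound on the TBQueue controls how far the reader gets ahead of the uploads.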

ghost commented Oct 27, 2021

I think I agree on all points, though in the branch which will target amazonka >= 1.6.1 I'll stick to the current implementation of building a buffer and writing that out when it's full (needing some of the changes we've spoken about regarding capping the chunk size, etc.).

For Amazonka 2.0, I wouldn't mind playing with concurrency for the chunk sending, but what I'm not too sure about is what constraints I'm happy to place on the user so that I can fork threads and still manage resources and exceptions. lifted-async seems useful, but I don't know how onerous MonadBaseControl would be for a user of the lib. I could potentially offer a concurrent stream-in-and-send-to-S3 version with a MonadBaseControl constraint, and one with the current constraints that buffers and sends each chunk before moving on to the next one.

endgame (Collaborator, Author) commented Oct 27, 2021

[I]n the branch which will target amazonka >= 1.6.1 I'll stick to the current implementation of building a buffer and writing that out when it's full (needing some of the changes we've spoken about regarding capping the chunk size etc.)

You will have to - chunked uploads are broken on 1.6.1.

For Amazonka-2.0, if you want streaming directly from the source conduit into amazonka (without assembling the buffer yourself), I can't see a way that doesn't involve some kind of rendezvous structure. I'd stay away from anything demanding MonadBaseControl - it's not really seeing as much use in Snoymanverse projects, and I think UnliftIO.Async provides similar functions. Also, resourcet (used by amazonka) already forces a dependency on unliftio-core via runResourceT :: MonadUnliftIO m => ResourceT m a -> m a, so you wouldn't be adding additional constraints to users of amazonka-s3-streaming.
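To illustrate that point, here is a sketch (not the library's API; uploadOnePart, part, and etag are placeholders) of fanning per-part uploads out with UnliftIO.Async while keeping only the MonadUnliftIO constraint the library already relies on:

import Control.Monad.IO.Unlift (MonadUnliftIO)
import UnliftIO.Async (pooledMapConcurrentlyN)

-- | Upload already-assembled parts with at most @maxInFlight@ concurrent
-- 'UploadPart' calls. @uploadOnePart@ stands in for whatever action sends
-- part number @n@ and returns its ETag.
uploadAllParts
  :: MonadUnliftIO m
  => Int                       -- ^ maximum in-flight part uploads
  -> (Int -> part -> m etag)   -- ^ hypothetical per-part upload action
  -> [part]
  -> m [etag]
uploadAllParts maxInFlight uploadOnePart parts =
  pooledMapConcurrentlyN maxInFlight (uncurry uploadOnePart) (zip [1 ..] parts)

Nothing here needs MonadBaseControl, so a concurrent variant wouldn't widen the constraints users already satisfy for runResourceT.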

endgame (Collaborator, Author) commented Nov 3, 2022

For streamUpload, this library has to buffer each chunk in full before sending it to amazonka. To get perfect streaming, I have to "seal" the conduit, and then I can write a function that streams a single chunk's worth of data from a SealedConduitT:

{-# LANGUAGE BangPatterns, LambdaCase, ScopedTypeVariables #-}

import qualified Data.ByteString as BS
import Data.Conduit.Internal (Pipe (..), SealedConduitT (..))

-- 'ChunkSize' is the chunk-size type from amazonka-s3-streaming.

-- | Extract a single chunk from a stream, reallocating as little as possible.
streamChunk :: forall m a.
  MonadIO m
  => ChunkSize
  -> SealedConduitT () ByteString m a
  -> SealedConduitT () ByteString m (SealedConduitT () ByteString m a)
streamChunk size (SealedConduitT pipe) =
  SealedConduitT $ SealedConduitT <$> pipeChunk pipe
  where
    pipeChunk
      :: Pipe () () ByteString () m a
      -> Pipe () () ByteString () m (Pipe () () ByteString () m a)
    pipeChunk = loop size
      where
        loop !n = \case
          HaveOutput p b -> case compare n bLen of
            -- Emit 'n' bytes from 'b' and push the rest onto the return value
            LT ->
              let (bN, bRest) = BS.splitAt n b
              in  HaveOutput (pure $ HaveOutput p bRest) bN
            -- Emit 'b' and then we're done
            EQ -> HaveOutput (pure p) b
            -- 'b' fits entirely in the stream we want to emit
            GT -> HaveOutput (loop (n - bLen) p) b
            where
              bLen = BS.length b
          NeedInput f _ -> loop n $ f ()
          Done a -> pure $ Done a
          PipeM m -> PipeM $ loop n <$> m
          Leftover p () -> loop n p

I can't figure out how to plug this in, because streamUpload is itself written as a conduit, drawing its input via conduit's monadic await. There should be some kind of streaming await that pulls up to N bytes and hands them back as a sub-conduit.
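For reference, streamChunk can be driven on its own using conduit's sealConduitT, unsealConduitT and fuseBoth. The sketch below is not part of the library: source and sink are placeholders, the chunk size is taken as a plain Int (assuming ChunkSize is just Int, as in amazonka-s3-streaming), and the sink is assumed to consume the whole chunk:

import Control.Monad.IO.Class (MonadIO)
import Data.ByteString (ByteString)
import Data.Conduit
  (ConduitT, SealedConduitT, fuseBoth, runConduit, sealConduitT, unsealConduitT)
import Data.Void (Void)

-- | Run one chunk's worth of @source@ into @sink@, returning the sink's
-- result together with the sealed remainder of the source.
consumeOneChunk
  :: MonadIO m
  => Int                           -- ^ chunk size in bytes
  -> ConduitT () ByteString m ()   -- ^ source for the whole upload
  -> ConduitT ByteString Void m r  -- ^ sink for a single chunk
  -> m (r, SealedConduitT () ByteString m ())
consumeOneChunk size source sink = do
  (rest, r) <-
    runConduit (unsealConduitT (streamChunk size (sealConduitT source)) `fuseBoth` sink)
  pure (r, rest)

But that only works when you hold the source yourself; inside streamUpload the input arrives through await, which is exactly the problem described above.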

I am increasingly of the opinion that having "input" in a streaming library's core transformer type is a misfeature, and you should instead consume an input stream as a function argument.
