Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
domenkozar committed Mar 24, 2020
1 parent b5e41dc commit 79a50ac
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 76 deletions.
6 changes: 5 additions & 1 deletion amazonka-s3-streaming.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@ library
, amazonka-core >= 1.6 && < 1.7
, amazonka-s3 >= 1.6 && < 1.7
, conduit >= 1.3 && < 1.4
, conduit-concurrent-map
, bytestring >= 0.10.8.0 && < 0.11
, bytestring-to-vector
, mmorph >= 1.0.6 && < 1.2
, lens >= 4.13 && < 5.0
, mtl >= 2.2.1 && < 2.3
, exceptions >= 0.8.2.1 && < 0.11
, dlist >= 0.8 && < 0.9
, resourcet
, async >= 2 && < 2.3
, http-client >= 0.4 && < 0.7
, vector
, text

flag s3upload-exe
Description: Whether to build the s3upload executable for uploading files using this library.
Expand Down
196 changes: 123 additions & 73 deletions src/Network/AWS/S3/StreamingUpload.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import Network.AWS
( AWS, HasEnv(..), LogLevel(..), MonadAWS, getFileSize, hashedBody, hashedFileRange,
liftAWS, runAWS, runResourceT, send, toBody )

import Network.AWS.Data.Crypto ( Digest, SHA256, hashFinalize, hashInit, hashUpdate )
import Network.AWS.Data.Crypto ( Digest, SHA256, hash)
import Network.AWS.Data.Body (HashedBody(..))

import Network.AWS.S3.AbortMultipartUpload
import Network.AWS.S3.CompleteMultipartUpload
import Network.AWS.S3.CreateMultipartUpload
import Network.AWS.S3.ListMultipartUploads
import Network.AWS.S3.Types
( BucketName, cmuParts, completedMultipartUpload, completedPart, muKey, muUploadId )
( ObjectKey, CompletedPart, BucketName, cmuParts, completedMultipartUpload, completedPart, muKey, muUploadId )
import Network.AWS.S3.UploadPart

import Control.Applicative
Expand All @@ -38,17 +39,23 @@ import Control.Monad.IO.Class ( MonadIO, liftIO )
import Control.Monad.Morph ( lift )
import Control.Monad.Reader.Class ( local )

import Conduit ( MonadUnliftIO(..) )
import Data.Conduit ( ConduitT, Void, await, catchC )
import Data.Word (Word8)
import qualified Data.Vector.Storable as ScopedTypeVariables
import Conduit ( MonadUnliftIO(..), mapC, PrimMonad )
import Data.Conduit ( ConduitT, Void, await, catchC, handleC, (.|), leftover, yield, awaitForever)
import Data.Conduit.Combinators (vectorBuilder, sinkList)
import Data.Conduit.List ( sourceList )

import Data.Conduit.ConcurrentMap (concurrentMapM_)
import Control.Monad.Trans.Resource (ResourceT, MonadResource)
import Data.ByteString ( ByteString )
import qualified Data.ByteString as BS
import Data.ByteString.Builder ( stringUtf8 )

import qualified Data.DList as D
import Data.ByteString.Unsafe (unsafeIndex)
import qualified Data.Vector.Storable as VS
import Data.Vector.Storable.ByteString (vectorToByteString)
import Data.List ( unfoldr )
import Data.List.NonEmpty ( nonEmpty )
import Data.List.NonEmpty ( nonEmpty, fromList )
import Data.Text (Text)

import Control.Lens ( set, view )
import Control.Lens.Operators
Expand Down Expand Up @@ -85,74 +92,117 @@ See the AWS documentation for more details.
May throw 'Network.AWS.Error'
-}
streamUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m)
streamUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m, PrimMonad m)
=> Maybe ChunkSize -- ^ Optional chunk size
-> CreateMultipartUpload -- ^ Upload location
-> ConduitT ByteString Void m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
-- Pre-refactor, single-threaded implementation (removed by this commit).
-- Strategy: accumulate incoming ByteString chunks in a DList while feeding
-- them into an incremental SHA256 context; once 'chunkSize' bytes have
-- accumulated, upload the buffer as one multipart-upload part, then start a
-- fresh buffer/hash context. Any exception aborts the multipart upload.
streamUpload mChunkSize multiPartUploadDesc = do
logger <- lift $ liftAWS $ view envLogger
let logStr :: MonadIO m => String -> m ()
logStr = liftIO . logger Debug . stringUtf8
-- A caller-supplied chunk size is clamped up to the S3 minimum part size.
chunkSize = maybe minimumChunkSize (max minimumChunkSize) mChunkSize

multiPartUpload <- lift $ send multiPartUploadDesc
when (multiPartUpload ^. cmursResponseStatus /= 200) $
fail "Failed to create upload"

logStr "\n**** Created upload\n"

-- NOTE(review): partial pattern — if the response carries no upload id this
-- dies with an unhelpful irrefutable-pattern error rather than 'fail'.
let Just upId = multiPartUpload ^. cmursUploadId
bucket = multiPartUploadDesc ^. cmuBucket
key = multiPartUploadDesc ^. cmuKey
-- go :: DList ByteString -> Int -> Context SHA256 -> Int -> DList (Maybe CompletedPart) -> Sink ByteString m ()
-- Accumulator loop: bss = buffered chunks, bufsize = buffered byte count,
-- ctx = running SHA256 of the buffer, partnum = next part number (1-based),
-- completed = parts uploaded so far (Maybe because an ETag may be absent).
go !bss !bufsize !ctx !partnum !completed = await >>= \case
Just bs
-- Chunk still fits in the current part: keep buffering and hashing.
| l <- BS.length bs, bufsize + l <= chunkSize
-> go (D.snoc bss bs) (bufsize + l) (hashUpdate ctx bs) partnum completed
-- Buffer would overflow: flush buffer + this chunk as one part.
| otherwise -> do
rs <- lift $ performUpload partnum (bufsize + BS.length bs)
(hashFinalize $ hashUpdate ctx bs)
(D.snoc bss bs)

logStr $ printf "\n**** Uploaded part %d size %d\n" partnum bufsize
let !part = completedPart partnum <$> (rs ^. uprsETag)
-- Explicit GC to reclaim the just-uploaded buffer promptly.
liftIO performGC
go empty 0 hashInit (partnum+1) . D.snoc completed $! part

-- Stream exhausted: upload any residual buffer as the final (short) part,
-- then complete the multipart upload with the collected part list.
Nothing -> lift $ do
prts <- if bufsize > 0
then do
rs <- performUpload partnum bufsize (hashFinalize ctx) bss
logStr $ printf "\n**** Uploaded (final) part %d size %d\n" partnum bufsize
let allParts = D.toList $ D.snoc completed $ completedPart partnum <$> (rs ^. uprsETag)
-- 'sequence' turns [Maybe part] into Maybe [part]: one missing
-- ETag silently yields Nothing (no parts sent on completion).
pure $ nonEmpty =<< sequence allParts
else do
logStr $ printf "\n**** No final data to upload\n"
pure $ nonEmpty =<< sequence (D.toList completed)

send $ completeMultipartUpload bucket key upId
& cMultipartUpload ?~ set cmuParts prts completedMultipartUpload


-- Upload one part: stream the buffered chunks as a hashed body (digest and
-- size precomputed, so the body need not be retained for re-hashing).
performUpload :: (MonadAWS m, MonadFail m) => Int -> Int -> Digest SHA256 -> D.DList ByteString -> m UploadPartResponse
performUpload pnum size digest =
D.toList
>>> sourceList
>>> hashedBody digest (fromIntegral size)
>>> toBody
>>> uploadPart bucket key pnum upId
>>> send
>=> checkUpload

-- Reject any non-200 part-upload response.
checkUpload :: (Monad m, MonadFail m) => UploadPartResponse -> m UploadPartResponse
checkUpload upr = do
when (upr ^. uprsResponseStatus /= 200) $ fail "Failed to upload piece"
return upr

(Right <$> go D.empty 0 hashInit 1 D.empty) `catchC` \(except :: SomeException) ->
Left . (,except) <$> lift (send (abortMultipartUpload bucket key upId))
-- Whatever happens, we abort the upload and return the exception

-- New concurrent implementation: the incoming byte stream is re-chunked into
-- fixed-size buffers, each buffer is numbered, and numbered buffers are
-- uploaded as parts concurrently via 'concurrentMapM_'.
streamUpload mChunkSize multiPartUploadDesc =
chunkConduit
.| mapC vectorToByteString
.| enumerateConduit
.| startSingleUpload
where
-- Peek at the stream: fail early if it is empty, otherwise push the first
-- chunk back and run the multipart pipeline.
-- NOTE(review): the name suggests a planned single-PutObject fast path for
-- one-chunk bodies; as written it always falls through to multipart.
startSingleUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m) =>
ConduitT (Int, ByteString) Void m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
startSingleUpload = do
input <- await
case input of
Nothing -> fail "no body in stream"
Just buffer -> do
leftover buffer
streamMultiUpload

-- Create the multipart upload, then fan the numbered chunks out to
-- concurrent part uploads; any exception aborts the upload (Left),
-- otherwise the completion response is returned (Right).
streamMultiUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m) =>
ConduitT
(Int, ByteString)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
streamMultiUpload = do
multiPartUpload <- lift $ send multiPartUploadDesc
when (multiPartUpload ^. cmursResponseStatus /= 200) $
fail "Failed to create upload"
logStr "\n**** Created upload\n"

-- NOTE(review): partial pattern — a response without an upload id dies
-- with an irrefutable-pattern error instead of a clean 'fail'.
let Just upId = multiPartUpload ^. cmursUploadId
bucket = multiPartUploadDesc ^. cmuBucket
key = multiPartUploadDesc ^. cmuKey

-- concurrentMapM_ 10 3: presumably 10 workers / buffer of 3 — confirm
-- argument order against conduit-concurrent-map docs. Note it does not
-- promise output in input order; completeMultipartUpload part numbering
-- should be verified to tolerate this.
handleC (cancelMultiUploadConduit bucket key upId) $
concurrentMapM_ 10 3 (multiUpload bucket key upId)
.| (finishMultiUploadConduit bucket key upId)

-- Upload one numbered chunk as a part, hashing the whole buffer in one
-- pass ('hash'), and return its CompletedPart (Nothing if no ETag).
multiUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m) =>
BucketName
-> ObjectKey
-> Text
-> (Int, ByteString)
-> m (Maybe CompletedPart)
multiUpload bucket key upId (partnum, buffer) = do
-- NOTE(review): 'bufsize' is only used by the commented-out log line
-- below; either restore the logging or drop the binding.
let bufsize = BS.length buffer
let req = uploadPart bucket key partnum upId (toBody $ HashedBytes (hash buffer) buffer)
res <- liftAWS $ send req
when (res ^. uprsResponseStatus /= 200) $ fail "Failed to upload piece"
--logStr $ printf "\n**** Uploaded part %d size %d\n" partnum bufsize
return $ completedPart partnum <$> (res ^. uprsETag)

-- Collect all completed parts and finish the multipart upload.
finishMultiUploadConduit :: (MonadUnliftIO m, MonadAWS m, MonadFail m) =>
BucketName
-> ObjectKey
-> Text
-> ConduitT
(Maybe CompletedPart)
Void
m
(Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
finishMultiUploadConduit bucket key upId = do
parts <- sinkList
-- NOTE(review): 'fromList' is partial — it errors on an empty part list
-- (prefer 'nonEmpty'); and 'sequenceA' turns one missing ETag into
-- Nothing, silently completing with no parts. Both need hardening.
res <- lift $ send $ completeMultipartUpload bucket key upId & cMultipartUpload ?~ set cmuParts (sequenceA (fromList parts)) completedMultipartUpload
return $ Right res

-- Exception path: abort the multipart upload and surface the abort
-- response alongside the original exception.
cancelMultiUploadConduit :: (MonadUnliftIO m, MonadAWS m, MonadFail m) =>
BucketName
-> ObjectKey
-> Text
-> SomeException
-> ConduitT
i
Void
m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
cancelMultiUploadConduit bucket key upId exc = do
res <- lift $ send $ abortMultipartUpload bucket key upId
return $ Left (res, exc)

-- Re-chunk the incoming ByteStrings into fixed-size storable vectors via
-- 'vectorBuilder'.
-- NOTE(review): this feeds 'vectorBuilder' one byte at a time through
-- 'unsafeIndex', copying every byte individually — likely a significant
-- hot-path cost versus a ByteString-level rechunker; worth benchmarking.
chunkConduit :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m, PrimMonad m) =>
ConduitT ByteString (VS.Vector Word8) m ()
chunkConduit =
vectorBuilder (fromIntegral chunkSize) f
where f :: MonadUnliftIO m => (Word8 -> m b) -> ConduitT ByteString Void m ()
f yieldByte = do
awaitForever $ \bs ->
lift $ forM_ [0..BS.length bs - 1] $ \i ->
yieldByte $ unsafeIndex bs i
{-# INLINE chunkConduit #-}

-- Tag each chunk with its part number; S3 part numbers start at 1.
enumerateConduit :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m) =>
ConduitT a (Int, a) m ()
enumerateConduit = loop 1
where
loop i = await >>= maybe (return ()) (go i)
go i x = do
yield (i, x)
loop (i + 1)
{-# INLINE enumerateConduit #-}

-- Caller-supplied chunk size, clamped up to the S3 minimum part size.
chunkSize :: ChunkSize
chunkSize = maybe minimumChunkSize (max minimumChunkSize) mChunkSize

-- Debug-level logging through the AWS env logger.
-- NOTE(review): fetches the logger from the environment on every call,
-- unlike the previous version which captured it once up front.
logStr :: (MonadAWS m, MonadFail m) => (String -> ConduitT i Void m ())
logStr msg = do
logger <- lift $ liftAWS $ view envLogger
liftIO $ logger Debug $ stringUtf8 msg

-- | Specifies whether to upload a file or 'ByteString'.
data UploadLocation
Expand Down
7 changes: 5 additions & 2 deletions stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# resolver:
# name: custom-snapshot
# location: "./custom-snapshot.yaml"
resolver: lts-13.18
resolver: lts-14.27

# User packages to be built.
# Various formats can be used as shown in the example below.
Expand All @@ -39,7 +39,10 @@ packages:
- '.'
# Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3)
extra-deps: []
extra-deps:
- amazonka-1.6.1@sha256:f58b63e83876f93aa03c54e54b3eb8ba9358af93819d41f4a5a8f8b7d8b8399c,3544
- amazonka-core-1.6.1@sha256:9bc59ce403c6eeba3b3eaf3f10e5f0b6a33b6edbbf8f6de0dd6f4c67b86fa698,5135
- amazonka-s3-1.6.1@sha256:9d07240fca59ad5197fb614ce3051e701e4951e6d4625a2dab4a9c17a1900194,6317

# Override default flag values for local packages and extra-deps
flags: {}
Expand Down

0 comments on commit 79a50ac

Please sign in to comment.