From 79a50ac92da415edf57e4d04e1af3a7c4af3c781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Domen=20Ko=C5=BEar?= Date: Tue, 24 Mar 2020 15:19:43 +0100 Subject: [PATCH] WIP --- amazonka-s3-streaming.cabal | 6 +- src/Network/AWS/S3/StreamingUpload.hs | 196 ++++++++++++++++---------- stack.yaml | 7 +- 3 files changed, 133 insertions(+), 76 deletions(-) diff --git a/amazonka-s3-streaming.cabal b/amazonka-s3-streaming.cabal index 7e45364..45530eb 100644 --- a/amazonka-s3-streaming.cabal +++ b/amazonka-s3-streaming.cabal @@ -25,14 +25,18 @@ library , amazonka-core >= 1.6 && < 1.7 , amazonka-s3 >= 1.6 && < 1.7 , conduit >= 1.3 && < 1.4 + , conduit-concurrent-map , bytestring >= 0.10.8.0 && < 0.11 + , bytestring-to-vector , mmorph >= 1.0.6 && < 1.2 , lens >= 4.13 && < 5.0 , mtl >= 2.2.1 && < 2.3 , exceptions >= 0.8.2.1 && < 0.11 - , dlist >= 0.8 && < 0.9 + , resourcet , async >= 2 && < 2.3 , http-client >= 0.4 && < 0.7 + , vector + , text flag s3upload-exe Description: Whether to build the s3upload executable for uploading files using this library. diff --git a/src/Network/AWS/S3/StreamingUpload.hs b/src/Network/AWS/S3/StreamingUpload.hs index fd4ae08..6aea13f 100644 --- a/src/Network/AWS/S3/StreamingUpload.hs +++ b/src/Network/AWS/S3/StreamingUpload.hs @@ -20,14 +20,15 @@ import Network.AWS ( AWS, HasEnv(..), LogLevel(..), MonadAWS, getFileSize, hashedBody, hashedFileRange, liftAWS, runAWS, runResourceT, send, toBody ) -import Network.AWS.Data.Crypto ( Digest, SHA256, hashFinalize, hashInit, hashUpdate ) +import Network.AWS.Data.Crypto ( Digest, SHA256, hash) +import Network.AWS.Data.Body (HashedBody(..)) import Network.AWS.S3.AbortMultipartUpload import Network.AWS.S3.CompleteMultipartUpload import Network.AWS.S3.CreateMultipartUpload import Network.AWS.S3.ListMultipartUploads import Network.AWS.S3.Types - ( BucketName, cmuParts, completedMultipartUpload, completedPart, muKey, muUploadId ) + ( ObjectKey, CompletedPart, BucketName, cmuParts, completedMultipartUpload, completedPart, muKey, muUploadId ) import Network.AWS.S3.UploadPart import Control.Applicative @@ -38,17 +39,23 @@ import Control.Monad.IO.Class ( MonadIO, liftIO ) import Control.Monad.Morph ( lift ) import Control.Monad.Reader.Class ( local ) -import Conduit ( MonadUnliftIO(..) ) -import Data.Conduit ( ConduitT, Void, await, catchC ) +import Data.Word (Word8) +import qualified Data.Vector.Storable as ScopedTypeVariables +import Conduit ( MonadUnliftIO(..), mapC, PrimMonad ) +import Data.Conduit ( ConduitT, Void, await, catchC, handleC, (.|), leftover, yield, awaitForever) +import Data.Conduit.Combinators (vectorBuilder, sinkList) import Data.Conduit.List ( sourceList ) - +import Data.Conduit.ConcurrentMap (concurrentMapM_) +import Control.Monad.Trans.Resource (ResourceT, MonadResource) import Data.ByteString ( ByteString ) import qualified Data.ByteString as BS import Data.ByteString.Builder ( stringUtf8 ) - -import qualified Data.DList as D +import Data.ByteString.Unsafe (unsafeIndex) +import qualified Data.Vector.Storable as VS +import Data.Vector.Storable.ByteString (vectorToByteString) import Data.List ( unfoldr ) -import Data.List.NonEmpty ( nonEmpty ) +import Data.List.NonEmpty ( nonEmpty, fromList ) +import Data.Text (Text) import Control.Lens ( set, view ) import Control.Lens.Operators @@ -85,74 +92,117 @@ See the AWS documentation for more details. May throw 'Network.AWS.Error' -} -streamUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m) +streamUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m, PrimMonad m) => Maybe ChunkSize -- ^ Optional chunk size -> CreateMultipartUpload -- ^ Upload location -> ConduitT ByteString Void m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse) -streamUpload mChunkSize multiPartUploadDesc = do - logger <- lift $ liftAWS $ view envLogger - let logStr :: MonadIO m => String -> m () - logStr = liftIO . logger Debug . stringUtf8 - chunkSize = maybe minimumChunkSize (max minimumChunkSize) mChunkSize - - multiPartUpload <- lift $ send multiPartUploadDesc - when (multiPartUpload ^. cmursResponseStatus /= 200) $ - fail "Failed to create upload" - - logStr "\n**** Created upload\n" - - let Just upId = multiPartUpload ^. cmursUploadId - bucket = multiPartUploadDesc ^. cmuBucket - key = multiPartUploadDesc ^. cmuKey - -- go :: DList ByteString -> Int -> Context SHA256 -> Int -> DList (Maybe CompletedPart) -> Sink ByteString m () - go !bss !bufsize !ctx !partnum !completed = await >>= \case - Just bs - | l <- BS.length bs, bufsize + l <= chunkSize - -> go (D.snoc bss bs) (bufsize + l) (hashUpdate ctx bs) partnum completed - | otherwise -> do - rs <- lift $ performUpload partnum (bufsize + BS.length bs) - (hashFinalize $ hashUpdate ctx bs) - (D.snoc bss bs) - - logStr $ printf "\n**** Uploaded part %d size %d\n" partnum bufsize - let !part = completedPart partnum <$> (rs ^. uprsETag) - liftIO performGC - go empty 0 hashInit (partnum+1) . D.snoc completed $! part - - Nothing -> lift $ do - prts <- if bufsize > 0 - then do - rs <- performUpload partnum bufsize (hashFinalize ctx) bss - logStr $ printf "\n**** Uploaded (final) part %d size %d\n" partnum bufsize - let allParts = D.toList $ D.snoc completed $ completedPart partnum <$> (rs ^. uprsETag) - pure $ nonEmpty =<< sequence allParts - else do - logStr $ printf "\n**** No final data to upload\n" - pure $ nonEmpty =<< sequence (D.toList completed) - - send $ completeMultipartUpload bucket key upId - & cMultipartUpload ?~ set cmuParts prts completedMultipartUpload - - - performUpload :: (MonadAWS m, MonadFail m) => Int -> Int -> Digest SHA256 -> D.DList ByteString -> m UploadPartResponse - performUpload pnum size digest = - D.toList - >>> sourceList - >>> hashedBody digest (fromIntegral size) - >>> toBody - >>> uploadPart bucket key pnum upId - >>> send - >=> checkUpload - - checkUpload :: (Monad m, MonadFail m) => UploadPartResponse -> m UploadPartResponse - checkUpload upr = do - when (upr ^. uprsResponseStatus /= 200) $ fail "Failed to upload piece" - return upr - - (Right <$> go D.empty 0 hashInit 1 D.empty) `catchC` \(except :: SomeException) -> - Left . (,except) <$> lift (send (abortMultipartUpload bucket key upId)) - -- Whatever happens, we abort the upload and return the exception - +streamUpload mChunkSize multiPartUploadDesc = + chunkConduit + .| mapC vectorToByteString + .| enumerateConduit + .| startSingleUpload + where + startSingleUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m) => + ConduitT (Int, ByteString) Void m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse) + startSingleUpload = do + input <- await + case input of + Nothing -> fail "no body in stream" + Just buffer -> do + leftover buffer + streamMultiUpload + + streamMultiUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m) => + ConduitT + (Int, ByteString) + Void + m + (Either + (AbortMultipartUploadResponse, SomeException) + CompleteMultipartUploadResponse) + streamMultiUpload = do + multiPartUpload <- lift $ send multiPartUploadDesc + when (multiPartUpload ^. cmursResponseStatus /= 200) $ + fail "Failed to create upload" + logStr "\n**** Created upload\n" + + let Just upId = multiPartUpload ^. cmursUploadId + bucket = multiPartUploadDesc ^. cmuBucket + key = multiPartUploadDesc ^. cmuKey + + handleC (cancelMultiUploadConduit bucket key upId) $ + concurrentMapM_ 10 3 (multiUpload bucket key upId) + .| (finishMultiUploadConduit bucket key upId) + + multiUpload :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m) => + BucketName + -> ObjectKey + -> Text + -> (Int, ByteString) + -> m (Maybe CompletedPart) + multiUpload bucket key upId (partnum, buffer) = do + let bufsize = BS.length buffer + let req = uploadPart bucket key partnum upId (toBody $ HashedBytes (hash buffer) buffer) + res <- liftAWS $ send req + when (res ^. uprsResponseStatus /= 200) $ fail "Failed to upload piece" + --logStr $ printf "\n**** Uploaded part %d size %d\n" partnum bufsize + return $ completedPart partnum <$> (res ^. uprsETag) + + finishMultiUploadConduit :: (MonadUnliftIO m, MonadAWS m, MonadFail m) => + BucketName + -> ObjectKey + -> Text + -> ConduitT + (Maybe CompletedPart) + Void + m + (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse) + finishMultiUploadConduit bucket key upId = do + parts <- sinkList + res <- lift $ send $ completeMultipartUpload bucket key upId & cMultipartUpload ?~ set cmuParts (sequenceA (fromList parts)) completedMultipartUpload + return $ Right res + + cancelMultiUploadConduit :: (MonadUnliftIO m, MonadAWS m, MonadFail m) => + BucketName + -> ObjectKey + -> Text + -> SomeException + -> ConduitT + i + Void + m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse) + cancelMultiUploadConduit bucket key upId exc = do + res <- lift $ send $ abortMultipartUpload bucket key upId + return $ Left (res, exc) + + chunkConduit :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m, PrimMonad m) => + ConduitT ByteString (VS.Vector Word8) m () + chunkConduit = + vectorBuilder (fromIntegral chunkSize) f + where f :: MonadUnliftIO m => (Word8 -> m b) -> ConduitT ByteString Void m () + f yieldByte = do + awaitForever $ \bs -> + lift $ forM_ [0..BS.length bs - 1] $ \i -> + yieldByte $ unsafeIndex bs i + {-# INLINE chunkConduit #-} + + enumerateConduit :: (MonadUnliftIO m, MonadAWS m, MonadFail m, MonadResource m) => + ConduitT a (Int, a) m () + enumerateConduit = loop 1 + where + loop i = await >>= maybe (return ()) (go i) + go i x = do + yield (i, x) + loop (i + 1) + {-# INLINE enumerateConduit #-} + + chunkSize :: ChunkSize + chunkSize = maybe minimumChunkSize (max minimumChunkSize) mChunkSize + + logStr :: (MonadAWS m, MonadFail m) => (String -> ConduitT i Void m ()) + logStr msg = do + logger <- lift $ liftAWS $ view envLogger + liftIO $ logger Debug $ stringUtf8 msg -- | Specifies whether to upload a file or 'ByteString data UploadLocation diff --git a/stack.yaml b/stack.yaml index eda51c2..99b23ad 100644 --- a/stack.yaml +++ b/stack.yaml @@ -15,7 +15,7 @@ # resolver: # name: custom-snapshot # location: "./custom-snapshot.yaml" -resolver: lts-13.18 +resolver: lts-14.27 # User packages to be built. # Various formats can be used as shown in the example below. @@ -39,7 +39,10 @@ packages: - '.' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) -extra-deps: [] +extra-deps: +- amazonka-1.6.1@sha256:f58b63e83876f93aa03c54e54b3eb8ba9358af93819d41f4a5a8f8b7d8b8399c,3544 +- amazonka-core-1.6.1@sha256:9bc59ce403c6eeba3b3eaf3f10e5f0b6a33b6edbbf8f6de0dd6f4c67b86fa698,5135 +- amazonka-s3-1.6.1@sha256:9d07240fca59ad5197fb614ce3051e701e4951e6d4625a2dab4a9c17a1900194,6317 # Override default flag values for local packages and extra-deps flags: {}