diff --git a/dev/interop.md b/dev/interop.md index f165a87f..c37b243f 100644 --- a/dev/interop.md +++ b/dev/interop.md @@ -240,12 +240,7 @@ $ docker stop $(docker ps -q) > > The only exception is `timeout_on_sleeping_server` for Java: it seems that the > server does not conform to the gRPC specification here, and simply closes the -> connection without sending `DEADLINE_EXCEEDED` to the client. The Java -> _client_ does not notice this because (it seems) it imposes a _local_ timeout -> also, and doesn't even _connect_ to the server: the test even passes without -> the server running at all. (And indeed, the other clients don't seem precise -> enough here: the `grapesy` server was passing this test before it implemented -> timeouts _at all_.) +> connection without sending `DEADLINE_EXCEEDED` to the client. It is also possible to only run one specific test case, for example: diff --git a/grapesy/interop/Main.hs b/grapesy/interop/Main.hs index d893fd2b..bf13912c 100644 --- a/grapesy/interop/Main.hs +++ b/grapesy/interop/Main.hs @@ -15,13 +15,9 @@ import Interop.Server (runInteropServer) main :: IO () main = do - -- TODO: - -- - -- Under some circumstances we can see a "Thread killed" exception when a - -- test fails. By setting this handler, we can at least confirm that this - -- exception is coming from an uncaught exception in a thread. - setUncaughtExceptionHandler $ \err -> + setUncaughtExceptionHandler $ \err -> do hPutStrLn stderr $ "Uncaught exception: " ++ show err + hFlush stderr -- Ensure we see server output when running inside docker hSetBuffering stdout NoBuffering diff --git a/grapesy/src/Network/GRPC/Client/Call.hs b/grapesy/src/Network/GRPC/Client/Call.hs index c071560a..a97fc894 100644 --- a/grapesy/src/Network/GRPC/Client/Call.hs +++ b/grapesy/src/Network/GRPC/Client/Call.hs @@ -29,7 +29,9 @@ module Network.GRPC.Client.Call ( , recvInitialResponse ) where +import Control.Concurrent import Control.Concurrent.STM +import Control.Concurrent.Thread.Delay qualified as UnboundedDelays import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class @@ -104,143 +106,23 @@ data Call rpc = SupportsClientRpc rpc => Call { -- If there are still /inbound/ messages upon leaving the scope of 'withRPC' no -- exception is raised (but the call is nonetheless still closed, and the server -- handler will be informed that the client has disappeared). +-- +-- Note on timeouts: if a timeout is specified for the call (either through +-- 'callTimeout' or through 'connDefaultTimeout'), when the timeout is reached +-- the RPC is cancelled; any further attempts to receive or send messages will +-- result in a 'GrpcException' with 'GrpcDeadlineExceeded'. As per the gRPC +-- specification, this does /not/ rely on the server; this does mean that the +-- same deadline also applies if the /client/ is slow (rather than the server). withRPC :: forall rpc m a. (MonadMask m, MonadIO m, SupportsClientRpc rpc, HasCallStack) => Connection -> CallParams rpc -> Proxy rpc -> (Call rpc -> m a) -> m a withRPC conn callParams proxy k = fmap fst $ - generalBracket - (liftIO $ startRPC conn proxy callParams) - closeRPC - (k . fst) - where - closeRPC :: (Call rpc, Session.CancelRequest) -> ExitCase a -> m () - closeRPC (call, cancelRequest) exitCase = liftIO $ do - -- /Before/ we do anything else (see below), check if we have evidence - -- that we can discard the connection. - canDiscard <- checkCanDiscard call - - -- Send the RST_STREAM frame /before/ closing the outbound thread. - -- - -- When we call 'Session.close', we will terminate the - -- 'sendMessageLoop', @http2@ will interpret this as a clean termination - -- of the stream. We must therefore cancel this stream before calling - -- 'Session.close'. /If/ the final message has already been sent, - -- @http2@ guarantees (as a postcondition of @outBodyPushFinal@) that - -- cancellation will be a no-op. - sendResetFrame cancelRequest exitCase - - -- Now close the /outbound/ thread, see docs of 'Session.close' for - -- details. - mException <- liftIO $ Session.close (callChannel call) exitCase - case mException of - Nothing -> - -- The outbound thread had already terminated - return () - Just ex -> - case fromException ex of - Nothing -> - -- We are leaving the scope of 'withRPC' because of an exception - -- in the client, just rethrow that exception. - throwM ex - Just discarded -> - -- We are leaving the scope of 'withRPC' without having sent the - -- final message. - -- - -- If the server was closed before we cancelled the stream, this - -- means that the server unilaterally closed the connection. - -- This should be regarded as normal termination of the RPC (see - -- the docs for 'withRPC') - -- - -- Otherwise, the client left the scope of 'withRPC' before the - -- RPC was complete, which the gRPC spec mandates to result in a - -- 'GrpcCancelled' exception. See docs of 'throwCancelled'. - unless canDiscard $ - throwCancelled discarded - - -- Send a @RST_STREAM@ frame if necessary - sendResetFrame :: Session.CancelRequest -> ExitCase a -> IO () - sendResetFrame cancelRequest exitCase = - cancelRequest $ - case exitCase of - ExitCaseSuccess _ -> - -- Error code will be CANCEL - Nothing - ExitCaseAbort -> - -- Error code will be INTERNAL_ERROR. The client aborted with an - -- error that we don't have access to. We want to tell the server - -- that something has gone wrong (i.e. INTERNAL_ERROR), so we must - -- pass an exception, however the exact nature of the exception is - -- not particularly important as it is only recorded locally. - Just . toException $ Session.ChannelAborted callStack - ExitCaseException e -> - -- Error code will be INTERNAL_ERROR - Just e - - -- The spec mandates that when a client cancels a request (which in grapesy - -- means exiting the scope of withRPC), the client receives a CANCELLED - -- exception. We need to deal with the edge case mentioned above, however: - -- the server might have already closed the connection. The client must have - -- evidence that this is the case, which could mean one of two things: - -- - -- o The client received the final message from the server - -- o The server threw an exception (and the client saw this) - -- - -- We can check for the former using 'channelRecvFinal', and the latter - -- using 'hasThreadTerminated'. By checking both, we avoid race conditions: - -- - -- o If the client received the final message, 'channelRecvFinal' /will/ - -- have been updated (we update this in the same transaction that returns - -- the actual element; see 'Network.GRPC.Util.Session.Channel.recv'). - -- o If the server threw an exception, and the client observed this, then - -- the inbound thread state /must/ have changed to 'ThreadException'. - -- - -- Note that it is /not/ sufficient to check if the inbound thread has - -- terminated: we might have received the final message, but the thread - -- might still be /about/ to terminate, but not /actually/ have terminated. - -- - -- See also: - -- - -- o - -- o - throwCancelled :: ChannelDiscarded -> IO () - throwCancelled (ChannelDiscarded cs) = do - throwM $ GrpcException { - grpcError = GrpcCancelled - , grpcErrorMessage = Just $ mconcat [ - "Channel discarded by client at " - , Text.pack $ prettyCallStack cs - ] - , grpcErrorMetadata = [] - } - - checkCanDiscard :: Call rpc -> IO Bool - checkCanDiscard Call{callChannel} = do - mRecvFinal <- atomically $ - readTVar $ Session.channelRecvFinal callChannel - let onNotRunning :: STM () - onNotRunning = return () - mTerminated <- atomically $ - Thread.getThreadState_ - (Session.channelInbound callChannel) - onNotRunning - return $ - or [ - case mRecvFinal of - Session.RecvNotFinal -> False - Session.RecvWithoutTrailers _ -> True - Session.RecvFinal _ -> True - - -- We are checking if we have evidence that we can discard the - -- channel. If the inbound thread is not yet running, this implies - -- that the server has not yet initiated their response to us, - -- which means we have no evidence to believe we can discard the - -- channel. - , case mTerminated of - Thread.ThreadNotYetRunning_ () -> False - Thread.ThreadRunning_ -> False - Thread.ThreadDone_ -> True - Thread.ThreadException_ _ -> True - ] + generalBracket + (liftIO $ + startRPC conn proxy callParams) + (\(Call{callChannel}, cancelRequest) exitCase -> liftIO $ + closeRPC callChannel cancelRequest exitCase) + (k . fst) -- | Open new channel to the server -- @@ -280,6 +162,67 @@ startRPC conn _ callParams = do serverClosedConnection flowStart + -- The spec mandates that + -- + -- > If a server has gone past the deadline when processing a request, the + -- > client will give up and fail the RPC with the DEADLINE_EXCEEDED status. + -- + -- and also that the deadline applies when when wait-for-ready semantics is + -- used. + -- + -- We have to be careful implementing this. In particular, we definitely + -- don't want to impose the timeout on the /client/ (that is, we should not + -- force the client to exit the scope of 'withRPC' within the timeout). + -- Instead, we work a thread that cancels the RPC after the timeout expires; + -- this means that /if/ the client that attempts to communicate with the + -- server after the timeout, only then will it receive an exception. + -- + -- The thread we spawn here is cleaned up by the monitor thread (below). + -- + -- See + -- + -- o + -- o + mClientSideTimeout <- + case callTimeout callParams of + Nothing -> return Nothing + Just t -> fmap Just $ forkLabelled "grapesy:clientSideTimeout" $ do + UnboundedDelays.delay (timeoutToMicro t) + let timeout :: SomeException + timeout = toException $ GrpcException { + grpcError = GrpcDeadlineExceeded + , grpcErrorMessage = Nothing + , grpcErrorMetadata = [] + } + + -- We recognized client-side that the timeout we imposed on the server + -- has passed. Acting on this is however tricky: + -- + -- o A call to 'closeRPC' will only terminate the /outbound/ thread; + -- the idea is the inbound thread might still be reading in-flight + -- messages, and it will terminate once the last message is read or + -- the thread notices a broken connection. + -- o Unfortunately, this does not work in the timeout case: /if/ the + -- outbound thread has not yet terminated (that is, the client has + -- not yet sent their final message), then calling 'closeRPC' will + -- result in a RST_STREAM being sent to the server, which /should/ + -- result in the inbound connection being closed also, but may not, + -- in the case of a non-compliant server. + -- o Worse, if the client /did/ already send their final message, the + -- outbound thread has already terminated, no RST_STREAM will be + -- sent, and the we will continue to wait for messages from the + -- server. + -- + -- Ideally we'd inform the receiving thread that a timeout has been + -- reached and to "continue until it would block", but that is hard + -- to do. So instead we just kill the receiving thread, which means + -- that once the timeout is reached, the client will not be able to + -- receive any further messages (even if that is because the /client/ + -- was slow, rather than the server). + + void $ Thread.cancelThread (Session.channelInbound channel) timeout + closeRPC channel cancelRequest $ ExitCaseException timeout + -- Spawn a thread to monitor the connection, and close the new channel when -- the connection is closed. To prevent a memory leak by hanging on to the -- channel for the lifetime of the connection, the thread also terminates in @@ -287,9 +230,10 @@ startRPC conn _ callParams = do _ <- forkLabelled "grapesy:monitorConnection" $ do status <- atomically $ do (Left <$> Thread.waitForNormalOrAbnormalThreadTermination - (Session.channelOutbound channel)) + (Session.channelInbound channel)) `orElse` (Right <$> readTMVar connClosed) + forM_ mClientSideTimeout killThread case status of Left _ -> return () -- Channel closed before the connection Right mErr -> do @@ -347,6 +291,142 @@ startRPC conn _ callParams = do clientConnection = conn } +-- | Close the RPC (internal API only) +-- +-- This is more subtle than one might think. The spec mandates that when a +-- client cancels a request (which in grapesy means exiting the scope of +-- withRPC), the client receives a CANCELLED exception. We need to deal with the +-- edge case mentioned in 'withRPC', however: the server might have already +-- closed the connection. The client must have evidence that this is the case, +-- which could mean one of two things: +-- +-- o The client received the final message from the server +-- o The server threw an exception (and the client saw this) +-- +-- We can check for the former using 'channelRecvFinal', and the latter using +-- 'hasThreadTerminated'. By checking both, we avoid race conditions: +-- +-- o If the client received the final message, 'channelRecvFinal' /will/ have +-- been updated (we update this in the same transaction that returns the +-- actual element; see 'Network.GRPC.Util.Session.Channel.recv'). +-- o If the server threw an exception, and the client observed this, then the +-- inbound thread state /must/ have changed to 'ThreadException'. +-- +-- Note that it is /not/ sufficient to check if the inbound thread has +-- terminated: we might have received the final message, but the thread might +-- still be /about/ to terminate, but not /actually/ have terminated. +-- +-- See also: +-- +-- o +-- o +closeRPC :: + Session.Channel rpc + -> Session.CancelRequest + -> ExitCase a + -> IO () +closeRPC callChannel cancelRequest exitCase = liftIO $ do + -- /Before/ we do anything else (see below), check if we have evidence + -- that we can discard the connection. + canDiscard <- checkCanDiscard + + -- Send the RST_STREAM frame /before/ closing the outbound thread. + -- + -- When we call 'Session.close', we will terminate the + -- 'sendMessageLoop', @http2@ will interpret this as a clean termination + -- of the stream. We must therefore cancel this stream before calling + -- 'Session.close'. /If/ the final message has already been sent, + -- @http2@ guarantees (as a postcondition of @outBodyPushFinal@) that + -- cancellation will be a no-op. + sendResetFrame + + -- Now close the /outbound/ thread, see docs of 'Session.close' for + -- details. + mException <- liftIO $ Session.close callChannel exitCase + case mException of + Nothing -> + -- The outbound thread had already terminated + return () + Just ex -> + case fromException ex of + Nothing -> + -- We are leaving the scope of 'withRPC' because of an exception + -- in the client, just rethrow that exception. + throwM ex + Just discarded -> + -- We are leaving the scope of 'withRPC' without having sent the + -- final message. + -- + -- If the server was closed before we cancelled the stream, this + -- means that the server unilaterally closed the connection. + -- This should be regarded as normal termination of the RPC (see + -- the docs for 'withRPC') + -- + -- Otherwise, the client left the scope of 'withRPC' before the + -- RPC was complete, which the gRPC spec mandates to result in a + -- 'GrpcCancelled' exception. See docs of 'throwCancelled'. + unless canDiscard $ + throwCancelled discarded + where + -- Send a @RST_STREAM@ frame if necessary + sendResetFrame :: IO () + sendResetFrame = + cancelRequest $ + case exitCase of + ExitCaseSuccess _ -> + -- Error code will be CANCEL + Nothing + ExitCaseAbort -> + -- Error code will be INTERNAL_ERROR. The client aborted with an + -- error that we don't have access to. We want to tell the server + -- that something has gone wrong (i.e. INTERNAL_ERROR), so we must + -- pass an exception, however the exact nature of the exception is + -- not particularly important as it is only recorded locally. + Just . toException $ Session.ChannelAborted callStack + ExitCaseException e -> + -- Error code will be INTERNAL_ERROR + Just e + + throwCancelled :: ChannelDiscarded -> IO () + throwCancelled (ChannelDiscarded cs) = do + throwM $ GrpcException { + grpcError = GrpcCancelled + , grpcErrorMessage = Just $ mconcat [ + "Channel discarded by client at " + , Text.pack $ prettyCallStack cs + ] + , grpcErrorMetadata = [] + } + + checkCanDiscard :: IO Bool + checkCanDiscard = do + mRecvFinal <- atomically $ + readTVar $ Session.channelRecvFinal callChannel + let onNotRunning :: STM () + onNotRunning = return () + mTerminated <- atomically $ + Thread.getThreadState_ + (Session.channelInbound callChannel) + onNotRunning + return $ + or [ + case mRecvFinal of + Session.RecvNotFinal -> False + Session.RecvWithoutTrailers _ -> True + Session.RecvFinal _ -> True + + -- We are checking if we have evidence that we can discard the + -- channel. If the inbound thread is not yet running, this implies + -- that the server has not yet initiated their response to us, + -- which means we have no evidence to believe we can discard the + -- channel. + , case mTerminated of + Thread.ThreadNotYetRunning_ () -> False + Thread.ThreadRunning_ -> False + Thread.ThreadDone_ -> True + Thread.ThreadException_ _ -> True + ] + {------------------------------------------------------------------------------- Open (ongoing) call -------------------------------------------------------------------------------} diff --git a/grapesy/src/Network/GRPC/Util/Session/Channel.hs b/grapesy/src/Network/GRPC/Util/Session/Channel.hs index 5b02206c..5288f748 100644 --- a/grapesy/src/Network/GRPC/Util/Session/Channel.hs +++ b/grapesy/src/Network/GRPC/Util/Session/Channel.hs @@ -408,7 +408,7 @@ waitForOutbound Channel{channelOutbound} = atomically $ -- Not doing so is considered a bug (it is not possible to do this implicitly, -- because the final call to 'send' involves a choice of trailers, and calling -- 'waitForOutbound' /without/ a final close to 'send' will result in deadlock). --- Typically code will also process all /incoming/ messages, but doing so of +-- Typically code will also process all /incoming/ messages, but doing so is of -- course not mandatory. -- -- Calling 'close' will kill the outbound thread ('sendMessageLoop'), /if/ it is @@ -431,7 +431,7 @@ close :: close Channel{channelOutbound} reason = do -- We leave the inbound thread running. Although the channel is closed, -- there might still be unprocessed messages in the queue. The inbound - -- thread will terminate once it reaches the end of the queue + -- thread will terminate once it reaches the end of the queue. outbound <- cancelThread channelOutbound channelClosed case outbound of AlreadyTerminated _ -> diff --git a/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs b/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs index 58b63157..f7f8cdcb 100644 --- a/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs +++ b/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs @@ -5,6 +5,7 @@ module Test.Sanity.BrokenDeployments (tests) where +import Control.Concurrent import Control.Exception import Data.ByteString.Char8 qualified as BS.Strict.Char8 import Data.ByteString.UTF8 qualified as BS.Strict.UTF8 @@ -54,6 +55,9 @@ tests = testGroup "Test.Sanity.BrokenDeployments" [ , testGroup "Undefined" [ testCase "output" test_undefinedOutput ] + , testGroup "Timeout" [ + testCase "serverIgnoresTimeout" test_serverIgnoresTimeout + ] ] connParams :: Client.ConnParams @@ -380,3 +384,40 @@ test_undefinedOutput = do if isFirst then return $ throw $ DeliberateException (userError "uhoh") else return $ defMessage & #id .~ req ^. #id +{------------------------------------------------------------------------------- + Timeouts +-------------------------------------------------------------------------------} + +-- | Check that timeouts don't depend on the server +-- +-- When a timeout is set for an RPC, the server should respect it, but the +-- client should not /depend/ on the server respecting it. +-- +-- See also . +test_serverIgnoresTimeout :: Assertion +test_serverIgnoresTimeout = respondWithIO response $ \addr -> do + mResp :: Either GrpcException + (StreamElem NoMetadata (Proto PongMessage)) <- try $ + Client.withConnection connParams (Client.ServerInsecure addr) $ \conn -> + Client.withRPC conn callParams (Proxy @Ping) $ \call -> do + Client.sendFinalInput call defMessage + Client.recvOutput call + case mResp of + Left e | grpcError e == GrpcDeadlineExceeded -> + return () + Left e -> + assertFailure $ "unexpected error: " ++ show e + Right _ -> + assertFailure "Timeout did not trigger" + where + response :: IO Response + response = do + threadDelay 10_000_000 + return def + + callParams :: Client.CallParams Ping + callParams = def { + Client.callTimeout = Just $ + Client.Timeout Client.Millisecond (Client.TimeoutValue 100) + } + diff --git a/grapesy/test-grapesy/Test/Util/RawTestServer.hs b/grapesy/test-grapesy/Test/Util/RawTestServer.hs index 03362f23..a3b465fe 100644 --- a/grapesy/test-grapesy/Test/Util/RawTestServer.hs +++ b/grapesy/test-grapesy/Test/Util/RawTestServer.hs @@ -1,6 +1,7 @@ module Test.Util.RawTestServer ( -- * Raw test server respondWith + , respondWithIO -- * Abstract response type , Response(..) @@ -35,7 +36,7 @@ withTestServer server k = do ServerConfig { serverInsecure = Just $ InsecureConfig { insecureHost = Just "127.0.0.1" - , insecurePort = 0 + , insecurePort = 50051 } , serverSecure = Nothing } @@ -51,7 +52,12 @@ withTestServer server k = do -- | Server that responds with the given 'Response', independent of the request respondWith :: Response -> (Client.Address -> IO a) -> IO a -respondWith response = withTestServer $ \_req _aux respond -> +respondWith resp = respondWithIO (return resp) + +-- | Version of 'respondWith' that constructs the response +respondWithIO :: IO Response -> (Client.Address -> IO a) -> IO a +respondWithIO mkResponse = withTestServer $ \_req _aux respond -> do + response <- mkResponse respond (toHTTP2Response response) [] data Response = Response {