diff --git a/time-manager/ChangeLog.md b/time-manager/ChangeLog.md index 31ead95a5..2a402f2d4 100644 --- a/time-manager/ChangeLog.md +++ b/time-manager/ChangeLog.md @@ -1,5 +1,11 @@ # ChangeLog for time-manager +## 0.2.0 + +* Providing `System.ThreadManager`. +* `withHandle` catches `TimeoutThread` internally. + It returns `Nothing` on timeout. + ## 0.1.3 * Providing `withHandle` and `withHandleKillThread`. diff --git a/time-manager/System/ThreadManager.hs b/time-manager/System/ThreadManager.hs new file mode 100644 index 000000000..ed0ad6157 --- /dev/null +++ b/time-manager/System/ThreadManager.hs @@ -0,0 +1,211 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- | A thread manager including a time manager. +-- The manager has responsibility to kill managed threads. +module System.ThreadManager ( + ThreadManager, + newThreadManager, + stopAfter, + + -- * Fork + forkManaged, + forkManagedFinally, + forkManagedUnmask, + forkManagedTimeout, + forkManagedTimeoutFinally, + + -- * Synchronization + waitUntilAllGone, + + -- * Re-exports + T.Manager, + withHandle, + T.Handle, + T.tickle, + T.pause, + T.resume, +) where + +import Control.Concurrent +import Control.Concurrent.STM +import Control.Exception (Exception (..), SomeException (..)) +import qualified Control.Exception as E +import Control.Monad (unless, void) +import Data.Foldable (forM_) +import Data.IORef +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import Data.Word (Word64) +import GHC.Conc.Sync (labelThread) +#if __GLASGOW_HASKELL__ >= 908 +import GHC.Conc.Sync (fromThreadId) +#endif +import System.Mem.Weak (Weak, deRefWeak) +import qualified System.TimeManager as T + +---------------------------------------------------------------- + +-- | Manager to manage the thread and the timer. +data ThreadManager = ThreadManager T.Manager (TVar ManagedThreads) + +type Key = Word64 +type ManagedThreads = Map Key ManagedThread + +---------------------------------------------------------------- + +-- 'IORef' prevents race between WAI TimeManager (TimeoutThread) +-- and stopAfter (KilledByThreadManager). +-- It is initialized with 'False' and turned into 'True' when locked. +-- The winner can throw an asynchronous exception. +data ManagedThread = ManagedThread (Weak ThreadId) (IORef Bool) + +---------------------------------------------------------------- + +-- | Starting a thread manager. +-- Its action is initially set to 'return ()' and should be set +-- by 'setAction'. This allows that the action can include +-- the manager itself. +newThreadManager :: T.Manager -> IO ThreadManager +newThreadManager timmgr = ThreadManager timmgr <$> newTVarIO Map.empty + +---------------------------------------------------------------- + +-- | An exception used internally to kill a managed thread. +data KilledByThreadManager = KilledByThreadManager (Maybe SomeException) + deriving (Show) + +instance Exception KilledByThreadManager where + toException = E.asyncExceptionToException + fromException = E.asyncExceptionFromException + +-- | Stopping the manager. +-- +-- The action is run in the scope of an exception handler that catches all +-- exceptions (including asynchronous ones); this allows the cleanup handler +-- to cleanup in all circumstances. If an exception is caught, it is rethrown +-- after the cleanup is complete. +stopAfter :: ThreadManager -> IO a -> (Maybe SomeException -> IO ()) -> IO a +stopAfter (ThreadManager _timmgr var) action cleanup = do + E.mask $ \unmask -> do + ma <- E.try $ unmask action + m <- atomically $ do + m0 <- readTVar var + writeTVar var Map.empty + return m0 + let ths = Map.elems m + er = either Just (const Nothing) ma + ex = KilledByThreadManager er + forM_ ths $ \(ManagedThread wtid ref) -> lockAndKill wtid ref ex + case ma of + Left err -> cleanup (Just err) >> E.throwIO err + Right a -> cleanup Nothing >> return a + +---------------------------------------------------------------- + +-- | Fork a managed thread. +-- +-- This guarantees that the thread ID is added to the manager's queue before +-- the thread starts, and is removed again when the thread terminates +-- (normally or abnormally). +forkManaged + :: ThreadManager + -> String + -- ^ Thread name + -> IO () + -- ^ Action + -> IO () +forkManaged mgr label io = + forkManagedUnmask mgr label $ \unmask -> unmask io + +-- | Like 'forkManaged', but run action with exceptions masked +forkManagedUnmask + :: ThreadManager -> String -> ((forall x. IO x -> IO x) -> IO ()) -> IO () +forkManagedUnmask (ThreadManager _timmgr var) label io = + void $ E.mask_ $ forkIOWithUnmask $ \unmask -> E.handle ignore $ do + labelMe label + E.bracket (setup var) (clear var) $ \_ -> io unmask + +-- | Fork a managed thread with a handle created by a timeout manager. +forkManagedTimeout :: ThreadManager -> String -> (T.Handle -> IO ()) -> IO () +forkManagedTimeout (ThreadManager timmgr var) label io = + void $ forkIO $ E.handle ignore $ do + labelMe label + E.bracket (setup var) (clear var) $ \(_n, wtid, ref) -> + -- 'TimeoutThread' is ignored by 'withHandle'. + void $ T.withHandle timmgr (lockAndKill wtid ref T.TimeoutThread) io + +-- | Fork a managed thread with a cleanup function. +forkManagedFinally :: ThreadManager -> String -> IO () -> IO () -> IO () +forkManagedFinally mgr label io final = E.mask $ \restore -> + forkManaged + mgr + label + (E.try (restore io) >>= \(_ :: Either E.SomeException ()) -> final) + +-- | Fork a managed thread with a handle created by a timeout manager +-- and with a cleanup function. +forkManagedTimeoutFinally + :: ThreadManager -> String -> (T.Handle -> IO ()) -> IO () -> IO () +forkManagedTimeoutFinally mgr label io final = E.mask $ \restore -> + forkManagedTimeout + mgr + label + (\th -> E.try (restore $ io th) >>= \(_ :: Either E.SomeException ()) -> final) + +setup :: TVar (Map Key ManagedThread) -> IO (Key, Weak ThreadId, IORef Bool) +setup var = do + (wtid, n) <- myWeakThradId + ref <- newIORef False + let ent = ManagedThread wtid ref + -- asking to throw KilledByThreadManager to me + atomically $ modifyTVar' var $ Map.insert n ent + return (n, wtid, ref) + +lockAndKill :: Exception e => Weak ThreadId -> IORef Bool -> e -> IO () +lockAndKill wtid ref e = do + alreadyLocked <- atomicModifyIORef' ref (\b -> (True, b)) -- try to lock + unless alreadyLocked $ do + mtid <- deRefWeak wtid + case mtid of + Nothing -> return () + Just tid -> E.throwTo tid e + +clear + :: TVar (Map Key ManagedThread) + -> (Key, Weak ThreadId, IORef Bool) + -> IO () +clear var (n, _, _) = atomically $ modifyTVar' var $ Map.delete n + +ignore :: KilledByThreadManager -> IO () +ignore (KilledByThreadManager _) = return () + +-- | Wait until all managed thread are finished. +waitUntilAllGone :: ThreadManager -> IO () +waitUntilAllGone (ThreadManager _timmgr var) = atomically $ do + m <- readTVar var + check (Map.size m == 0) + +---------------------------------------------------------------- + +myWeakThradId :: IO (Weak ThreadId, Key) +myWeakThradId = do + tid <- myThreadId + wtid <- mkWeakThreadId tid + let n = fromThreadId tid + return (wtid, n) + +labelMe :: String -> IO () +labelMe l = do + tid <- myThreadId + labelThread tid l + +withHandle + :: ThreadManager -> T.TimeoutAction -> (T.Handle -> IO a) -> IO (Maybe a) +withHandle (ThreadManager timmgr _) = T.withHandle timmgr + +#if __GLASGOW_HASKELL__ < 908 +fromThreadId :: ThreadId -> Word64 +fromThreadId tid = read (drop 9 $ show tid) +#endif diff --git a/time-manager/System/TimeManager.hs b/time-manager/System/TimeManager.hs index c7ce39bb5..0f2a87703 100644 --- a/time-manager/System/TimeManager.hs +++ b/time-manager/System/TimeManager.hs @@ -1,5 +1,6 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE RecordWildCards #-} module System.TimeManager ( -- ** Types @@ -18,7 +19,7 @@ module System.TimeManager ( withHandle, withHandleKillThread, - -- ** Control + -- ** Control timeout tickle, pause, resume, @@ -38,7 +39,7 @@ import Control.Reaper import Data.IORef (IORef) import qualified Data.IORef as I import Data.Typeable (Typeable) -import GHC.Weak (deRefWeak) +import System.Mem.Weak (deRefWeak) ---------------------------------------------------------------- @@ -48,8 +49,12 @@ type Manager = Reaper [Handle] Handle -- | An action to be performed on timeout. type TimeoutAction = IO () --- | A handle used by 'Manager' -data Handle = Handle Manager !(IORef TimeoutAction) !(IORef State) +-- | A handle used by a timeout manager. +data Handle = Handle + { handleManager :: Manager + , handleActionRef :: IORef TimeoutAction + , handleStateRef :: IORef State + } data State = Active -- Manager turns it to Inactive. @@ -64,17 +69,19 @@ initialize :: Int -> IO Manager initialize timeout = mkReaper defaultReaperSettings - { reaperAction = mkListAction prune + { -- Data.Set cannot be used since 'partition' cannot be used + -- with 'readIORef`. So, let's just use a list. + reaperAction = mkListAction prune , reaperDelay = timeout , reaperThreadName = "WAI timeout manager (Reaper)" } where - prune m@(Handle _ actionRef stateRef) = do - state <- I.atomicModifyIORef' stateRef (\x -> (inactivate x, x)) + prune m@Handle{..} = do + state <- I.atomicModifyIORef' handleStateRef (\x -> (inactivate x, x)) case state of Inactive -> do - onTimeout <- I.readIORef actionRef - onTimeout `E.catch` ignoreAll + onTimeout <- I.readIORef handleActionRef + onTimeout `E.catch` ignoreSync return Nothing _ -> return $ Just m @@ -87,12 +94,9 @@ initialize timeout = stopManager :: Manager -> IO () stopManager mgr = E.mask_ (reaperStop mgr >>= mapM_ fire) where - fire (Handle _ actionRef _) = do - onTimeout <- I.readIORef actionRef - onTimeout `E.catch` ignoreAll - -ignoreAll :: E.SomeException -> IO () -ignoreAll _ = return () + fire Handle{..} = do + onTimeout <- I.readIORef handleActionRef + onTimeout `E.catch` ignoreSync -- | Killing timeout manager immediately without firing onTimeout. killManager :: Manager -> IO () @@ -102,17 +106,21 @@ killManager = reaperKill -- | Registering a timeout action and unregister its handle -- when the body action is finished. -withHandle :: Manager -> TimeoutAction -> (Handle -> IO a) -> IO a +-- 'Nothing' is returned on timeout. +withHandle :: Manager -> TimeoutAction -> (Handle -> IO a) -> IO (Maybe a) withHandle mgr onTimeout action = - E.bracket (register mgr onTimeout) cancel action + E.handle ignore $ E.bracket (register mgr onTimeout) cancel $ \th -> + Just <$> action th + where + ignore TimeoutThread = return Nothing -- | Registering a timeout action of killing this thread and -- unregister its handle when the body action is killed or finished. withHandleKillThread :: Manager -> TimeoutAction -> (Handle -> IO ()) -> IO () withHandleKillThread mgr onTimeout action = - E.handle handler $ E.bracket (registerKillThread mgr onTimeout) cancel action + E.handle ignore $ E.bracket (registerKillThread mgr onTimeout) cancel action where - handler TimeoutThread = return () + ignore TimeoutThread = return () ---------------------------------------------------------------- @@ -121,21 +129,26 @@ register :: Manager -> TimeoutAction -> IO Handle register mgr !onTimeout = do actionRef <- I.newIORef onTimeout stateRef <- I.newIORef Active - let h = Handle mgr actionRef stateRef + let h = + Handle + { handleManager = mgr + , handleActionRef = actionRef + , handleStateRef = stateRef + } reaperAdd mgr h return h -- | Removing the 'Handle' from the 'Manager' immediately. cancel :: Handle -> IO () -cancel (Handle mgr _ stateRef) = do - _ <- reaperModify mgr filt +cancel Handle{..} = do + _ <- reaperModify handleManager filt return () where -- It's very important that this function forces the whole workload so we -- don't retain old handles, otherwise disasterous leaks occur. filt [] = [] - filt (h@(Handle _ _ stateRef') : hs) - | stateRef == stateRef' = hs + filt (h@(Handle _ _ ref) : hs) + | handleStateRef == ref = hs | otherwise = let !hs' = filt hs in h : hs' @@ -174,12 +187,12 @@ registerKillThread m onTimeout = do -- | Setting the state to active. -- 'Manager' turns active to inactive repeatedly. tickle :: Handle -> IO () -tickle (Handle _ _ stateRef) = I.writeIORef stateRef Active +tickle Handle{..} = I.writeIORef handleStateRef Active -- | Setting the state to paused. -- 'Manager' does not change the value. pause :: Handle -> IO () -pause (Handle _ _ stateRef) = I.writeIORef stateRef Paused +pause Handle{..} = I.writeIORef handleStateRef Paused -- | Setting the paused state to active. -- This is an alias to 'tickle'. @@ -213,3 +226,16 @@ withManager' timeout f = (initialize timeout) killManager f + +---------------------------------------------------------------- + +isAsyncException :: E.Exception e => e -> Bool +isAsyncException e = + case E.fromException (E.toException e) of + Just (E.SomeAsyncException _) -> True + Nothing -> False + +ignoreSync :: E.SomeException -> IO () +ignoreSync se + | isAsyncException se = E.throwIO se + | otherwise = return () diff --git a/time-manager/time-manager.cabal b/time-manager/time-manager.cabal index 26eb0c35b..33f576ebc 100644 --- a/time-manager/time-manager.cabal +++ b/time-manager/time-manager.cabal @@ -1,5 +1,5 @@ Name: time-manager -Version: 0.1.3 +Version: 0.2.0 Synopsis: Scalable timer License: MIT License-file: LICENSE @@ -10,12 +10,17 @@ Category: System Build-Type: Simple Cabal-Version: >=1.10 Stability: Stable -Description: Scalable timer functions provided by a timer manager. +Description: Scalable timer functions provided by a timer manager + and thread management functions to prevent thread + leak by a thread manager. Extra-Source-Files: ChangeLog.md Library Build-Depends: base >= 4.12 && < 5 , auto-update >= 0.2 && < 0.3 + , containers + , stm Default-Language: Haskell2010 Exposed-modules: System.TimeManager + Exposed-modules: System.ThreadManager Ghc-Options: -Wall diff --git a/warp/warp.cabal b/warp/warp.cabal index f5c9b0c46..7740f1e7c 100644 --- a/warp/warp.cabal +++ b/warp/warp.cabal @@ -102,7 +102,7 @@ library stm >=2.3, streaming-commons >=0.1.10, text, - time-manager >=0.1.3 && <0.2, + time-manager >=0.2 && <0.3, vault >=0.3, wai >=3.2.4 && <3.3, word8