diff --git a/src/Development/Shake/Internal/Core/Build.hs b/src/Development/Shake/Internal/Core/Build.hs index d25ee8153..6e812121c 100644 --- a/src/Development/Shake/Internal/Core/Build.hs +++ b/src/Development/Shake/Internal/Core/Build.hs @@ -102,7 +102,10 @@ buildOne global@Global{..} stack database i k r = case addStack i k stack of Right stack -> Later $ \continue -> do setIdKeyStatus global database i k (Running (NoShow continue) r) let go = buildRunMode global stack database r - fromLater go $ \mode -> liftIO $ addPool PoolStart globalPool $ + priority = case r of + Nothing -> PoolStart + Just (execution -> t) -> PoolEstimate t (show k) + fromLater go $ \mode -> liftIO $ addPool priority globalPool $ runKey global stack k r mode $ \res -> do runLocked database $ do let val = fmap runValue res diff --git a/src/General/Pool.hs b/src/General/Pool.hs index 6fe23a984..803674b7d 100644 --- a/src/General/Pool.hs +++ b/src/General/Pool.hs @@ -28,6 +28,8 @@ import qualified Data.Heap as Heap import qualified Data.HashSet as Set import Data.IORef.Extra import System.Random +import Debug.Trace +import GHC.Conc --------------------------------------------------------------------- @@ -41,22 +43,15 @@ If any worker throws an exception, must signal to all the other workers data S = S {alive :: !Bool -- True until there's an exception, after which don't spawn more tasks ,threads :: !(Set.HashSet Thread) -- IMPORTANT: Must be strict or we leak thread stacks - ,threadsLimit :: {-# UNPACK #-} !Int -- user supplied thread limit, Set.size threads <= threadsLimit ,threadsCount :: {-# UNPACK #-} !Int -- Set.size threads, but in O(1) ,threadsMax :: {-# UNPACK #-} !Int -- high water mark of Set.size threads (accounting only) ,threadsSum :: {-# UNPACK #-} !Int -- number of threads we have been through (accounting only) - ,rand :: IO Int -- operation to give us the next random Int - ,todo :: !(Heap.Heap (Heap.Entry (PoolPriority, Int) (IO ()))) -- operations waiting a thread } emptyS :: Int -> Bool -> IO S -emptyS n deterministic = do - rand <- if not deterministic then pure randomIO else do - ref <- newIORef 0 - -- no need to be thread-safe - if two threads race they were basically the same time anyway - pure $ do i <- readIORef ref; writeIORef' ref (i+1); pure i - pure $ S True Set.empty n 0 0 0 rand Heap.empty +emptyS n deterministic = + pure $ S True Set.empty 0 0 0 data Pool = Pool @@ -71,46 +66,37 @@ withPool (Pool var _) f = join $ modifyVar var $ \s -> withPool_ :: Pool -> (S -> IO S) -> IO () withPool_ pool act = withPool pool $ fmap (, pure()) . act - -worker :: Pool -> IO () -worker pool = withPool pool $ \s -> pure $ case Heap.uncons $ todo s of - Nothing -> (s, pure ()) - Just (Heap.Entry _ now, todo2) -> (s{todo = todo2}, now >> worker pool) - --- | Given a pool, and a function that breaks the S invariants, restore them. --- They are only allowed to touch threadsLimit or todo. --- Assumes only requires spawning a most one job (e.g. can't increase the pool by more than one at a time) -step :: Pool -> (S -> IO S) -> IO () --- mask_ is so we don't spawn and not record it -step pool@(Pool _ done) op = mask_ $ withPool_ pool $ \s -> do - s <- op s - case Heap.uncons $ todo s of - Just (Heap.Entry _ now, todo2) | threadsCount s < threadsLimit s -> do - -- spawn a new worker - t <- newThreadFinally (now >> worker pool) $ \t res -> case res of - Left e -> withPool_ pool $ \s -> do - signalBarrier done $ Left e - pure (remThread t s){alive = False} - Right _ -> - step pool $ pure . remThread t - pure (addThread t s){todo = todo2} - Nothing | threadsCount s == 0 -> do - signalBarrier done $ Right s - pure s{alive = False} - _ -> pure s - where - addThread t s = s{threads = Set.insert t $ threads s, threadsCount = threadsCount s + 1 - ,threadsSum = threadsSum s + 1, threadsMax = threadsMax s `max` (threadsCount s + 1)} - remThread t s = s{threads = Set.delete t $ threads s, threadsCount = threadsCount s - 1} - +threshold :: Float +threshold = 0.05 -- | Add a new task to the pool. See the top of the module for the relative ordering -- and semantics. addPool :: PoolPriority -> Pool -> IO a -> IO () -addPool priority pool act = step pool $ \s -> do - i <- rand s - pure s{todo = Heap.insert (Heap.Entry (priority, i) $ void act) $ todo s} - +addPool priority pool@(Pool _ done) act = + withPool_ pool $ \s -> do + traceEventIO $ "Scheduling event with priority: " ++ show priority + t <- newThreadFinally l mcap act $ \t res -> do + traceEventIO $ show l ++ " done." + case res of + Left e -> withPool_ pool $ \s -> do + signalBarrier done $ Left e + pure (remThread t s){alive = False} + Right _ -> withPool_ pool $ \s -> do + let s' = remThread t s + when (threadsCount s' == 0) $ + signalBarrier done $ Right s'{alive = False} + pure $ s'{alive = threadsCount s' /= 0} + pure (addThread t s) + where + addThread t s = s{threads = Set.insert t $ threads s, threadsCount = threadsCount s + 1 + ,threadsSum = threadsSum s + 1, threadsMax = threadsMax s `max` (threadsCount s + 1)} + remThread t s = s{threads = Set.delete t $ threads s, threadsCount = threadsCount s - 1} + mcap = case priority of + PoolEstimate t _ | t <= threshold -> Just 0 + _ -> Nothing + l = case priority of + PoolEstimate _ s -> s + _ -> "Unknown" data PoolPriority = PoolException @@ -118,15 +104,13 @@ data PoolPriority | PoolStart | PoolBatch | PoolDeprioritize Double - deriving (Eq,Ord) + | PoolEstimate { estimatedTime :: Float, label :: String } + deriving (Eq,Ord,Show) -- | Temporarily increase the pool by 1 thread. Call the cleanup action to restore the value. -- After calling cleanup you should requeue onto a new thread. increasePool :: Pool -> IO (IO ()) -increasePool pool = do - step pool $ \s -> pure s{threadsLimit = threadsLimit s + 1} - pure $ step pool $ \s -> pure s{threadsLimit = threadsLimit s - 1} - +increasePool pool = pure (pure ()) -- | Make sure the pool cannot run out of tasks (and thus everything finishes) until after the cancel is called. -- Ensures that a pool that will requeue in time doesn't go idle. @@ -139,7 +123,6 @@ keepAlivePool pool = do cancel pure $ signalBarrier bar () - -- | Run all the tasks in the pool on the given number of works. -- If any thread throws an exception, the exception will be reraised. runPool :: Bool -> Int -> (Pool -> IO ()) -> IO () -- run all tasks in the pool diff --git a/src/General/Thread.hs b/src/General/Thread.hs index bd5b32757..0e67c4ff0 100644 --- a/src/General/Thread.hs +++ b/src/General/Thread.hs @@ -1,4 +1,5 @@ {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE RankNTypes #-} -- | A bit like 'Fence', but not thread safe and optimised for avoiding taking the fence module General.Thread( @@ -14,6 +15,7 @@ import Control.Concurrent.Extra import Control.Exception import General.Extra import Control.Monad.Extra +import GHC.Conc data Thread = Thread ThreadId (Barrier ()) @@ -25,15 +27,21 @@ instance Hashable Thread where hashWithSalt salt (Thread a _) = hashWithSalt salt a -- | The inner thread is unmasked even if you started masked. -newThreadFinally :: IO a -> (Thread -> Either SomeException a -> IO ()) -> IO Thread -newThreadFinally act cleanup = do +newThreadFinally :: String -> Maybe Int -> IO a -> (Thread -> Either SomeException a -> IO ()) -> IO Thread +newThreadFinally label mcap act cleanup = do bar <- newBarrier - t <- mask_ $ forkIOWithUnmask $ \unmask -> flip finally (signalBarrier bar ()) $ do - res <- try $ unmask act + t <- mask_ $ fork $ \unmask -> flip finally (signalBarrier bar ()) $ do me <- myThreadId + res <- try $ unmask act cleanup (Thread me bar) res + labelThread t $ label ++ labeltype pure $ Thread t bar - + where + labeltype = maybe "(Free)" (\i -> "(Restricted to "++show i++")") mcap + fork :: ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId + fork = case mcap of + Nothing -> forkIOWithUnmask + Just n -> forkOnWithUnmask n stopThreads :: [Thread] -> IO () stopThreads threads = do