@@ -28,6 +28,8 @@ import qualified Data.Heap as Heap
2828import qualified Data.HashSet as Set
2929import Data.IORef.Extra
3030import System.Random
31+ import Debug.Trace
32+ import GHC.Conc
3133
3234
3335---------------------------------------------------------------------
@@ -41,22 +43,15 @@ If any worker throws an exception, must signal to all the other workers
4143data S = S
4244 { alive :: ! Bool -- True until there's an exception, after which don't spawn more tasks
4345 ,threads :: ! (Set. HashSet Thread ) -- IMPORTANT: Must be strict or we leak thread stacks
44- ,threadsLimit :: {-# UNPACK #-} ! Int -- user supplied thread limit, Set.size threads <= threadsLimit
4546 ,threadsCount :: {-# UNPACK #-} ! Int -- Set.size threads, but in O(1)
4647 ,threadsMax :: {-# UNPACK #-} ! Int -- high water mark of Set.size threads (accounting only)
4748 ,threadsSum :: {-# UNPACK #-} ! Int -- number of threads we have been through (accounting only)
48- ,rand :: IO Int -- operation to give us the next random Int
49- ,todo :: ! (Heap. Heap (Heap. Entry (PoolPriority , Int ) (IO () ))) -- operations waiting a thread
5049 }
5150
5251
5352emptyS :: Int -> Bool -> IO S
54- emptyS n deterministic = do
55- rand <- if not deterministic then pure randomIO else do
56- ref <- newIORef 0
57- -- no need to be thread-safe - if two threads race they were basically the same time anyway
58- pure $ do i <- readIORef ref; writeIORef' ref (i+ 1 ); pure i
59- pure $ S True Set. empty n 0 0 0 rand Heap. empty
53+ emptyS n deterministic =
54+ pure $ S True Set. empty 0 0 0
6055
6156
6257data Pool = Pool
@@ -71,62 +66,51 @@ withPool (Pool var _) f = join $ modifyVar var $ \s ->
7166withPool_ :: Pool -> (S -> IO S ) -> IO ()
7267withPool_ pool act = withPool pool $ fmap (, pure () ) . act
7368
74-
75- worker :: Pool -> IO ()
76- worker pool = withPool pool $ \ s -> pure $ case Heap. uncons $ todo s of
77- Nothing -> (s, pure () )
78- Just (Heap. Entry _ now, todo2) -> (s{todo = todo2}, now >> worker pool)
79-
80- -- | Given a pool, and a function that breaks the S invariants, restore them.
81- -- They are only allowed to touch threadsLimit or todo.
82- -- Assumes only requires spawning a most one job (e.g. can't increase the pool by more than one at a time)
83- step :: Pool -> (S -> IO S ) -> IO ()
84- -- mask_ is so we don't spawn and not record it
85- step pool@ (Pool _ done) op = mask_ $ withPool_ pool $ \ s -> do
86- s <- op s
87- case Heap. uncons $ todo s of
88- Just (Heap. Entry _ now, todo2) | threadsCount s < threadsLimit s -> do
89- -- spawn a new worker
90- t <- newThreadFinally (now >> worker pool) $ \ t res -> case res of
91- Left e -> withPool_ pool $ \ s -> do
92- signalBarrier done $ Left e
93- pure (remThread t s){alive = False }
94- Right _ ->
95- step pool $ pure . remThread t
96- pure (addThread t s){todo = todo2}
97- Nothing | threadsCount s == 0 -> do
98- signalBarrier done $ Right s
99- pure s{alive = False }
100- _ -> pure s
101- where
102- addThread t s = s{threads = Set. insert t $ threads s, threadsCount = threadsCount s + 1
103- ,threadsSum = threadsSum s + 1 , threadsMax = threadsMax s `max` (threadsCount s + 1 )}
104- remThread t s = s{threads = Set. delete t $ threads s, threadsCount = threadsCount s - 1 }
105-
69+ threshold :: Float
70+ threshold = 0.05
10671
10772-- | Add a new task to the pool. See the top of the module for the relative ordering
10873-- and semantics.
10974addPool :: PoolPriority -> Pool -> IO a -> IO ()
110- addPool priority pool act = step pool $ \ s -> do
111- i <- rand s
112- pure s{todo = Heap. insert (Heap. Entry (priority, i) $ void act) $ todo s}
113-
75+ addPool priority pool@ (Pool _ done) act =
76+ withPool_ pool $ \ s -> do
77+ traceEventIO $ " Scheduling event with priority: " ++ show priority
78+ t <- newThreadFinally l mcap act $ \ t res -> do
79+ traceEventIO $ show l ++ " done."
80+ case res of
81+ Left e -> withPool_ pool $ \ s -> do
82+ signalBarrier done $ Left e
83+ pure (remThread t s){alive = False }
84+ Right _ -> withPool_ pool $ \ s -> do
85+ let s' = remThread t s
86+ when (threadsCount s' == 0 ) $
87+ signalBarrier done $ Right s'{alive = False }
88+ pure $ s'{alive = threadsCount s' /= 0 }
89+ pure (addThread t s)
90+ where
91+ addThread t s = s{threads = Set. insert t $ threads s, threadsCount = threadsCount s + 1
92+ ,threadsSum = threadsSum s + 1 , threadsMax = threadsMax s `max` (threadsCount s + 1 )}
93+ remThread t s = s{threads = Set. delete t $ threads s, threadsCount = threadsCount s - 1 }
94+ mcap = case priority of
95+ PoolEstimate t _ | t <= threshold -> Just 0
96+ _ -> Nothing
97+ l = case priority of
98+ PoolEstimate _ s -> s
99+ _ -> " Unknown"
114100
115101data PoolPriority
116102 = PoolException
117103 | PoolResume
118104 | PoolStart
119105 | PoolBatch
120106 | PoolDeprioritize Double
121- deriving (Eq ,Ord )
107+ | PoolEstimate { estimatedTime :: Float , label :: String }
108+ deriving (Eq ,Ord ,Show )
122109
123110-- | Temporarily increase the pool by 1 thread. Call the cleanup action to restore the value.
124111-- After calling cleanup you should requeue onto a new thread.
125112increasePool :: Pool -> IO (IO () )
126- increasePool pool = do
127- step pool $ \ s -> pure s{threadsLimit = threadsLimit s + 1 }
128- pure $ step pool $ \ s -> pure s{threadsLimit = threadsLimit s - 1 }
129-
113+ increasePool pool = pure (pure () )
130114
131115-- | Make sure the pool cannot run out of tasks (and thus everything finishes) until after the cancel is called.
132116-- Ensures that a pool that will requeue in time doesn't go idle.
@@ -139,7 +123,6 @@ keepAlivePool pool = do
139123 cancel
140124 pure $ signalBarrier bar ()
141125
142-
143126-- | Run all the tasks in the pool on the given number of works.
144127-- If any thread throws an exception, the exception will be reraised.
145128runPool :: Bool -> Int -> (Pool -> IO () ) -> IO () -- run all tasks in the pool
0 commit comments