1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
|
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE NumericUnderscores #-}
-- | Implementation of a jobserver using system semaphores.
--
-- See Note [Architecture of the Job Server] and
-- Note [Eventlog Messages for jsem] in this module.
module GHC.Driver.MakeSem
( -- * JSem: parallelism semaphore backed
-- by a system semaphore (Posix/Windows)
runJSemAbstractSem
-- * System semaphores
, Semaphore, SemaphoreName(..)
-- * Abstract semaphores
, AbstractSem(..)
, withAbstractSem
)
where
import GHC.Prelude
import GHC.Conc
import GHC.Data.OrdList
import GHC.IO.Exception
import GHC.Utils.Outputable
import GHC.Utils.Panic
import GHC.Utils.Json
import System.Semaphore
import Control.Monad
import qualified Control.Monad.Catch as MC
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Data.Foldable
import Data.Functor
import GHC.Stack
import Debug.Trace
---------------------------------------
-- Semaphore jobserver
-- | A jobserver built on top of a system 'Semaphore'.
--
-- It pairs the semaphore itself with the shared bookkeeping
-- ('JobResources') that records which jobs are pending and how many
-- semaphore tokens this process currently holds.
data Jobserver
  = Jobserver
  { jSemaphore :: !Semaphore
    -- ^ The system semaphore that tokens are acquired from and
    -- released to.
  , jobs       :: !(TVar JobResources)
    -- ^ Shared state: the pending jobs together with the tokens
    -- currently obtained from the semaphore.
  }
-- | Timing parameters controlling how eagerly the jobserver reacts.
data JobserverOptions
  = JobserverOptions
  { releaseDebounce    :: !Int
    -- ^ How long (in milliseconds) after acquiring a token we must wait
    -- before a token may be released again.
  , setNumCapsDebounce :: !Int
    -- ^ Minimum time (in milliseconds) between two successive calls
    -- to 'setNumCapabilities'.
  }
-- | Default jobserver timings: both debounce intervals last one second.
defaultJobserverOptions :: JobserverOptions
defaultJobserverOptions =
  JobserverOptions { releaseDebounce    = one_second
                   , setNumCapsDebounce = one_second }
  where
    one_second :: Int
    one_second = 1000 -- milliseconds
-- | Bookkeeping for the tokens this process holds from the parallelism
-- semaphore, and for the jobs that are waiting to be handed one.
data JobResources
  = Jobs
  { tokensOwned :: !Int
    -- ^ Number of tokens currently held from the semaphore.
  , tokensFree  :: !Int
    -- ^ Number of owned tokens not currently assigned to any job.
  , jobsWaiting :: !(OrdList (TMVar ()))
    -- ^ Jobs blocked waiting for a token. Each job blocks on its 'TMVar';
    -- filling that 'TMVar' lets the job proceed.
  }
-- | Debug rendering, e.g. @JobResources {owned=2 free=1 num_waiting=0}@.
instance Outputable JobResources where
  ppr Jobs{..} =
    text "JobResources" <+> braces (hsep fields)
    where
      fields =
        [ text "owned="       <> ppr tokensOwned
        , text "free="        <> ppr tokensFree
        , text "num_waiting=" <> ppr (length jobsWaiting)
        ]
-- | Record one freshly acquired token: it is both owned and free.
addToken :: JobResources -> JobResources
addToken res = res { tokensOwned = tokensOwned res + 1
                   , tokensFree  = tokensFree  res + 1 }
-- | Hand one in-use token back to the free pool.
--
-- Asserts that fewer tokens are free than owned, i.e. that some token
-- really was in use.
addFreeToken :: JobResources -> JobResources
addFreeToken res =
  assertPpr (owned > free)
    (text "addFreeToken:" <+> ppr owned <+> ppr free)
    (res { tokensFree = free + 1 })
  where
    owned = tokensOwned res
    free  = tokensFree  res
-- | Take one token out of the free pool (it is still owned).
--
-- Asserts that at least one free token is available.
removeFreeToken :: JobResources -> JobResources
removeFreeToken res =
  assertPpr (free > 0)
    (text "removeFreeToken:" <+> ppr free)
    (res { tokensFree = free - 1 })
  where
    free = tokensFree res
-- | Stop counting one token as owned (it is being given back).
--
-- Asserts that more than one token is owned, so the owned count never
-- drops below 1.
removeOwnedToken :: JobResources -> JobResources
removeOwnedToken res =
  assertPpr (owned > 1)
    (text "removeOwnedToken:" <+> ppr owned)
    (res { tokensOwned = owned - 1 })
  where
    owned = tokensOwned res
-- | Enqueue a pending job at the back of the waiting queue
-- ('dispatchTokens' serves jobs from the front).
addJob :: TMVar () -> JobResources -> JobResources
addJob job res = res { jobsWaiting = jobsWaiting res `SnocOL` job }
-- | State threaded through iterations of the jobserver loop.
data JobserverState
  = JobserverState
    { jobserverAction  :: !JobserverAction
      -- ^ What the jobserver is currently doing with the semaphore.
    , canChangeNumCaps :: !(TVar Bool)
      -- ^ Becomes 'True' once enough time has passed since we last
      -- changed 'numCapabilities'.
    , canReleaseToken  :: !(TVar Bool)
      -- ^ Becomes 'True' once enough time has passed since we last
      -- acquired a token, so that releasing one is allowed again.
    }
-- | The activity the jobserver is currently engaged in.
data JobserverAction
  -- | No thread is currently interacting with the semaphore.
  = Idle
  -- | A forked thread is waiting for a token on the semaphore.
  | Acquiring
    { activeWaitId   :: WaitId
      -- ^ Handle for the wait, used to interrupt it.
    , threadFinished :: TMVar (Maybe MC.SomeException)
      -- ^ Filled when the waiting thread is done, carrying the exception
      -- it hit, if any.
    }
-- | The completion signal of the thread the jobserver is currently
-- running, or 'Nothing' when the jobserver is idle.
activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
activeThread_maybe action = case action of
  Acquiring { threadFinished = done } -> Just done
  Idle                                -> Nothing
-- | Should we ask the semaphore for another token?
--
-- Only when no owned token is free and at least one job is waiting.
guardAcquire :: JobResources -> Bool
guardAcquire res
  | tokensFree res == 0 = not (null (jobsWaiting res))
  | otherwise           = False
-- | Should we give a token back to the semaphore?
--
-- Only when nobody is waiting, some token is free, and we own more than
-- the single token we can never give back.
guardRelease :: JobResources -> Bool
guardRelease Jobs { tokensFree, tokensOwned, jobsWaiting }
  | not (null jobsWaiting) = False
  | otherwise              = tokensFree > 0 && tokensOwned > 1
---------------------------------------
-- Semaphore jobserver implementation
-- | Register a new pending job with the jobserver, then block until the
-- jobserver hands that job a token.
--
-- If a token is already free, 'modifyJobResources' dispatches it within
-- the same transaction, so the final 'takeTMVar' does not have to wait.
acquireJob :: TVar JobResources -> IO ()
acquireJob jobs_tvar = do
  ticket <- tracedAtomically "acquire" $
    modifyJobResources jobs_tvar \ res -> do
      slot <- newEmptyTMVar
      return (slot, addJob slot res)
  atomically (takeTMVar ticket)
-- | Tell the jobserver that a job has finished, returning its token to
-- the free pool (where it may immediately go to another waiting job).
releaseJob :: TVar JobResources -> IO ()
releaseJob jobs_tvar =
  tracedAtomically "release" $
    modifyJobResources jobs_tvar $ \ res -> do
      massertPpr (tokensFree res < tokensOwned res)
        (text "releaseJob: more free jobs than owned jobs!")
      return ((), addFreeToken res)
-- | Release all tokens owned from the semaphore (to clean up
-- the jobserver at the end).
--
-- One owned token is always kept back: it is the implicit token, which
-- whoever spawned this GHC process is responsible for.
--
-- When only that implicit token is owned there is nothing to release, so
-- the semaphore is not called at all. This avoids a pointless zero-count
-- release, which the Win32 @ReleaseSemaphore@ API documents as invalid.
--
-- NOTE(review): the owned count is read with 'readTVarIO' while the
-- jobserver loop may still be running, so a token acquired after this read
-- would not be released here — confirm callers stop acquisition first.
cleanupJobserver :: Jobserver -> IO ()
cleanupJobserver (Jobserver { jSemaphore = sem
                            , jobs       = jobs_tvar })
  = do
    Jobs { tokensOwned = owned } <- readTVarIO jobs_tvar
    -- Subtract off the implicit token: whoever spawned the ghc process
    -- in the first place is responsible for that token.
    let toks_to_release = owned - 1
    when (toks_to_release > 0) $
      releaseSemaphore sem toks_to_release
-- | Hand free tokens to waiting jobs, oldest first, until either no free
-- token or no waiting job is left. Each served job has its 'TMVar' filled.
dispatchTokens :: JobResources -> STM JobResources
dispatchTokens res
  | tokensFree res <= 0
  = return res
  | waiter `ConsOL` others <- jobsWaiting res
  = do
      -- Wake the job at the front of the queue and give it one token.
      putTMVar waiter ()
      dispatchTokens (res { tokensFree  = tokensFree res - 1
                          , jobsWaiting = others })
  | otherwise
  = return res
-- | Update the available resources used from a semaphore, dispatching
-- any newly acquired resources.
--
-- Invariant: if the number of available resources decreases, there
-- must be no pending jobs.
--
-- All modifications should go through this function to ensure the contents
-- of the 'TVar' remains in normal form. In normal form there are never both
-- free tokens and waiting jobs, because 'dispatchTokens' runs before the
-- new state is written.
--
-- Returns the action's result together with the new (dispatched) state,
-- which 'tracedAtomically' uses for eventlog tracing.
modifyJobResources :: HasCallStack => TVar JobResources
                   -> (JobResources -> STM (a, JobResources))
                   -> STM (a, Maybe JobResources)
modifyJobResources jobs_tvar action = do
  old_jobs  <- readTVar jobs_tvar
  (a, jobs) <- action old_jobs
  -- Check the invariant: if the number of free tokens has decreased,
  -- there must be no pending jobs. Report both states on failure so the
  -- offending transition can be diagnosed.
  massertPpr (null (jobsWaiting jobs) || tokensFree jobs >= tokensFree old_jobs) $
    vcat [ text "modifyJobResources: pending jobs but fewer free tokens"
         , text "before:" <+> ppr old_jobs
         , text "after:"  <+> ppr jobs ]
  -- Serve waiting jobs before publishing, keeping the TVar in normal form.
  dispatched_jobs <- dispatchTokens jobs
  writeTVar jobs_tvar dispatched_jobs
  return (a, Just dispatched_jobs)
-- | Like 'tracedAtomically', for transactions whose only output is the
-- (optional) new job resources to trace.
tracedAtomically_ :: String -> STM (Maybe JobResources) -> IO ()
tracedAtomically_ origin act = tracedAtomically origin (fmap ((,) ()) act)
-- | Run a transaction atomically. If it reports new job resources, emit
-- them to the eventlog, tagged with @origin@.
tracedAtomically :: String -> STM (a, Maybe JobResources) -> IO a
tracedAtomically origin act = do
  (result, m_resources) <- atomically act
  case m_resources of
    Nothing  -> return ()
    -- The "jsem:" prefix marks these events as coming from the jobserver.
    Just res -> traceEventIO ("jsem:" ++ renderJobResources origin res)
  return result
-- | Encode job resources as a JSON object (name, owned, free, pending)
-- for the eventlog.
renderJobResources :: String -> JobResources -> String
renderJobResources origin Jobs { tokensOwned, tokensFree, jobsWaiting } =
  showSDocUnsafe (renderJSON (JSObject fields))
  where
    fields =
      [ ("name",    JSString origin)
      , ("owned",   JSInt tokensOwned)
      , ("free",    JSInt tokensFree)
      , ("pending", JSInt (length jobsWaiting))
      ]
-- | Fork an interruptible thread that waits on the semaphore for one
-- more token, and return the corresponding 'Acquiring' action.
--
-- When the wait succeeds, the new token is recorded via 'addToken'. In
-- every case the thread then fills 'threadFinished' with the exception
-- it encountered, if any.
acquireThread :: Jobserver -> IO JobserverAction
acquireThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
  done_tmvar <- newEmptyTMVarIO
  let
    on_wait_done :: Either MC.SomeException Bool -> IO ()
    on_wait_done outcome =
      tracedAtomically_ "acquire_thread" do
        (m_err, m_res) <- case outcome of
          Left (err :: MC.SomeException) -> return (Just err, Nothing)
          Right False                    -> return (Nothing, Nothing)
          Right True                     ->
            modifyJobResources jobs_tvar \ res ->
              return (Nothing, addToken res)
        putTMVar done_tmvar m_err
        return m_res
  wait_id <- forkWaitOnSemaphoreInterruptible sem on_wait_done
  labelThread (waitingThreadId wait_id) "acquire_thread"
  return Acquiring { activeWaitId   = wait_id
                   , threadFinished = done_tmvar }
-- | Give one token back to the semaphore, provided we have a spare token
-- and no pending jobs. Always returns 'Idle'.
--
-- The token is first removed from our bookkeeping, and only then released
-- on a forked thread. If the release fails, the token is re-added.
--
-- NOTE(review): because 'Idle' is returned, nothing ever reads
-- @done_tmvar@, so a failure in 'releaseSemaphore' is not rethrown.
releaseThread :: Jobserver -> IO JobserverAction
releaseThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
  done_tmvar <- newEmptyTMVarIO
  MC.mask_ do
    -- Claim the token in our bookkeeping *before* releasing it, so that no
    -- other thread can start using it in the meantime.
    claimed <- tracedAtomically "pre_release" $
      modifyJobResources jobs_tvar \ res ->
        -- TODO: should this also debounce?
        return $
          if guardRelease res
            then (True,  removeOwnedToken (removeFreeToken res))
            else (False, res)
    if claimed
      then do
        release_tid <- forkIO do
          outcome <- MC.try (releaseSemaphore sem 1)
          tracedAtomically_ "post-release" do
            (m_err, m_res) <- case outcome of
              Right _ -> return (Nothing, Nothing)
              Left (err :: MC.SomeException) ->
                -- The release failed, so we still hold the token: restore it.
                modifyJobResources jobs_tvar \ res ->
                  return (Just err, addToken res)
            putTMVar done_tmvar m_err
            return m_res
        labelThread release_tid "release_thread"
        return Idle
      else return Idle
-- | If the jobserver is idle, has waiting jobs, and has no free tokens,
-- produce an action that forks a thread to acquire another token.
-- Otherwise, 'retry'.
--
-- See 'acquireThread'.
tryAcquire :: JobserverOptions
           -> Jobserver
           -> JobserverState
           -> STM (IO JobserverState)
tryAcquire opts js st
  | Idle <- jobserverAction st
  = do
      res <- readTVar (jobs js)
      guard (guardAcquire res)
      return do
        action <- acquireThread js
        -- Start the release debounce. 'registerDelay' takes microseconds,
        -- while the option is in milliseconds.
        can_release_tvar <- registerDelay (releaseDebounce opts * 1000)
        return st { jobserverAction = action
                  , canReleaseToken = can_release_tvar }
  | otherwise
  = retry
-- | If the jobserver is idle, there is a spare free token and no waiting
-- job, and the release debounce has expired, produce an action that
-- releases a token back to the semaphore. Otherwise, 'retry'.
--
-- See 'releaseThread'.
tryRelease :: Jobserver
           -> JobserverState
           -> STM (IO JobserverState)
tryRelease sjs st = case jobserverAction st of
  Idle -> do
    res <- readTVar (jobs sjs)
    guard (guardRelease res)
    debounce_expired <- readTVar (canReleaseToken st)
    guard debounce_expired
    return do
      action <- releaseThread sjs
      return st { jobserverAction = action }
  _ -> retry
-- | Once the active thread has finished (and the capability debounce
-- allows it), produce an action that:
--
--  - sets the 'JobserverAction' back to 'Idle';
--  - makes the number of capabilities match the number of tokens owned,
--    restarting the debounce if it had to change.
--
-- Rethrows any exception reported by the finished thread. Retries while
-- no thread is active.
tryNoticeIdle :: JobserverOptions
              -> TVar JobResources
              -> JobserverState
              -> STM (IO JobserverState)
tryNoticeIdle opts jobs_tvar jobserver_state =
  case activeThread_maybe (jobserverAction jobserver_state) of
    -- No active thread: wait until the jobserver starts one.
    Nothing          -> retry
    Just finished_tm -> do
      mb_ex <- takeTMVar finished_tm
      for_ mb_ex MC.throwM
      Jobs { tokensOwned } <- readTVar jobs_tvar
      let caps_tvar = canChangeNumCaps jobserver_state
      -- Until the debounce expires, 'guard' retries the whole transaction.
      can_change <- readTVar caps_tvar
      guard can_change
      return do
        current <- getNumCapabilities
        next_caps_tvar <-
          if current == tokensOwned
            then return caps_tvar
            else do
              setNumCapabilities tokensOwned
              registerDelay (setNumCapsDebounce opts * 1000)
        return jobserver_state
          { jobserverAction  = Idle
          , canChangeNumCaps = next_caps_tvar }
-- | While a thread is waiting for a token that no job needs any more
-- (nothing is pending), produce an action that interrupts the wait and
-- marks the jobserver 'Idle'. Retries otherwise.
tryStopThread :: TVar JobResources
              -> JobserverState
              -> STM (IO JobserverState)
tryStopThread jobs_tvar jsj = case jobserverAction jsj of
  Acquiring { activeWaitId = wait_id } -> do
    res <- readTVar jobs_tvar
    guard (null (jobsWaiting res))
    return do
      interruptWaitOnSemaphore wait_id
      return jsj { jobserverAction = Idle }
  _ -> retry
-- | Main jobserver loop: acquire or release semaphore tokens as the
-- pending jobs require.
--
-- It never returns on its own; it runs until its thread is killed.
jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar }) = do
  -- Both debounces start out expired.
  always_true <- newTVarIO True
  go JobserverState { jobserverAction  = Idle
                    , canChangeNumCaps = always_true
                    , canReleaseToken  = always_true }
  where
    -- Each iteration runs a single transaction. The transaction takes the
    -- first of these steps that does not retry (via 'asum'), runs the
    -- resulting IO action, and continues with the state it returns.
    go state = do
      step <- atomically $ asum
        [ tryRelease    sjs            state
        , tryAcquire    opts sjs       state
        , tryNoticeIdle opts jobs_tvar state
        , tryStopThread jobs_tvar      state
        ]
      next_state <- step
      go next_state
-- | Create a new jobserver using the given semaphore handle.
--
-- It forks the jobserver loop and returns two things:
--
--  - an 'AbstractSem' whose acquire and release operations go through
--    the jobserver;
--  - a cleanup action that releases owned tokens, kills the loop, and
--    rethrows any exception the loop died with (other than being killed).
makeJobserver :: SemaphoreName -> IO (AbstractSem, IO ())
makeJobserver sem_name = do
  semaphore <- openSemaphore sem_name
  -- Start with a single owned token, which is free.
  jobs_tvar <- newTVarIO (Jobs { tokensOwned = 1
                               , tokensFree  = 1
                               , jobsWaiting = NilOL })
  let opts = defaultJobserverOptions -- TODO: allow this to be configured
      sjs  = Jobserver { jSemaphore = semaphore, jobs = jobs_tvar }
      -- Being killed is the normal way for the loop to end; any other
      -- exception is kept so that the cleanup action can rethrow it.
      loop_outcome :: Either MC.SomeException () -> Maybe MC.SomeException
      loop_outcome (Right ()) = Nothing
      loop_outcome (Left e)
        | Just ThreadKilled <- fromException e = Nothing
        | otherwise                            = Just e
  loop_finished_mvar <- newEmptyMVar
  loop_tid <- forkIOWithUnmask \ unmask -> do
    r <- try $ unmask $ jobserverLoop opts sjs
    putMVar loop_finished_mvar (loop_outcome r)
  labelThread loop_tid "job_server"
  let
    acquireSem = acquireJob jobs_tvar
    releaseSem = releaseJob jobs_tvar
    cleanupSem = do
      -- this is interruptible
      cleanupJobserver sjs
      killThread loop_tid
      mb_ex <- takeMVar loop_finished_mvar
      for_ mb_ex MC.throwM
  return (AbstractSem{..}, cleanupSem)
-- | Run an operation with an abstract semaphore backed by a jobserver.
-- The jobserver draws its resources from the named system semaphore.
--
-- The operation runs with asynchronous exceptions unmasked. The cleanup
-- runs afterwards in either case. If the operation throws, a failure of
-- the cleanup is discarded and the operation's exception is rethrown.
runJSemAbstractSem :: SemaphoreName         -- ^ the system semaphore to use
                   -> (AbstractSem -> IO a) -- ^ the operation to run
                                            -- which requires a semaphore
                   -> IO a
runJSemAbstractSem sem action = MC.mask \ unmask -> do
  (abstract_sem, cleanup) <- makeJobserver sem
  outcome <- try $ unmask $ action abstract_sem
  case outcome of
    Right x -> cleanup $> x
    Left (e1 :: MC.SomeException) -> do
      -- Best-effort cleanup: keep the original exception as the one reported.
      (_ :: Either MC.SomeException ()) <- MC.try cleanup
      MC.throwM e1
{- Note [Architecture of the Job Server]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In `-jsem` mode, the amount of parallelism that GHC can use is controlled by a
system semaphore. We take resources from the semaphore when we need them, and
give them back if we don't have enough to do.
A naive implementation would just take and release the semaphore around performing
the action, but this leads to two issues:
* When taking a token from the semaphore, we must call `setNumCapabilities` in order
to adjust how many capabilities are available for parallel garbage collection.
This causes unnecessary synchronisations.
* We want to implement a debounce, so that whilst there is pending work in the
current process we prefer to keep hold of resources from the semaphore.
This reduces overall memory usage, as there are fewer live GHC processes at once.
Therefore, acquiring resources from the semaphore is decoupled from the
driver's requests for those resources.
A token from the semaphore is requested using `acquireJob`. This creates a pending
job, which is a TMVar that can be filled in to signal that the requested token is ready.
When the job is finished, the token is released by calling `releaseJob`, which just
increases the number of free tokens. If there are pending jobs when the free count
is increased, the token is immediately reused (see `modifyJobResources`).
The `jobserverLoop` interacts with the system semaphore: when there are pending
jobs but no free tokens, `acquireThread` spawns a thread that blocks, waiting for
a token from the semaphore. Once a token is obtained, both the owned and the free
counts are increased (see `addToken`).
When GHC has free tokens (tokens from the semaphore that it is not using),
no pending jobs, and the debounce has expired, then `releaseThread` will
release tokens back to the global semaphore.
`tryStopThread` attempts to kill threads which are waiting to acquire a resource
when we no longer need it. For example, consider that we attempt to acquire two
tokens, but the first job finishes before we acquire the second token.
This second token is no longer needed, so we should cancel the wait
(as it would not be used to do any work, and not be returned until the debounce).
We only need to interrupt the thread spawned by `acquireThread`, because releasing a token never blocks.
Note [Eventlog Messages for jsem]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
It can be tricky to verify that the work is shared adequately across different
processes. To help debug this, we output the values of `JobResources` to the
eventlog whenever the global state changes. There are some scripts which can be used
to analyse this output and report statistics about core saturation in the
GitHub repo (https://github.com/mpickering/ghc-jsem-analyse).
-}
|