summaryrefslogtreecommitdiff
path: root/compiler/GHC/Driver/MakeSem.hs
blob: 4e36a26c8667d2e938f8f6fba12b3b23a239974e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE NumericUnderscores #-}

-- | Implementation of a jobserver using system semaphores.
--
--
module GHC.Driver.MakeSem
  ( -- * JSem: parallelism semaphore backed
    -- by a system semaphore (Posix/Windows)
    runJSemAbstractSem

  -- * System semaphores
  , Semaphore, SemaphoreName(..)

  -- * Abstract semaphores
  , AbstractSem(..)
  , withAbstractSem
  )
  where

import GHC.Prelude
import GHC.Conc
import GHC.Data.OrdList
import GHC.IO.Exception
import GHC.Utils.Outputable
import GHC.Utils.Panic
import GHC.Utils.Json

import System.Semaphore

import Control.Monad
import qualified Control.Monad.Catch as MC
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Data.Foldable
import Data.Functor
import GHC.Stack
import Debug.Trace

---------------------------------------
-- Semaphore jobserver

-- | A jobserver based off a system 'Semaphore'.
--
-- Bundles the semaphore handle together with the shared, mutable
-- bookkeeping ('JobResources') that describes the pending jobs and
-- the tokens obtained from that semaphore.
data Jobserver
  = Jobserver
  { jSemaphore :: !Semaphore
    -- ^ The semaphore which controls available resources
  , jobs :: !(TVar JobResources)
    -- ^ The currently pending jobs, and the resources
    -- obtained from the semaphore. This 'TVar' is read and written
    -- both by the jobserver loop and by 'acquireJob' / 'releaseJob'.
  }

-- | Debounce intervals used by the jobserver loop.
--
-- Both values are expressed in milliseconds; they are multiplied by 1000
-- before being handed to 'registerDelay'.
data JobserverOptions
  = JobserverOptions
  { releaseDebounce    :: !Int
    -- ^ Minimum delay, in milliseconds, between acquiring a token
    -- and releasing a token.
  , setNumCapsDebounce :: !Int
    -- ^ Minimum delay, in milliseconds, between two consecutive
    -- calls of 'setNumCapabilities'.
  }

-- | The options used by 'makeJobserver': both debounce intervals are
-- one second (expressed in milliseconds).
defaultJobserverOptions :: JobserverOptions
defaultJobserverOptions =
  JobserverOptions
    { releaseDebounce    = one_second
    , setNumCapsDebounce = one_second
    }
  where
    -- One second, in the millisecond unit used by 'JobserverOptions'.
    one_second :: Int
    one_second = 1_000

-- | Resources available for running jobs, i.e.
-- tokens obtained from the parallelism semaphore.
--
-- The field order matters: 'renderJobResources' matches on the 'Jobs'
-- constructor positionally (owned, free, pending).
data JobResources
  = Jobs
  { tokensOwned :: !Int
    -- ^ How many tokens have been claimed from the semaphore
  , tokensFree  :: !Int
    -- ^ How many tokens are not currently being used
  , jobsWaiting :: !(OrdList (TMVar ()))
    -- ^ Pending jobs waiting on a token, in arrival order (new jobs are
    -- appended at the end, tokens are handed out from the front).
    -- Each job is blocked on its 'TMVar', so putting into
    -- the 'TMVar' allows the job to continue.
  }

-- | Debug rendering: shows the token counts and the /number/ of
-- waiting jobs (the 'TMVar's themselves are not printable).
instance Outputable JobResources where
  ppr (Jobs { tokensOwned = owned, tokensFree = free, jobsWaiting = waiting })
    = text "JobResources" <+> braces (hsep fields)
    where
      fields =
        [ text "owned=" <> ppr owned
        , text "free=" <> ppr free
        , text "num_waiting=" <> ppr (length waiting)
        ]

-- | Record a token freshly obtained from the semaphore: it is both
-- owned and, initially, free.
addToken :: JobResources -> JobResources
addToken res =
  res { tokensOwned = tokensOwned res + 1
      , tokensFree  = tokensFree res + 1
      }

-- | Mark one owned token as no longer in use.
--
-- Asserts that there is at least one owned token that is not already free.
addFreeToken :: JobResources -> JobResources
addFreeToken res =
  assertPpr (owned > free)
    (text "addFreeToken:" <+> ppr owned <+> ppr free)
    (res { tokensFree = free + 1 })
  where
    owned = tokensOwned res
    free  = tokensFree res

-- | Consume one free token.
--
-- Asserts that a free token is actually available.
removeFreeToken :: JobResources -> JobResources
removeFreeToken res =
  assertPpr (free > 0)
    (text "removeFreeToken:" <+> ppr free)
    (res { tokensFree = free - 1 })
  where
    free = tokensFree res

-- | Give up ownership of one token.
--
-- Asserts that more than one token is owned, so the count never
-- drops below one.
removeOwnedToken :: JobResources -> JobResources
removeOwnedToken res =
  assertPpr (owned > 1)
    (text "removeOwnedToken:" <+> ppr owned)
    (res { tokensOwned = owned - 1 })
  where
    owned = tokensOwned res

-- | Enqueue a pending job behind all the jobs that are already waiting.
addJob :: TMVar () -> JobResources -> JobResources
addJob job res = res { jobsWaiting = SnocOL (jobsWaiting res) job }

-- | The state of the semaphore job server.
--
-- This is the value threaded through the iterations of 'jobserverLoop';
-- each @try*@ step returns an updated copy.
data JobserverState
  = JobserverState
    { jobserverAction  :: !JobserverAction
      -- ^ The current action being performed by the
      -- job server.
    , canChangeNumCaps :: !(TVar Bool)
      -- ^ A TVar that signals whether it has been long
      -- enough since we last changed 'numCapabilities'.
      -- Replaced by a fresh 'registerDelay' TVar each time
      -- 'setNumCapabilities' is called.
    , canReleaseToken  :: !(TVar Bool)
      -- ^ A TVar that signals whether we last acquired
      -- a token long enough ago that we can now release
      -- a token. Replaced by a fresh 'registerDelay' TVar
      -- each time an acquisition is started.
    }
-- | What the jobserver loop is currently doing with respect to
-- the system semaphore.
data JobserverAction
  -- | The jobserver is idle: no thread is currently
  -- interacting with the semaphore.
  = Idle
  -- | A thread is waiting for a token on the semaphore.
  --
  -- 'activeWaitId' identifies the in-flight wait (it is what
  -- 'tryStopThread' passes to 'interruptWaitOnSemaphore');
  -- 'threadFinished' is filled by the waiting thread once it is done,
  -- carrying the exception it ended with, if any.
  | Acquiring
    { activeWaitId   :: WaitId
    , threadFinished :: TMVar (Maybe MC.SomeException) }

-- | The completion signal of the thread currently working on behalf of
-- the jobserver, or 'Nothing' when the jobserver is 'Idle'.
activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
activeThread_maybe action =
  case action of
    Idle                                     -> Nothing
    Acquiring { threadFinished = done_tmvar } -> Just done_tmvar

-- | Should another token be requested from the semaphore?
--
-- True exactly when no token is free and at least one job is waiting.
guardAcquire :: JobResources -> Bool
guardAcquire res = no_free_tokens && has_pending_jobs
  where
    no_free_tokens   = tokensFree res == 0
    has_pending_jobs = not (null (jobsWaiting res))

-- | Should a token be handed back to the semaphore?
--
-- True exactly when nothing is waiting, at least one token is free, and
-- more than one token is owned.
guardRelease :: JobResources -> Bool
guardRelease res
  | not (null (jobsWaiting res)) = False
  | tokensFree res <= 0          = False
  | otherwise                    = tokensOwned res > 1

---------------------------------------
-- Semaphore jobserver implementation

-- | Register a new pending job with the jobserver and wait for a token.
--
-- The job is represented by a fresh, empty 'TMVar' appended to the queue
-- of waiting jobs; this call blocks until that 'TMVar' is filled.
acquireJob :: TVar JobResources -> IO ()
acquireJob jobs_tvar = do
  ticket <- fst <$> tracedAtomically "acquire" (modifyJobResources jobs_tvar enqueue)
  atomically (takeTMVar ticket)
  where
    -- Create the job's ticket and queue it; the previous resources are
    -- returned alongside it (and ignored by the caller above).
    enqueue :: JobResources -> STM ((TMVar (), JobResources), JobResources)
    enqueue before = do
      ticket <- newEmptyTMVar
      return ((ticket, before), addJob ticket before)

-- | Tell the jobserver that a job has finished, making its token free
-- again (and thereby available to the next waiting job, if any).
releaseJob :: TVar JobResources -> IO ()
releaseJob jobs_tvar =
  tracedAtomically "release" $ modifyJobResources jobs_tvar give_back
  where
    give_back :: JobResources -> STM ((), JobResources)
    give_back res = do
      -- A token can only be given back if some owned token is in use.
      massertPpr (tokensOwned res > tokensFree res)
        (text "releaseJob: more free jobs than owned jobs!")
      return ((), addFreeToken res)


-- | Release all tokens owned from the semaphore (to clean up
-- the jobserver at the end).
--
-- The implicit token is never released: whoever spawned the ghc process
-- in the first place is responsible for that one. When only the implicit
-- token is owned there is nothing to hand back, and the semaphore is not
-- touched at all.
cleanupJobserver :: Jobserver -> IO ()
cleanupJobserver (Jobserver { jSemaphore = sem
                            , jobs       = jobs_tvar })
  = do
    Jobs { tokensOwned = owned } <- readTVarIO jobs_tvar
    let toks_to_release = owned - 1
      -- Subtract off the implicit token: whoever spawned the ghc process
      -- in the first place is responsible for that token.
    -- Skip the call when there is nothing to release: the Win32
    -- ReleaseSemaphore primitive documents that the release count must be
    -- greater than zero, so a zero-token release is best avoided.
    when (toks_to_release > 0) $
      releaseSemaphore sem toks_to_release

-- | Hand free tokens to waiting jobs, oldest job first, until either the
-- free tokens or the waiting jobs run out.
--
-- Each dispatched job has its 'TMVar' filled (waking it up) and consumes
-- one free token. Returns the resources left after dispatching.
dispatchTokens :: JobResources -> STM JobResources
dispatchTokens res
  | tokensFree res <= 0 = return res
  | otherwise =
      case jobsWaiting res of
        next `ConsOL` rest -> do
          -- Wake the job at the front of the queue, then carry on with
          -- one fewer free token and the remaining jobs.
          putTMVar next ()
          dispatchTokens (res { tokensFree  = tokensFree res - 1
                              , jobsWaiting = rest })
        _ -> return res

-- | Update the available resources used from a semaphore, dispatching
-- any newly acquired resources.
--
-- The supplied action receives the current 'JobResources' and returns a
-- result together with the updated resources. The updated resources are
-- then passed through 'dispatchTokens' before being written back.
--
-- Returns the action's result paired with the resources that were
-- actually written (always 'Just'), in the shape expected by
-- 'tracedAtomically'.
--
-- Invariant: if the number of available resources decreases, there
-- must be no pending jobs.
--
-- All modifications should go through this function to ensure the contents
-- of the 'TVar' remains in normal form.
modifyJobResources :: HasCallStack => TVar JobResources
                   -> (JobResources -> STM (a, JobResources))
                   -> STM (a, Maybe JobResources)
modifyJobResources jobs_tvar action = do
  old_jobs  <- readTVar jobs_tvar
  (a, jobs) <- action old_jobs

  -- Check the invariant: if the number of free tokens has decreased,
  -- there must be no pending jobs.
  massertPpr (null (jobsWaiting jobs) || tokensFree jobs >= tokensFree old_jobs) $
    vcat [ text "modifyJobResources: pending jobs but fewer free tokens" ]
  -- Normalise: give any free tokens to waiting jobs before publishing.
  dispatched_jobs <- dispatchTokens jobs
  writeTVar jobs_tvar dispatched_jobs
  return (a, Just dispatched_jobs)


-- | Variant of 'tracedAtomically' for transactions that produce no result
-- other than the (optional) resources to trace.
tracedAtomically_ :: String -> STM (Maybe JobResources) -> IO ()
tracedAtomically_ origin stm =
  tracedAtomically origin $ do
    mb_res <- stm
    return ((), mb_res)

-- | Run a transaction and, if it reports updated 'JobResources', emit
-- them to the eventlog afterwards.
--
-- The first argument names the call site; it ends up in the trace message.
tracedAtomically :: String -> STM (a, Maybe JobResources) -> IO a
tracedAtomically origin act = do
  (result, mb_res) <- atomically act
  case mb_res of
    Nothing  -> return ()
    -- The "jsem:" prefix identifies these entries in the eventlog.
    Just res -> traceEventIO ("jsem:" ++ renderJobResources origin res)
  return result

-- | Render a snapshot of the job resources as a JSON object string with
-- the keys @name@ (the given origin), @owned@, @free@ and @pending@
-- (the number of waiting jobs).
renderJobResources :: String -> JobResources -> String
renderJobResources origin res = showSDocUnsafe (renderJSON payload)
  where
    payload =
      JSObject
        [ ("name",    JSString origin)
        , ("owned",   JSInt (tokensOwned res))
        , ("free",    JSInt (tokensFree res))
        , ("pending", JSInt (length (jobsWaiting res)))
        ]


-- | Start waiting on the semaphore, in a separate thread, for one
-- additional token.
--
-- Returns an 'Acquiring' action carrying the identifier of the wait and a
-- 'TMVar' that is filled once the wait has concluded: with @Just e@ if it
-- ended with exception @e@, with 'Nothing' otherwise. If the wait reports
-- success, the new token is added to the job resources in the same
-- transaction that fills the 'TMVar'.
acquireThread :: Jobserver -> IO JobserverAction
acquireThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
    done_tmvar <- newEmptyTMVarIO
    let
      -- Callback run with the result of the wait.
      on_wait_result :: Either MC.SomeException Bool -> IO ()
      on_wait_result outcome =
        tracedAtomically_ "acquire_thread" do
          (mb_err, mb_res) <- record_outcome outcome
          putTMVar done_tmvar mb_err
          return mb_res

      -- Translate the result of the wait into the exception to report
      -- (if any) and the updated resources to trace (if any).
      record_outcome :: Either MC.SomeException Bool
                     -> STM (Maybe MC.SomeException, Maybe JobResources)
      record_outcome (Left err)    = return (Just err, Nothing)
      record_outcome (Right False) = return (Nothing, Nothing)
      record_outcome (Right True)  =
        modifyJobResources jobs_tvar \ res ->
          return (Nothing, addToken res)
    wait_id <- forkWaitOnSemaphoreInterruptible sem on_wait_result
    labelThread (waitingThreadId wait_id) "acquire_thread"
    return $ Acquiring { activeWaitId   = wait_id
                       , threadFinished = done_tmvar }

-- | Spawn a thread to release ownership of one resource from the semaphore,
-- provided we have spare resources and no pending jobs.
--
-- The bookkeeping is updated /before/ the semaphore is touched: one free,
-- owned token is removed from the 'JobResources' first, and only then is
-- a thread forked to call 'releaseSemaphore'. If that call throws, the
-- token is added back to the 'JobResources'.
--
-- Always returns 'Idle', whether or not a release thread was forked.
--
-- NOTE(review): @threadFinished_tmvar@ is filled by the forked thread but
-- is not part of the returned action, so nothing visible in this function
-- waits on it; an exception stored there does not appear to be observed by
-- the caller. Confirm whether that is intended.
releaseThread :: Jobserver -> IO JobserverAction
releaseThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
  threadFinished_tmvar <- newEmptyTMVarIO
  -- Asynchronous exceptions are masked from the bookkeeping update until
  -- the release thread has been forked and labelled.
  MC.mask_ do
    -- Pre-release the resource so that another thread doesn't take control of it
    -- just as we release the lock on the semaphore.
    still_ok_to_release
      <- tracedAtomically "pre_release" $
         modifyJobResources jobs_tvar \ jobs ->
           -- Re-check the release condition inside the transaction: the
           -- resources may have changed since the caller last looked.
           if guardRelease jobs
               -- TODO: should this also debounce?
           then return (True , removeOwnedToken $ removeFreeToken jobs)
           else return (False, jobs)
    if not still_ok_to_release
    then return Idle
    else do
      tid <- forkIO $ do
        x <- MC.try $ releaseSemaphore sem 1
        tracedAtomically_ "post-release" $ do
          (r, jobs) <- case x of
            -- The release failed: we still hold the token, so put it
            -- back into the bookkeeping and report the exception.
            Left (e :: MC.SomeException) -> do
              modifyJobResources jobs_tvar \ jobs ->
                return (Just e, addToken jobs)
            -- The release succeeded: nothing further to update or trace.
            Right _ -> do
              return (Nothing, Nothing)
          putTMVar threadFinished_tmvar r
          return jobs
      labelThread tid "release_thread"
      return Idle

-- | Loop step: start acquiring a token.
--
-- Applies only when the jobserver is 'Idle' and 'guardAcquire' holds
-- (pending jobs, no free tokens); otherwise the transaction retries.
-- The returned IO action spawns the waiting thread (see 'acquireThread')
-- and restarts the release debounce timer.
tryAcquire :: JobserverOptions
           -> Jobserver
           -> JobserverState
           -> STM (IO JobserverState)
tryAcquire opts js st =
  case jobserverAction st of
    Idle -> do
      res <- readTVar (jobs js)
      guard (guardAcquire res)
      return do
        new_action  <- acquireThread js
        -- Tokens may not be released again until this delay has elapsed.
        debounce_tv <- registerDelay (releaseDebounce opts * 1000)
        return $ st { jobserverAction = new_action
                    , canReleaseToken = debounce_tv }
    _ -> retry

-- | Loop step: release a token back to the semaphore.
--
-- Applies only when the jobserver is 'Idle', 'guardRelease' holds (free
-- tokens, no pending jobs, more than one token owned) and the release
-- debounce has elapsed; otherwise the transaction retries.
--
-- See 'releaseThread'.
tryRelease :: Jobserver
           -> JobserverState
           -> STM (IO JobserverState)
tryRelease sjs st
  | Idle <- jobserverAction st
  = do
    res <- readTVar (jobs sjs)
    guard (guardRelease res)
    debounce_over <- readTVar (canReleaseToken st)
    guard debounce_over
    return do
      new_action <- releaseThread sjs
      return $ st { jobserverAction = new_action }
  | otherwise
  = retry

-- | Loop step: notice that the active thread has finished.
--
-- Retries while there is no active thread, while the active thread has
-- not yet signalled completion, or while the capabilities debounce has
-- not elapsed. If the thread finished with an exception, that exception
-- is rethrown. Otherwise the returned IO action:
--
--  - brings the number of capabilities in line with the number of owned
--    tokens (restarting the capabilities debounce if it had to change),
--  - sets the 'JobserverAction' back to 'Idle'.
tryNoticeIdle :: JobserverOptions
              -> TVar JobResources
              -> JobserverState
              -> STM (IO JobserverState)
tryNoticeIdle opts jobs_tvar jobserver_state =
  case activeThread_maybe (jobserverAction jobserver_state) of
    -- No active thread: wait until the jobserver isn't idle.
    Nothing -> retry
    Just done_tmvar -> do
      mb_ex <- takeTMVar done_tmvar
      for_ mb_ex MC.throwM
      Jobs { tokensOwned = owned } <- readTVar jobs_tvar
      may_resize <- readTVar numcaps_gate
      guard may_resize
      return do
        current_caps <- getNumCapabilities
        next_gate <-
          if current_caps == owned
          then return numcaps_gate
          else do
            setNumCapabilities owned
            registerDelay (setNumCapsDebounce opts * 1000)
        return $
          jobserver_state
            { jobserverAction  = Idle
            , canChangeNumCaps = next_gate }
  where
    -- Signals whether enough time has passed since the last change
    -- to the number of capabilities.
    numcaps_gate :: TVar Bool
    numcaps_gate = canChangeNumCaps jobserver_state

-- | Loop step: cancel an acquisition that is no longer needed.
--
-- Applies only while a thread is 'Acquiring' and no job is waiting any
-- more; otherwise the transaction retries. The returned IO action
-- interrupts the wait on the semaphore and marks the jobserver 'Idle'.
tryStopThread :: TVar JobResources
              -> JobserverState
              -> STM (IO JobserverState)
tryStopThread jobs_tvar jsj
  | Acquiring { activeWaitId = wait_id } <- jobserverAction jsj
  = do
    res <- readTVar jobs_tvar
    guard (null (jobsWaiting res))
    return do
      interruptWaitOnSemaphore wait_id
      return $ jsj { jobserverAction = Idle }
  | otherwise
  = retry

-- | The jobserver's main loop; it never returns normally.
--
-- Each iteration atomically picks the first applicable step among
-- 'tryRelease', 'tryAcquire', 'tryNoticeIdle' and 'tryStopThread'
-- (in that order of preference), blocking until one of them applies,
-- then runs the IO action that step produced to obtain the next state.
jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar })
  = do
      -- Initially neither debounce is active: both gates are open.
      always_ready <- newTVarIO True
      let initial :: JobserverState
          initial =
            JobserverState
              { jobserverAction  = Idle
              , canChangeNumCaps = always_ready
              , canReleaseToken  = always_ready }
      go initial
  where
    steps :: [JobserverState -> STM (IO JobserverState)]
    steps =
      [ tryRelease    sjs
      , tryAcquire    opts sjs
      , tryNoticeIdle opts jobs_tvar
      , tryStopThread jobs_tvar
      ]

    go :: JobserverState -> IO ()
    go st = do
      next <- atomically (asum (map ($ st) steps))
      go =<< next

-- | Create a new jobserver using the given semaphore handle.
--
-- Opens the named semaphore, starts the jobserver loop in its own thread,
-- and returns:
--
--  - an 'AbstractSem' whose acquire/release operations go through the
--    jobserver ('acquireJob' / 'releaseJob'), and
--  - a cleanup action that releases the tokens still owned, kills the
--    loop thread, waits for it to finish, and rethrows any exception the
--    loop ended with (other than 'ThreadKilled').
makeJobserver :: SemaphoreName -> IO (AbstractSem, IO ())
makeJobserver sem_name = do
  semaphore <- openSemaphore sem_name
  let
    -- We start out owning exactly one token, which is free.
    init_jobs =
      Jobs { tokensOwned = 1
           , tokensFree  = 1
           , jobsWaiting = NilOL
           }
  jobs_tvar <- newTVarIO init_jobs
  let
    opts = defaultJobserverOptions -- TODO: allow this to be configured
    sjs = Jobserver { jSemaphore = semaphore
                    , jobs       = jobs_tvar }
  -- Filled exactly once, when the loop thread terminates.
  loop_finished_mvar <- newEmptyMVar
  loop_tid <- forkIOWithUnmask \ unmask -> do
    -- Only the loop itself runs unmasked; its outcome is always
    -- recorded in the MVar afterwards.
    r <- try $ unmask $ jobserverLoop opts sjs
    putMVar loop_finished_mvar $
      case r of
        Left e
          -- Being killed is the expected way for the loop to stop
          -- (see cleanupSem below), so it is not reported as an error.
          | Just ThreadKilled <- fromException e
          -> Nothing
          | otherwise
          -> Just e
        Right () -> Nothing
  labelThread loop_tid "job_server"
  let
    -- The names acquireSem / releaseSem are picked up by the
    -- RecordWildCards construction 'AbstractSem{..}' below.
    acquireSem = acquireJob jobs_tvar
    releaseSem = releaseJob jobs_tvar
    cleanupSem = do
      -- this is interruptible
      cleanupJobserver sjs
      killThread loop_tid
      -- Wait for the loop thread to finish, and propagate its failure.
      mb_ex <- takeMVar loop_finished_mvar
      for_ mb_ex MC.throwM

  return (AbstractSem{..}, cleanupSem)

-- | Run an operation with an abstract semaphore backed by a semaphore
-- 'Jobserver' that draws its resources from the system semaphore of the
-- given name.
--
-- The jobserver is cleaned up afterwards in all cases. If the operation
-- throws, its exception is rethrown and any exception raised by the
-- cleanup itself is deliberately discarded, so the original failure is
-- the one that propagates.
runJSemAbstractSem :: SemaphoreName         -- ^ the system semaphore to use
                   -> (AbstractSem -> IO a) -- ^ the operation to run
                                            -- which requires a semaphore
                   -> IO a
runJSemAbstractSem sem action = MC.mask \ restore -> do
  (abs_sem, cleanup) <- makeJobserver sem
  outcome <- try (restore (action abs_sem))
  case outcome of
    Right result -> cleanup $> result
    Left (err :: MC.SomeException) -> do
      _ <- MC.try cleanup :: IO (Either MC.SomeException ())
      MC.throwM err

{- Note [Architecture of the Job Server]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In `-jsem` mode, the amount of parallelism that GHC can use is controlled by a
system semaphore. We take resources from the semaphore when we need them, and
give them back if we don't have enough to do.

A naive implementation would just take and release the semaphore around performing
the action, but this leads to two issues:

* When taking a token in the semaphore, we must call `setNumCapabilities` in order
  to adjust how many capabilities are available for parallel garbage collection.
  This causes unnecessary synchronisations.
* We want to implement a debounce, so that whilst there is pending work in the
  current process we prefer to keep hold of resources from the semaphore.
  This reduces overall memory usage, as there are fewer live GHC processes at once.

Therefore, the obtention of semaphore resources is separated away from the
request for the resource in the driver.

A token from the semaphore is requested using `acquireJob`. This creates a pending
job, which is a `TMVar` that can be filled in to signal that the requested token is ready.

When the job is finished, the token is released by calling `releaseJob`, which just
increases the number of `free` jobs. If there are more pending jobs when the free count
is increased, the token is immediately reused (see `modifyJobResources`).

The `jobserverLoop` interacts with the system semaphore: when there are pending
jobs and no free tokens, the thread spawned by `acquireThread` blocks, waiting
for a token from the semaphore. Once a token is obtained, it increases the owned count.

When GHC has free tokens (tokens from the semaphore that it is not using),
no pending jobs, and the debounce has expired, then `releaseThread` will
release tokens back to the global semaphore.

`tryStopThread` attempts to kill threads which are waiting to acquire a resource
when we no longer need it. For example, consider that we attempt to acquire two
tokens, but the first job finishes before we acquire the second token.
This second token is no longer needed, so we should cancel the wait
(as it would not be used to do any work, and not be returned until the debounce).
We only need to interrupt the thread spawned by `acquireThread`, because releasing
a token never blocks.

Note [Eventlog Messages for jsem]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
It can be tricky to verify that the work is shared adequately across different
processes. To help debug this, we output the values of `JobResources` to the
eventlog whenever the global state changes. There are some scripts which can be used
to analyse this output and report statistics about core saturation in the
GitHub repo (https://github.com/mpickering/ghc-jsem-analyse).

-}