diff options
Diffstat (limited to 'libraries/base/GHC/Event/Thread.hs')
| -rw-r--r-- | libraries/base/GHC/Event/Thread.hs | 94 | 
1 files changed, 75 insertions, 19 deletions
| diff --git a/libraries/base/GHC/Event/Thread.hs b/libraries/base/GHC/Event/Thread.hs index a330225622..cf9a769766 100644 --- a/libraries/base/GHC/Event/Thread.hs +++ b/libraries/base/GHC/Event/Thread.hs @@ -18,7 +18,7 @@ module GHC.Event.Thread  -- TODO: Use new Windows I/O manager  import Control.Exception (finally, SomeException, toException)  import Data.Foldable (forM_, mapM_, sequence_) -import Data.IORef (IORef, newIORef, readIORef, writeIORef) +import Data.IORef (IORef, newIORef, readIORef, writeIORef, atomicWriteIORef)  import Data.Maybe (fromMaybe)  import Data.Tuple (snd)  import Foreign.C.Error (eBADF, errnoToIOError) @@ -29,7 +29,8 @@ import GHC.List (zipWith, zipWith3)  import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO,                        labelThread, modifyMVar_, withMVar, newTVar, sharedCAF,                        getNumCapabilities, threadCapability, myThreadId, forkOn, -                      threadStatus, writeTVar, newTVarIO, readTVar, retry,throwSTM,STM) +                      threadStatus, writeTVar, newTVarIO, readTVar, retry, +                      throwSTM, STM, yield)  import GHC.IO (mask_, uninterruptibleMask_, onException)  import GHC.IO.Exception (ioError)  import GHC.IOArray (IOArray, newIOArray, readIOArray, writeIOArray, @@ -41,6 +42,7 @@ import GHC.Event.Manager (Event, EventManager, evtRead, evtWrite, loop,                               new, registerFd, unregisterFd_)  import qualified GHC.Event.Manager as M  import qualified GHC.Event.TimerManager as TM +import GHC.Ix (inRange)  import GHC.Num ((-), (+))  import GHC.Real (fromIntegral)  import GHC.Show (showSignedInt) @@ -98,22 +100,44 @@ threadWaitWrite = threadWait evtWrite  closeFdWith :: (Fd -> IO ())        -- ^ Action that performs the close.              -> Fd                   -- ^ File descriptor to close.              -> IO () -closeFdWith close fd = do -  eventManagerArray <- readIORef eventManager -  let (low, high) = boundsIOArray eventManagerArray -  mgrs <- flip mapM [low..high] $ \i -> do -    Just (_,!mgr) <- readIOArray eventManagerArray i -    return mgr -  -- 'takeMVar', and 'M.closeFd_' might block, although for a very short time. -  -- To make 'closeFdWith' safe in presence of asynchronous exceptions we have -  -- to use uninterruptible mask. -  uninterruptibleMask_ $ do -    tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd -    cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables -    close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps) +closeFdWith close fd = close_loop    where      finish mgr table cbApp = putMVar (M.callbackTableVar mgr fd) table >> cbApp      zipWithM f xs ys = sequence (zipWith f xs ys) +      -- The array inside 'eventManager' can be swapped out at any time, see +      -- 'ioManagerCapabilitiesChanged'. See #21651. We detect this case by +      -- checking the array bounds before and after. When such a swap has +      -- happened we cleanup and try again +    close_loop = do +      eventManagerArray <- readIORef eventManager +      let ema_bounds@(low, high) = boundsIOArray eventManagerArray +      mgrs <- flip mapM [low..high] $ \i -> do +        Just (_,!mgr) <- readIOArray eventManagerArray i +        return mgr + +      -- 'takeMVar', and 'M.closeFd_' might block, although for a very short time. +      -- To make 'closeFdWith' safe in presence of asynchronous exceptions we have +      -- to use uninterruptible mask. +      join $ uninterruptibleMask_ $ do +        tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd +        new_ema_bounds <- boundsIOArray `fmap` readIORef eventManager +        -- Here we exploit Note [The eventManager Array] +        if new_ema_bounds /= ema_bounds +          then do +            -- the array has been modified. +            -- mgrs still holds the right EventManagers, by the Note. +            -- new_ema_bounds must be larger than ema_bounds, by the note. +            -- return the MVars we took and try again +            sequence_ $ zipWith (\mgr table -> finish mgr table (pure ())) mgrs tables +            pure close_loop +          else do +            -- We surely have taken all the appropriate MVars. Even if the array +            -- has been swapped, our mgrs is still correct. +            -- Remove the Fd from all callback tables, close the Fd, and run all +            -- callbacks. +            cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables +            close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps) +            pure (pure ())  threadWait :: Event -> Fd -> IO ()  threadWait evt fd = mask_ $ do @@ -177,10 +201,24 @@ threadWaitWriteSTM = threadWaitSTM evtWrite  getSystemEventManager :: IO (Maybe EventManager)  getSystemEventManager = do    t <- myThreadId -  (cap, _) <- threadCapability t    eventManagerArray <- readIORef eventManager -  mmgr <- readIOArray eventManagerArray cap -  return $ fmap snd mmgr +  let r = boundsIOArray eventManagerArray +  (cap, _) <- threadCapability t +  -- It is possible that we've just increased the number of capabilities and the +  -- new EventManager has not yet been constructed by +  -- 'ioManagerCapabilitiesChanged'. We expect this to happen very rarely. +  -- T21561 exercises this. +  -- Two options to proceed: +  --  1) return the EventManager for capability 0. This is guaranteed to exist, +  --     and "shouldn't" cause any correctness issues. +  --  2) Busy wait, with or without a call to 'yield'. This can't deadlock, +  --     because we must be on a brand capability and there must be a call to +  --     'ioManagerCapabilitiesChanged' pending. +  -- +  -- We take the second option, with the yield, judging it the most robust. +  if not (inRange r cap) +    then yield >> getSystemEventManager +    else fmap snd `fmap` readIOArray eventManagerArray cap  getSystemEventManager_ :: IO EventManager  getSystemEventManager_ = do @@ -191,6 +229,22 @@ getSystemEventManager_ = do  foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore"      getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a) +-- Note [The eventManager Array] +-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +-- A mutable array holding the current EventManager for each capability +-- An entry is Nothing only while the eventmanagers are initialised, see +-- 'startIOManagerThread' and 'ioManagerCapabilitiesChanged'. +-- The 'ThreadId' at array position 'cap'  will have been 'forkOn'ed capabality +-- 'cap'. +-- The array will be swapped with newer arrays when the number of capabilities +-- changes(via 'setNumCapabilities'). However: +--   * the size of the arrays will never decrease; and +--   * The 'EventManager's in the array are not replaced with other +--     'EventManager' constructors. +-- +-- This is a similar strategy as the rts uses for it's +-- capabilities array (n_capabilities is the size of the array, +-- enabled_capabilities' is the number of active capabilities).  eventManager :: IORef (IOArray Int (Maybe (ThreadId, EventManager)))  eventManager = unsafePerformIO $ do      numCaps <- getNumCapabilities @@ -351,7 +405,9 @@ ioManagerCapabilitiesChanged =                  startIOManagerThread new_eventManagerArray                -- update the event manager array reference: -              writeIORef eventManager new_eventManagerArray +              atomicWriteIORef eventManager new_eventManagerArray +              -- We need an atomic write here because 'eventManager' is accessed +              -- unsynchronized in 'getSystemEventManager' and 'closeFdWith'        else when (new_n_caps > numEnabled) $              forM_ [numEnabled..new_n_caps-1] $ \i -> do                Just (_,mgr) <- readIOArray eventManagerArray i | 
