diff options
| author | Patrick Palka <patrick@parcs.ath.cx> | 2013-08-21 16:55:52 -0400 | 
|---|---|---|
| committer | Patrick Palka <patrick@parcs.ath.cx> | 2013-08-26 22:21:16 -0400 | 
| commit | 8d9edfed74e8fd03933d4e3540f6372c269de538 (patch) | |
| tree | 7c311bd22ec0a353cf55af0a7b8ff9716f3eb006 | |
| parent | e8d0dc7e00e7c55bd23bcccd059683f8a7eadfd1 (diff) | |
| download | haskell-8d9edfed74e8fd03933d4e3540f6372c269de538.tar.gz | |
Implement the parallel upsweep (#910)
The parallel upsweep is the parallel counterpart to the default
sequential upsweep. It attempts to compile modules in parallel by
subdividing the work of the upsweep into parts that can be executed
concurrently by multiple Haskell threads.
In order to enable the parallel upsweep, the user has to pass the -jN
flag to GHC, where N is an optional number denoting the number of jobs,
or modules, to compile in parallel, like with GNU make. In GHC this just
sets the number of capabilities to N.
| -rw-r--r-- | compiler/main/DynFlags.hs | 8 | ||||
| -rw-r--r-- | compiler/main/GhcMake.hs | 354 | 
2 files changed, 359 insertions, 3 deletions
| diff --git a/compiler/main/DynFlags.hs b/compiler/main/DynFlags.hs index cb7d43c3f0..89ba319238 100644 --- a/compiler/main/DynFlags.hs +++ b/compiler/main/DynFlags.hs @@ -579,6 +579,10 @@ data DynFlags = DynFlags {    ruleCheck             :: Maybe String,    strictnessBefore      :: [Int],       -- ^ Additional demand analysis +  parUpsweepNum         :: Maybe Int,   -- ^ The number of modules to compile in parallel +                                        --   during the upsweep, where Nothing ==> compile as +                                        --   many in parallel as there are CPUs. +    simplTickFactor       :: Int,         -- ^ Multiplier for simplifier ticks    specConstrThreshold   :: Maybe Int,   -- ^ Threshold for SpecConstr    specConstrCount       :: Maybe Int,   -- ^ Max number of specialisations for any one function @@ -1254,6 +1258,8 @@ defaultDynFlags mySettings =          historySize             = 20,          strictnessBefore        = [], +        parUpsweepNum           = Just 1, +          cmdlineHcIncludes       = [],          importPaths             = ["."],          mainModIs               = mAIN, @@ -2012,6 +2018,8 @@ dynamic_flags = [                             addWarn "-#include and INCLUDE pragmas are deprecated: They no longer have any effect"))    , Flag "v"        (OptIntSuffix setVerbosity) +  , Flag "j"        (OptIntSuffix (\n -> upd (\d -> d {parUpsweepNum = n}))) +          ------- ways --------------------------------------------------------    , Flag "prof"           (NoArg (addWay WayProf))    , Flag "eventlog"       (NoArg (addWay WayEventLog)) diff --git a/compiler/main/GhcMake.hs b/compiler/main/GhcMake.hs index c43b18a62a..c4b63b675a 100644 --- a/compiler/main/GhcMake.hs +++ b/compiler/main/GhcMake.hs @@ -40,9 +40,10 @@ import TcRnMonad        ( initIfaceCheck )  import Bag              ( listToBag )  import BasicTypes  import Digraph -import Exception        ( evaluate, tryIO ) +import Exception        ( tryIO, gbracket, gfinally )  import FastString  import Maybes           ( expectJust, mapCatMaybes ) +import MonadUtils       ( allM )  import Outputable  import Panic  import SrcLoc @@ -54,17 +55,24 @@ import Util  import qualified Data.Map as Map  import qualified FiniteMap as Map ( insertListWith ) +import Control.Concurrent ( forkIOWithUnmask, killThread ) +import Control.Concurrent.MVar +import Control.Concurrent.QSem +import Control.Exception  import Control.Monad  import Data.IORef  import Data.List  import qualified Data.List as List  import Data.Maybe +import Data.Ord ( comparing )  import Data.Time  import System.Directory  import System.FilePath  import System.IO        ( fixIO )  import System.IO.Error  ( isDoesNotExistError ) +import GHC.Conc ( getNumProcessors, getNumCapabilities, setNumCapabilities ) +  -- -----------------------------------------------------------------------------  -- Loading the program @@ -253,16 +261,22 @@ load how_much = do          mg = stable_mg ++ unstable_mg      -- clean up between compilations -    let cleanup hsc_env = intermediateCleanTempFiles dflags +    let cleanup hsc_env = intermediateCleanTempFiles (hsc_dflags hsc_env)                                (flattenSCCs mg2_with_srcimps)                                hsc_env      liftIO $ debugTraceMsg dflags 2 (hang (text "Ready for upsweep")                                 2 (ppr mg)) +    n_jobs <- case parUpsweepNum dflags of +                    Nothing -> liftIO getNumProcessors +                    Just n  -> return n +    let upsweep_fn | n_jobs > 1 = parUpsweep n_jobs +                   | otherwise  = upsweep +      setSession hsc_env{ hsc_HPT = emptyHomePackageTable }      (upsweep_ok, modsUpswept) -       <- upsweep pruned_hpt stable_mods cleanup mg +       <- upsweep_fn pruned_hpt stable_mods cleanup mg      -- Make modsDone be the summaries for each home module now      -- available; this should equal the domain of hpt3. @@ -595,6 +609,340 @@ checkStability hpt sccs all_home_mods = foldl checkSCC ([],[]) sccs                          linkableTime l >= ms_hs_date ms                  _other  -> False +{- Parallel Upsweep + - + - The parallel upsweep attempts to concurrently compile the modules in the + - compilation graph using multiple Haskell threads. + - + - The Algorithm + - + - A Haskell thread is spawned for each module in the module graph, waiting for + - its direct dependencies to finish building before it itself begins to build. + - + - Each module is associated with an initially empty MVar that stores the + - result of that particular module's compile. If the compile succeeded, then + - the HscEnv (synchronized by an MVar) is updated with the fresh HMI of that + - module, and the module's HMI is deleted from the old HPT (synchronized by an + - IORef) to save space. + - + - Instead of immediately outputting messages to the standard handles, all + - compilation output is deferred to a per-module TQueue. A QSem is used to + - limit the number of workers that are compiling simultaneously. + - + - Meanwhile, the main thread sequentially loops over all the modules in the + - module graph, outputting the messages stored in each module's TQueue. +-} + +-- | Each module is given a unique 'LogQueue' to redirect compilation messages +-- to. A 'Nothing' value contains the result of compilation, and denotes the +-- end of the message queue. +data LogQueue = LogQueue !(IORef [Maybe (Severity, SrcSpan, PprStyle, MsgDoc)]) +                         !(MVar ()) + +-- | The graph of modules to compile and their corresponding result 'MVar' and +-- 'LogQueue'. +type CompilationGraph = [(ModSummary, MVar SuccessFlag, LogQueue)] + +-- | Build a 'CompilationGraph' out of a list of strongly-connected modules, +-- also returning the first, if any, encountered module cycle. +buildCompGraph :: [SCC ModSummary] -> IO (CompilationGraph, Maybe [ModSummary]) +buildCompGraph [] = return ([], Nothing) +buildCompGraph (scc:sccs) = case scc of +    AcyclicSCC ms -> do +        mvar <- newEmptyMVar +        log_queue <- do +            ref <- newIORef [] +            sem <- newEmptyMVar +            return (LogQueue ref sem) +        (rest,cycle) <- buildCompGraph sccs +        return ((ms,mvar,log_queue):rest, cycle) +    CyclicSCC mss -> return ([], Just mss) + +-- | The entry point to the parallel upsweep. +-- +-- See also the simpler, sequential 'upsweep'. +parUpsweep +    :: GhcMonad m +    => Int +    -- ^ The number of workers we wish to run in parallel +    -> HomePackageTable +    -> ([ModuleName],[ModuleName]) +    -> (HscEnv -> IO ()) +    -> [SCC ModSummary] +    -> m (SuccessFlag, +          [ModSummary]) +parUpsweep n_jobs old_hpt stable_mods cleanup sccs = do +    hsc_env <- getSession +    let dflags = hsc_dflags hsc_env + +    -- The bits of shared state we'll be using: + +    -- The global HscEnv is updated with the module's HMI when a module +    -- successfully compiles. +    hsc_env_var <- liftIO $ newMVar hsc_env + +    -- The old HPT is used for recompilation checking in upsweep_mod. When a +    -- module sucessfully gets compiled, its HMI is pruned from the old HPT. +    old_hpt_var <- liftIO $ newIORef old_hpt + +    -- The list of modules that have so far been successfully compiled. This is +    -- used to re-typecheck module loops after the last module in the loop has +    -- been compiled (see reTypecheckLoop). +    mods_done_var <- liftIO $ newIORef [] + +    -- What we use to limit parallelism with. +    par_sem <- liftIO $ newQSem n_jobs + + +    let updNumCapabilities = liftIO $ do +            n_capabilities <- getNumCapabilities +            unless (n_capabilities /= 1) $ setNumCapabilities n_jobs +            return n_capabilities +    -- Reset the number of capabilities once the upsweep ends. +    let resetNumCapabilities orig_n = liftIO $ setNumCapabilities orig_n + +    gbracket updNumCapabilities resetNumCapabilities $ \_ -> do + +    -- Sync the global session with the latest HscEnv once the upsweep ends. +    let finallySyncSession io = io `gfinally` do +            hsc_env <- liftIO $ readMVar hsc_env_var +            setSession hsc_env + +    finallySyncSession $ do + +    -- Build the compilation graph out of the list of SCCs. Module cycles are +    -- handled at the very end, after some useful work gets done. Note that +    -- this list is topologically sorted (by virtue of 'sccs' being sorted so). +    (comp_graph,cycle) <- liftIO $ buildCompGraph sccs +    let comp_graph_w_idx = zip comp_graph [1..] + +    -- Build a Map out of the compilation graph with which we can efficiently +    -- look up the result MVar associated with a particular home module. +    let mod_map :: Map.Map (Module,Bool) (MVar SuccessFlag, Int) +        mod_map = Map.fromList [ ((ms_mod ms, isBootSummary ms), (mvar,idx)) +                               | ((ms,mvar,_),idx) <- comp_graph_w_idx ] + +    -- For each module in the module graph, spawn a worker thread that will +    -- compile this module. +    let { spawnWorkers = forM comp_graph_w_idx $ \((mod,!mvar,!log_queue),!mod_idx) -> +            forkIOWithUnmask $ \unmask -> do +                -- Replace the default log_action with one that writes each +                -- message to the module's log_queue. The main thread will +                -- deal with synchronously printing these messages. +                -- +                -- Use a local filesToClean var so that we can clean up +                -- intermediate files in a timely fashion (as soon as +                -- compilation for that module is finished) without having to +                -- worry about accidentally deleting a simultaneous compile's +                -- important files. +                lcl_files_to_clean <- newIORef [] +                let lcl_dflags = dflags { log_action = parLogAction log_queue +                                        , filesToClean = lcl_files_to_clean } + +                -- Unmask asynchronous exceptions and perform the thread-local +                -- work to compile the module (see parUpsweep_one). +                m_res <- try $ unmask $ prettyPrintGhcErrors lcl_dflags $ +                        parUpsweep_one mod mod_map lcl_dflags cleanup par_sem +                                       hsc_env_var old_hpt_var mods_done_var +                                       stable_mods mod_idx (length sccs) + +                res <- case m_res of +                    Right flag -> return flag +                    Left exc -> do +                        -- Don't print ThreadKilled exceptions: they are used +                        -- to kill the worker thread in the event of a user +                        -- interrupt, and the user doesn't have to be informed +                        -- about that. +                        when (fromException exc /= Just ThreadKilled) +                             (errorMsg lcl_dflags (text (show exc))) +                        return Failed + +                -- Populate the result MVar. +                putMVar mvar res + +                -- Write the end marker to the message queue, telling the main +                -- thread that it can stop waiting for messages from this +                -- particular compile. +                writeLogQueue log_queue Nothing + +                -- Add the remaining files that weren't cleaned up to the +                -- global filesToClean ref, for cleanup later. +                files_kept <- readIORef (filesToClean lcl_dflags) +                addFilesToClean dflags files_kept + + +        -- Kill all the workers, masking interrupts (since killThread is +        -- interruptible). XXX: This is not ideal. +        ; killWorkers = uninterruptibleMask_ . mapM_ killThread } + + +    -- Spawn the workers, making sure to kill them later. Collect the results +    -- of each compile. +    results <- liftIO $ bracket spawnWorkers killWorkers $ \_ -> +        -- Loop over each module in the compilation graph in order, printing +        -- each message from its log_queue. +        forM comp_graph $ \(mod,mvar,log_queue) -> do +            printLogs dflags log_queue +            result <- readMVar mvar +            if succeeded result then return (Just mod) else return Nothing + + +    -- Collect and return the ModSummaries of all the successful compiles. +    -- NB: Reverse this list to maintain output parity with the sequential upsweep. +    let ok_results = reverse (catMaybes results) + +    -- Handle any cycle in the original compilation graph and return the result +    -- of the upsweep. +    case cycle of +        Just mss -> do +            liftIO $ fatalErrorMsg dflags (cyclicModuleErr mss) +            return (Failed,ok_results) +        Nothing  -> do +            let success_flag = successIf (all isJust results) +            return (success_flag,ok_results) + +  where +    writeLogQueue :: LogQueue -> Maybe (Severity,SrcSpan,PprStyle,MsgDoc) -> IO () +    writeLogQueue (LogQueue ref sem) msg = do +        atomicModifyIORef ref $ \msgs -> (msg:msgs,()) +        _ <- tryPutMVar sem () +        return () + +    -- The log_action callback that is used to synchronize messages from a +    -- worker thread. +    parLogAction :: LogQueue -> LogAction +    parLogAction log_queue _dflags !severity !srcSpan !style !msg = do +        writeLogQueue log_queue (Just (severity,srcSpan,style,msg)) + +    -- Print each message from the log_queue using the log_action from the +    -- session's DynFlags. +    printLogs :: DynFlags -> LogQueue -> IO () +    printLogs !dflags (LogQueue ref sem) = read_msgs +      where read_msgs = do +                takeMVar sem +                msgs <- atomicModifyIORef ref $ \xs -> ([], reverse xs) +                print_loop msgs + +            print_loop [] = read_msgs +            print_loop (x:xs) = case x of +                Just (severity,srcSpan,style,msg) -> do +                    log_action dflags dflags severity srcSpan style msg +                    print_loop xs +                -- Exit the loop once we encounter the end marker. +                Nothing -> return () + +-- The interruptible subset of the worker threads' work. +parUpsweep_one +    :: ModSummary +    -- ^ The module we wish to compile +    -> Map.Map (Module,Bool) (MVar SuccessFlag, Int) +    -- ^ The map of home modules and their result MVar +    -> DynFlags +    -- ^ The thread-local DynFlags +    -> (HscEnv -> IO ()) +    -- ^ The callback for cleaning up intermediate files +    -> QSem +    -- ^ The semaphore for limiting the number of simultaneous compiles +    -> MVar HscEnv +    -- ^ The MVar that synchronizes updates to the global HscEnv +    -> IORef HomePackageTable +    -- ^ The old HPT +    -> IORef [ModSummary] +    -- ^ The list of modules that have successfully compiled +    -> ([ModuleName],[ModuleName]) +    -- ^ Lists of stable objects and BCOs +    -> Int +    -- ^ The index of this module +    -> Int +    -- ^ The total number of modules +    -> IO SuccessFlag +    -- ^ The result of this compile +parUpsweep_one mod mod_map lcl_dflags cleanup par_sem hsc_env_var +               old_hpt_var mods_done_var stable_mods mod_index num_mods = do +    let home_imps     = map unLoc $ ms_home_imps mod +        home_src_imps = map unLoc $ ms_home_srcimps mod +        all_imps      = zip home_imps (repeat False) ++ +                        zip home_src_imps (repeat True) + +        -- The module's home-module dependencies. +        dependencies_w_idx = +              [ (mvar,idx) | (imp_name,is_boot) <- all_imps +                           , let imp = mkModule (thisPackage lcl_dflags) imp_name +                           , Just (mvar,idx) <- [Map.lookup (imp,is_boot) mod_map] ] + +        -- Sort the list of dependencies in reverse-topological order. This +        -- way, by the time we get woken up by the result of an earlier +        -- dependency, subsequent dependencies are more likely to have +        -- finished. This step effectively reduces the number of MVars that +        -- each thread blocks on. +        dependencies = map fst $ sortBy (flip (comparing snd)) dependencies_w_idx + +    -- Wait for the all the module's dependencies to finish building. +    deps_ok <- allM (fmap succeeded . readMVar) dependencies + +    -- We can't build this module if any of its dependencies failed to build. +    if not deps_ok +      then return Failed +      else do +        -- Any hsc_env at this point is OK to use since we only really require +        -- that the HPT contains the HMIs of our dependencies. +        hsc_env <- readMVar hsc_env_var +        old_hpt <- readIORef old_hpt_var + +        let logger err = printBagOfErrors lcl_dflags (srcErrorMessages err) + +        -- Limit the number of parallel compiles. +        let withSem sem = bracket_ (waitQSem sem) (signalQSem sem) +        mb_mod_info <- withSem par_sem $ +            handleSourceError (\err -> do logger err; return Nothing) $ do +                -- Have the ModSummary and HscEnv point to our local log_action +                -- and filesToClean var. +                let lcl_mod = localize_mod mod +                let lcl_hsc_env = localize_hsc_env hsc_env + +                -- Compile the module. +                mod_info <- upsweep_mod lcl_hsc_env old_hpt stable_mods lcl_mod +                                        mod_index num_mods +                return (Just mod_info) + +        case mb_mod_info of +            Nothing -> return Failed +            Just mod_info -> do +                let this_mod = ms_mod_name mod + +                -- Prune the old HPT unless this is an hs-boot module. +                unless (isBootSummary mod) $ +                    atomicModifyIORef old_hpt_var $ \old_hpt -> +                        (delFromUFM old_hpt this_mod, ()) + +                -- Update and fetch the list of completed modules. +                mods_done <- atomicModifyIORef mods_done_var $ \mods_done -> +                                let mods_done' = mod:mods_done +                                in (mods_done',mods_done') + +                -- Update and fetch the global HscEnv, and re-typecheck any +                -- module loops. +                lcl_hsc_env' <- modifyMVar hsc_env_var $ \hsc_env -> do +                    let hsc_env' = hsc_env { hsc_HPT = addToUFM (hsc_HPT hsc_env) +                                                                this_mod mod_info } +                    hsc_env'' <- reTypecheckLoop hsc_env' mod mods_done +                    return (hsc_env'', localize_hsc_env hsc_env'') + +                -- Clean up any intermediate files. +                cleanup lcl_hsc_env' +                return Succeeded + +  where +    localize_mod mod +        = mod { ms_hspp_opts = (ms_hspp_opts mod) +                 { log_action = log_action lcl_dflags +                 , filesToClean = filesToClean lcl_dflags } } + +    localize_hsc_env hsc_env +        = hsc_env { hsc_dflags = (hsc_dflags hsc_env) +                     { log_action = log_action lcl_dflags +                     , filesToClean = filesToClean lcl_dflags } } +  -- -----------------------------------------------------------------------------  --  -- | The upsweep | 
