diff options
author | Kavon Farvardin <kavon@farvard.in> | 2018-09-23 15:29:37 -0500 |
---|---|---|
committer | Kavon Farvardin <kavon@farvard.in> | 2018-09-23 15:29:37 -0500 |
commit | 84c2ad99582391005b5e873198b15e9e9eb4f78d (patch) | |
tree | caa8c2f2ec7e97fbb4977263c6817c9af5025cf4 /testsuite/tests/concurrent/should_run | |
parent | 8ddb47cfcf5776e9a3c55fd37947c8a95e00fa12 (diff) | |
parent | e68b439fe5de61b9a2ca51af472185c62ccb8b46 (diff) | |
download | haskell-wip/T13904.tar.gz |
update to current master againwip/T13904
Diffstat (limited to 'testsuite/tests/concurrent/should_run')
6 files changed, 177 insertions, 10 deletions
diff --git a/testsuite/tests/concurrent/should_run/T13916.hs b/testsuite/tests/concurrent/should_run/T13916.hs new file mode 100755 index 0000000000..e81aabb5a8 --- /dev/null +++ b/testsuite/tests/concurrent/should_run/T13916.hs @@ -0,0 +1,33 @@ +module Main where + +import Data.IORef +import System.IO.Unsafe +import Control.Concurrent.STM +import Control.Concurrent.Async +import Control.Concurrent +import System.IO +import System.Directory +import System.FilePath +import T13916_Bracket + +type Thing = MVar Bool + +main :: IO () +main = do + withEnvCache limit spawner $ \cache -> + forConcurrently_ [1..1000 :: Int] $ \n -> withEnv cache (\handle -> put handle n) + where + limit :: Limit + limit = Hard 1 + + put handle n = return () + +spawner :: Spawner Thing +spawner = Spawner + { maker = mkhandle + , killer = \thing -> takeMVar thing >> putMVar thing True + , isDead = \thing -> readMVar thing + } + +mkhandle :: IO Thing +mkhandle = newMVar False diff --git a/testsuite/tests/concurrent/should_run/T13916_Bracket.hs b/testsuite/tests/concurrent/should_run/T13916_Bracket.hs new file mode 100755 index 0000000000..b09adfc860 --- /dev/null +++ b/testsuite/tests/concurrent/should_run/T13916_Bracket.hs @@ -0,0 +1,135 @@ +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} +{- | +Module : Bracket +Description : Handling multiple environments with bracket-like apis +Maintainer : robertkennedy@clearwateranalytics.com +Stability : stable + +This module is meant for ie Sql or mongo connections, where you may wish for some number of easy to grab +environments. In particular, this assumes your connection has some initialization/release functions + +This module creates bugs with any optimizations enabled. The bugs do not occur if the program is in the same +module. +-} +module T13916_Bracket ( + -- * Data Types + Spawner(..), Limit(..), Cache, + -- * Usage + withEnvCache, withEnv + ) where + +import Control.Concurrent.STM +import Control.Concurrent.STM.TSem +import Control.Exception hiding (handle) +import Control.Monad +import Data.Vector (Vector) +import qualified Data.Vector as Vector + +-- * Data Types +-- | Tells the program how many environments it is allowed to spawn. +-- A `Lax` limit will spawn extra connections if the `Cache` is empty, +-- while a `Hard` limit will not spawn any more than the given number of connections simultaneously. +-- +-- @since 0.3.7 +data Limit = Hard {getLimit :: {-# unpack #-} !Int} + +data Spawner env = Spawner + { maker :: IO env + , killer :: env -> IO () + , isDead :: env -> IO Bool + } + +type VCache env = Vector (TMVar env) +data Cache env = Unlimited { spawner :: Spawner env + , vcache :: !(VCache env) + } + | Limited { spawner :: Spawner env + , vcache :: !(VCache env) + , envsem :: TSem + } + +-- ** Initialization +withEnvCache :: Limit -> Spawner env -> (Cache env -> IO a) -> IO a +withEnvCache limit spawner = bracket starter releaseCache + where starter = case limit of + Hard n -> Limited spawner <$> initializeEmptyCache n <*> atomically (newTSem n) + +-- ** Using a single value +withEnv :: Cache env -> (env -> IO a) -> IO a +withEnv cache = case cache of + Unlimited{..} -> withEnvUnlimited spawner vcache + Limited{..} -> withEnvLimited spawner vcache envsem + +-- *** Unlimited +-- | Takes an env and returns it on completion of the function. +-- If all envs are already taken or closed, this will spin up a new env. +-- When the function finishes, this will attempt to put the env into the cache. If it cannot, +-- it will kill the env. Note this can lead to many concurrent connections. +-- +-- @since 0.3.5 +withEnvUnlimited :: Spawner env -> VCache env -> (env -> IO a) -> IO a +withEnvUnlimited Spawner{..} cache = bracket taker putter + where + taker = do + mpipe <- atomically $ tryTakeEnv cache + case mpipe of + Nothing -> maker + Just env -> isDead env >>= \b -> if not b then return env else killer env >> maker + + putter env = do + accepted <- atomically $ tryPutEnv cache env + unless accepted $ killer env + +-- *** Limited +-- | Takes an env and returns it on completion of the function. +-- If all envs are already taken, this will wait. This should have a constant number of environments +-- +-- @since 0.3.6 +withEnvLimited :: Spawner env -> VCache env -> TSem -> (env -> IO a) -> IO a +withEnvLimited spawner vcache envsem = bracket taker putter + where + taker = limitMakeEnv spawner vcache envsem + putter env = atomically $ putEnv vcache env + +limitMakeEnv :: Spawner env -> VCache env -> TSem -> IO env +limitMakeEnv Spawner{..} vcache envsem = go + where + go = do + eenvpermission <- atomically $ ( Left <$> takeEnv vcache ) + `orElse` ( Right <$> waitTSem envsem ) + case eenvpermission of + Right () -> maker + Left env -> do + -- Given our env, we check if it's dead. If it's not, we are done and return it. + -- If it is dead, we release it, signal that a new env can be created, and then recurse + isdead <- isDead env + if not isdead then return env + else do + killer env + atomically $ signalTSem envsem + go + +-- * Low level +initializeEmptyCache :: Int -> IO (VCache env) +initializeEmptyCache n | n < 1 = return mempty + | otherwise = Vector.replicateM n newEmptyTMVarIO + +takeEnv :: VCache env -> STM env +takeEnv = Vector.foldl folding retry + where folding m stmenv = m `orElse` takeTMVar stmenv + +tryTakeEnv :: VCache env -> STM (Maybe env) +tryTakeEnv cache = (Just <$> takeEnv cache) `orElse` pure Nothing + +putEnv :: VCache env -> env -> STM () +putEnv cache env = Vector.foldl folding retry cache + where folding m stmenv = m `orElse` putTMVar stmenv env + +tryPutEnv :: VCache env -> env -> STM Bool +tryPutEnv cache env = (putEnv cache env *> return True) `orElse` pure False + +releaseCache :: Cache env -> IO () +releaseCache cache = Vector.mapM_ qkRelease (vcache cache) + where qkRelease tenv = atomically (tryTakeTMVar tenv) + >>= maybe (return ()) (killer $ spawner cache) diff --git a/testsuite/tests/concurrent/should_run/T5611.stderr.mingw32 b/testsuite/tests/concurrent/should_run/T5611.stderr.mingw32 new file mode 100644 index 0000000000..c034e20430 --- /dev/null +++ b/testsuite/tests/concurrent/should_run/T5611.stderr.mingw32 @@ -0,0 +1 @@ +T5611: <stdout>: commitBuffer: user error (Exception delivered successfully) diff --git a/testsuite/tests/concurrent/should_run/all.T b/testsuite/tests/concurrent/should_run/all.T index 69b8ad7a1e..08f439c34e 100644 --- a/testsuite/tests/concurrent/should_run/all.T +++ b/testsuite/tests/concurrent/should_run/all.T @@ -40,8 +40,7 @@ test('T3429', [ extra_run_opts('+RTS -C0.001 -RTS'), compile_and_run, ['']) # without -O, goes into an infinite loop -# GHCi does not detect the infinite loop. We should really fix this. -test('T4030', omit_ways('ghci'), compile_and_run, ['-O']) +test('T4030', normal, compile_and_run, ['-O']) # each of these runs for about a second test('throwto001', [reqlib('random'), extra_run_opts('1000 2000')], @@ -119,10 +118,7 @@ setTestOpts(when(fast(), skip)) test('conc001', normal, compile_and_run, ['']) test('conc002', normal, compile_and_run, ['']) - -# Omit GHCi way - it blows up to 0.5G. Something to do with the threaded RTS? -test('conc004', omit_ways(['ghci']), compile_and_run, ['']) - +test('conc004', normal, compile_and_run, ['']) test('conc007', extra_run_opts('+RTS -H128M -RTS'), compile_and_run, ['']) test('conc008', normal, compile_and_run, ['']) test('conc009', exit_code(1), compile_and_run, ['']) @@ -158,7 +154,7 @@ else: conc023_ways = normal test('conc023', [when(fast(), skip), - reqlib('random'), + reqlib('random'), multi_cpu_race, conc023_ways], compile_and_run, ['']) test('conc024', normal, compile_and_run, ['']) @@ -284,3 +280,5 @@ test('hs_try_putmvar003', # Check forkIO exception determinism under optimization test('T13330', normal, compile_and_run, ['-O']) +test('T13916', [reqlib('vector'), reqlib('stm'), reqlib('async')], + compile_and_run, ['-O2']) diff --git a/testsuite/tests/concurrent/should_run/conc065.hs b/testsuite/tests/concurrent/should_run/conc065.hs index 8f6c18b79d..7e75381a26 100644 --- a/testsuite/tests/concurrent/should_run/conc065.hs +++ b/testsuite/tests/concurrent/should_run/conc065.hs @@ -4,7 +4,7 @@ import Control.Concurrent import Control.Exception -- This loop spends most of its time printing stuff, and very occasionally --- pops outside 'block'. This test ensures that an thread trying to +-- pops outside 'block'. This test ensures that a thread trying to -- throwTo this thread will eventually succeed. loop = mask_ (print "alive") >> loop diff --git a/testsuite/tests/concurrent/should_run/setnumcapabilities001.hs b/testsuite/tests/concurrent/should_run/setnumcapabilities001.hs index 27685f0894..a18d75aeef 100644 --- a/testsuite/tests/concurrent/should_run/setnumcapabilities001.hs +++ b/testsuite/tests/concurrent/should_run/setnumcapabilities001.hs @@ -15,7 +15,7 @@ main = do forM_ (cycle ([n,n-1..1] ++ [2..n-1])) $ \m -> do setNumCapabilities m threadDelay t - printf "%d" (nqueens q) + printf "%d\n" (nqueens q) killThread t -- If we don't kill the child thread, it might be about to -- call setNumCapabilities() in C when the main thread exits, @@ -34,7 +34,7 @@ nqueens nq = length (pargen 0 []) pargen :: Int -> [Int] -> [[Int]] pargen n b | n >= threshold = iterate gen [b] !! (nq - n) - | otherwise = concat bs + | otherwise = concat bs where bs = map (pargen (n+1)) (gen [b]) `using` parList rdeepseq threshold = 3 |