diff options
Diffstat (limited to 'ghc/lib/concurrent')
-rw-r--r-- | ghc/lib/concurrent/Channel.lhs | 126 | ||||
-rw-r--r-- | ghc/lib/concurrent/ChannelVar.lhs | 55 | ||||
-rw-r--r-- | ghc/lib/concurrent/Concurrent.lhs | 179 | ||||
-rw-r--r-- | ghc/lib/concurrent/Makefile | 81 | ||||
-rw-r--r-- | ghc/lib/concurrent/Merge.lhs | 19 | ||||
-rw-r--r-- | ghc/lib/concurrent/Parallel.lhs | 44 | ||||
-rw-r--r-- | ghc/lib/concurrent/SampleVar.lhs | 86 | ||||
-rw-r--r-- | ghc/lib/concurrent/Semaphore.lhs | 111 |
8 files changed, 0 insertions, 701 deletions
diff --git a/ghc/lib/concurrent/Channel.lhs b/ghc/lib/concurrent/Channel.lhs deleted file mode 100644 index 18dd20e57c..0000000000 --- a/ghc/lib/concurrent/Channel.lhs +++ /dev/null @@ -1,126 +0,0 @@ -% -% (c) The GRASP/AQUA Project, Glasgow University, 1995-97 -% -\section[Channel]{Unbounded Channels} - -Standard, unbounded channel abstraction. - -\begin{code} -module Channel - ( - {- abstract type defined -} - Chan, - - {- creator -} - newChan, -- :: IO (Chan a) - - {- operators -} - writeChan, -- :: Chan a -> a -> IO () - readChan, -- :: Chan a -> IO a - dupChan, -- :: Chan a -> IO (Chan a) - unGetChan, -- :: Chan a -> a -> IO () - - isEmptyChan, -- :: Chan a -> IO Bool - - {- stream interface -} - getChanContents, -- :: Chan a -> IO [a] - writeList2Chan -- :: Chan a -> [a] -> IO () - - ) where - -import Prelude -import PrelConc -import PrelST -import PrelIOBase ( unsafeInterleaveIO ) -\end{code} - -A channel is represented by two @MVar@s keeping track of the two ends -of the channel contents,i.e., the read- and write ends. Empty @MVar@s -are used to handle consumers trying to read from an empty channel. - -\begin{code} -data Chan a - = Chan (MVar (Stream a)) - (MVar (Stream a)) - -type Stream a = MVar (ChItem a) - -data ChItem a = ChItem a (Stream a) -\end{code} - -See the Concurrent Haskell paper for a diagram explaining the -how the different channel operations proceed. - -@newChan@ sets up the read and write end of a channel by initialising -these two @MVar@s with an empty @MVar@. - -\begin{code} -newChan :: IO (Chan a) -newChan = do - hole <- newEmptyMVar - read <- newMVar hole - write <- newMVar hole - return (Chan read write) -\end{code} - -To put an element on a channel, a new hole at the write end is created. -What was previously the empty @MVar@ at the back of the channel is then -filled in with a new stream element holding the entered value and the -new hole. - -\begin{code} -writeChan :: Chan a -> a -> IO () -writeChan (Chan _read write) val = do - new_hole <- newEmptyMVar - old_hole <- takeMVar write - putMVar write new_hole - putMVar old_hole (ChItem val new_hole) - -readChan :: Chan a -> IO a -readChan (Chan read _write) = do - read_end <- takeMVar read - (ChItem val new_read_end) <- takeMVar read_end - putMVar read new_read_end - return val - - -dupChan :: Chan a -> IO (Chan a) -dupChan (Chan _read write) = do - new_read <- newEmptyMVar - hole <- readMVar write - putMVar new_read hole - return (Chan new_read write) - -unGetChan :: Chan a -> a -> IO () -unGetChan (Chan read _write) val = do - new_read_end <- newEmptyMVar - read_end <- takeMVar read - putMVar new_read_end (ChItem val read_end) - putMVar read new_read_end - -isEmptyChan :: Chan a -> IO Bool -isEmptyChan (Chan read write) = do - r <- takeMVar read - w <- readMVar write - let eq = r == w - eq `seq` putMVar read r - return eq - -\end{code} - -Operators for interfacing with functional streams. - -\begin{code} -getChanContents :: Chan a -> IO [a] -getChanContents ch - = unsafeInterleaveIO (do - x <- readChan ch - xs <- getChanContents ch - return (x:xs) - ) - -------------- -writeList2Chan :: Chan a -> [a] -> IO () -writeList2Chan ch ls = sequence_ (map (writeChan ch) ls) - -\end{code} diff --git a/ghc/lib/concurrent/ChannelVar.lhs b/ghc/lib/concurrent/ChannelVar.lhs deleted file mode 100644 index 50c893cb03..0000000000 --- a/ghc/lib/concurrent/ChannelVar.lhs +++ /dev/null @@ -1,55 +0,0 @@ -% -% (c) The GRASP/AQUA Project, Glasgow University, 1995 -% -\section[ChannelVar]{Channel variables} - -Channel variables, are one-element channels described in the Concurrent -Haskell paper (available from @ftp://ftp.dcs.gla.ac.uk/pub/glasgow-fp/drafts@) - -\begin{code} -module ChannelVar - ( - {- abstract -} - CVar, - newCVar, -- :: IO (CVar a) - writeCVar, -- :: CVar a -> a -> IO () - readCVar, -- :: CVar a -> IO a - MVar - - ) where - -import Prelude -import PrelConc -\end{code} - -@MVars@ provide the basic mechanisms for synchronising access to a shared -resource. @CVars@, or channel variables, provide an abstraction that guarantee -that the producer is not allowed to run riot, but enforces the interleaved -access to the channel variable,i.e., a producer is forced to wait up for -a consumer to remove the previous value before it can deposit a new one in the @CVar@. - -\begin{code} - -data CVar a - = CVar (MVar a) -- prod -> cons - (MVar ()) -- cons -> prod - -newCVar :: IO (CVar a) -writeCVar :: CVar a -> a -> IO () -readCVar :: CVar a -> IO a - -newCVar - = newEmptyMVar >>= \ datum -> - newMVar () >>= \ ack -> - return (CVar datum ack) - -writeCVar (CVar datum ack) val - = takeMVar ack >> - putMVar datum val >> - return () - -readCVar (CVar datum ack) - = takeMVar datum >>= \ val -> - putMVar ack () >> - return val -\end{code} diff --git a/ghc/lib/concurrent/Concurrent.lhs b/ghc/lib/concurrent/Concurrent.lhs deleted file mode 100644 index 132922ef45..0000000000 --- a/ghc/lib/concurrent/Concurrent.lhs +++ /dev/null @@ -1,179 +0,0 @@ -% -% (c) The AQUA Project, Glasgow University, 1994-1996 -% - -\section[Concurrent]{Concurrent Haskell constructs} - -A common interface to a collection of useful concurrency abstractions. -Currently, the collection only contains the abstractions found in the -{\em Concurrent Haskell} paper (presented at the Haskell Workshop -1995, draft available via \tr{ftp} from -\tr{ftp.dcs.gla.ac.uk/pub/glasgow-fp/drafts}.) plus a couple of -others. See the paper and the individual files containing the module -definitions for explanation on what they do. - -\begin{code} -module Concurrent ( - module ChannelVar, - module Channel, - module Semaphore, - module SampleVar - - , ThreadId - - -- Forking and suchlike - , forkIO -- :: IO () -> IO ThreadId - , myThreadId -- :: IO ThreadId - , killThread -- :: ThreadId -> IO () - , raiseInThread -- :: ThreadId -> Exception -> IO () - , par -- :: a -> b -> b - , seq -- :: a -> b -> b - , fork -- :: a -> b -> b - , yield -- :: IO () - - , threadDelay -- :: Int -> IO () - , threadWaitRead -- :: Int -> IO () - , threadWaitWrite -- :: Int -> IO () - - -- MVars - , MVar -- abstract - , newMVar -- :: a -> IO (MVar a) - , newEmptyMVar -- :: IO (MVar a) - , takeMVar -- :: MVar a -> IO a - , putMVar -- :: MVar a -> a -> IO () - , readMVar -- :: MVar a -> IO a - , swapMVar -- :: MVar a -> a -> IO a - , isEmptyMVar -- :: MVar a -> IO Bool - - -- merging of streams - , mergeIO -- :: [a] -> [a] -> IO [a] - , nmergeIO -- :: [[a]] -> IO [a] - ) where - -import Parallel -import ChannelVar -import Channel -import Semaphore -import SampleVar -import PrelConc -import PrelHandle ( topHandler ) -import PrelException -import PrelIOBase ( IO(..) ) -import IO -import PrelAddr ( Addr ) -import PrelArr ( ByteArray ) -import PrelPack ( packString ) -import PrelIOBase ( unsafePerformIO , unsafeInterleaveIO ) -import PrelBase ( fork# ) -import PrelGHC ( Addr#, unsafeCoerce# ) - -infixr 0 `fork` -\end{code} - -Thread Ids, specifically the instances of Eq and Ord for these things. -The ThreadId type itself is defined in std/PrelConc.lhs. - -Rather than define a new primitve, we use a little helper function -cmp_thread in the RTS. - -\begin{code} -foreign import ccall "cmp_thread" unsafe cmp_thread :: Addr# -> Addr# -> Int --- Returns -1, 0, 1 - -cmpThread :: ThreadId -> ThreadId -> Ordering -cmpThread (ThreadId t1) (ThreadId t2) = - case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of - -1 -> LT - 0 -> EQ - 1 -> GT - -instance Eq ThreadId where - t1 == t2 = - case t1 `cmpThread` t2 of - EQ -> True - _ -> False - -instance Ord ThreadId where - compare = cmpThread -\end{code} - -\begin{code} -forkIO :: IO () -> IO ThreadId -forkIO action = IO $ \ s -> - case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #) - where - action_plus = - catchException action - (topHandler False{-don't quit on exception raised-}) - -{-# INLINE fork #-} -fork :: a -> b -> b -fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y -\end{code} - - -\begin{code} -max_buff_size :: Int -max_buff_size = 1 - -mergeIO :: [a] -> [a] -> IO [a] -nmergeIO :: [[a]] -> IO [a] - -mergeIO ls rs - = newEmptyMVar >>= \ tail_node -> - newMVar tail_node >>= \ tail_list -> - newQSem max_buff_size >>= \ e -> - newMVar 2 >>= \ branches_running -> - let - buff = (tail_list,e) - in - forkIO (suckIO branches_running buff ls) >> - forkIO (suckIO branches_running buff rs) >> - takeMVar tail_node >>= \ val -> - signalQSem e >> - return val - -type Buffer a - = (MVar (MVar [a]), QSem) - -suckIO :: MVar Int -> Buffer a -> [a] -> IO () - -suckIO branches_running buff@(tail_list,e) vs - = case vs of - [] -> takeMVar branches_running >>= \ val -> - if val == 1 then - takeMVar tail_list >>= \ node -> - putMVar node [] >> - putMVar tail_list node - else - putMVar branches_running (val-1) - (x:xs) -> - waitQSem e >> - takeMVar tail_list >>= \ node -> - newEmptyMVar >>= \ next_node -> - unsafeInterleaveIO ( - takeMVar next_node >>= \ x -> - signalQSem e >> - return x) >>= \ next_node_val -> - putMVar node (x:next_node_val) >> - putMVar tail_list next_node >> - suckIO branches_running buff xs - -nmergeIO lss - = let - len = length lss - in - newEmptyMVar >>= \ tail_node -> - newMVar tail_node >>= \ tail_list -> - newQSem max_buff_size >>= \ e -> - newMVar len >>= \ branches_running -> - let - buff = (tail_list,e) - in - mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >> - takeMVar tail_node >>= \ val -> - signalQSem e >> - return val - where - mapIO f xs = sequence (map f xs) -\end{code} diff --git a/ghc/lib/concurrent/Makefile b/ghc/lib/concurrent/Makefile deleted file mode 100644 index 4aa7428936..0000000000 --- a/ghc/lib/concurrent/Makefile +++ /dev/null @@ -1,81 +0,0 @@ -# $Id: Makefile,v 1.9 1999/10/29 13:55:40 sof Exp $ -# -# Makefile for concurrent libraries. -# - -TOP = ../.. -include $(TOP)/mk/boilerplate.mk - -WAYS=$(GhcLibWays) - -HC = $(GHC) - -#----------------------------------------------------------------------------- -# Setting the standard variables -# - -LIBRARY = libHSconcurrent$(_way).a -HS_SRCS = $(wildcard *.lhs) -HS_OBJS = $(HS_SRCS:.lhs=.$(way_)o) -LIBOBJS = $(HS_OBJS) -HS_IFACES= $(HS_SRCS:.lhs=.$(way_)hi) - - -#----------------------------------------------------------------------------- -# Setting the GHC compile options - -SRC_HC_OPTS += -recomp -cpp -fglasgow-exts -fvia-C -Rghc-timing $(GhcLibHcOpts) -SRC_MKDEPENDHS_OPTS += -optdep--include-prelude - -# -# Profiling options -WAY_p_HC_OPTS += -GPrelude -WAY_mr_HC_OPTS += -GPrelude - -# -# Object and interface files have suffixes tagged with their ways -# -ifneq "$(way)" "" -SRC_HC_OPTS += -hisuf $(way_)hi -endif - -Parallel_HC_OPTS += -fglasgow-exts - -#----------------------------------------------------------------------------- -# Dependency generation - -SRC_MKDEPENDHS_OPTS += -I$(GHC_INCLUDE_DIR) - -DLL_NAME = HSconc.dll -DLL_IMPLIB_NAME = libHSconcurrent_imp.a -SRC_BLD_DLL_OPTS += --export-all --output-def=HSconc.def DllVersionInfo.o -SRC_BLD_DLL_OPTS += -lwinmm -lHSrts_imp -lHScbits_imp -lHS_imp -lgmp -L. -L../../rts/gmp -L../../rts -L../std -L../std/cbits - -ifeq "$(way)" "dll" -all :: DllVersionInfo.o -endif - -#----------------------------------------------------------------------------- -# Installation; need to install .hi files as well as libraries -# -# The interface files are put inside the $(libdir), since they -# might (potentially) be platform specific.. -# -# override is used here because for binary distributions, datadir is -# set on the command line. sigh. -# -override datadir:=$(libdir)/imports/concurrent - -# -# Files to install from here -# -INSTALL_LIBS += $(LIBRARY) -INSTALL_DATAS += $(HS_IFACES) -ifeq "$(EnableWin32DLLs)" "YES" -INSTALL_PROGS += $(DLL_NAME) -INSTALL_LIBS += $(patsubst %.a, %_imp.a, $(LIBRARY)) -INSTALL_DATAS += dLL_ifs.hi -endif - -include $(TOP)/mk/target.mk - diff --git a/ghc/lib/concurrent/Merge.lhs b/ghc/lib/concurrent/Merge.lhs deleted file mode 100644 index 395bd2ff05..0000000000 --- a/ghc/lib/concurrent/Merge.lhs +++ /dev/null @@ -1,19 +0,0 @@ -% -% (c) The GRASP/AQUA Project, Glasgow University, 1995 -% -\section[Merge]{Mergeing streams} - -Avoiding the loss of ref. transparency by attaching the merge to the -IO monad. - -(The ops. are now defined in Concurrent to avoid module loop trouble). - -\begin{code} -module Merge - ( - mergeIO - , nmergeIO - ) where - -import Concurrent -\end{code} diff --git a/ghc/lib/concurrent/Parallel.lhs b/ghc/lib/concurrent/Parallel.lhs deleted file mode 100644 index 2089219aae..0000000000 --- a/ghc/lib/concurrent/Parallel.lhs +++ /dev/null @@ -1,44 +0,0 @@ -% -% (c) The GRASP/AQUA Project, Glasgow University, 1995-1996 -% -\section[Parallel]{Parallel Constructs} - -\begin{code} -module Parallel (par, seq -- re-exported -#if defined(__GRANSIM__) - , parGlobal, parLocal, parAt, parAtAbs, parAtRel, parAtForNow -#endif - ) where - -import PrelConc ( par ) - -#if defined(__GRANSIM__) -import PrelBase -import PrelErr ( parError ) -import PrelGHC ( parGlobal#, parLocal#, parAt#, parAtAbs#, parAtRel#, parAtForNow# ) - -{-# INLINE parGlobal #-} -{-# INLINE parLocal #-} -{-# INLINE parAt #-} -{-# INLINE parAtAbs #-} -{-# INLINE parAtRel #-} -{-# INLINE parAtForNow #-} -parGlobal :: Int -> Int -> Int -> Int -> a -> b -> b -parLocal :: Int -> Int -> Int -> Int -> a -> b -> b -parAt :: Int -> Int -> Int -> Int -> a -> b -> c -> c -parAtAbs :: Int -> Int -> Int -> Int -> Int -> a -> b -> b -parAtRel :: Int -> Int -> Int -> Int -> Int -> a -> b -> b -parAtForNow :: Int -> Int -> Int -> Int -> a -> b -> c -> c - -parGlobal (I# w) (I# g) (I# s) (I# p) x y = case (parGlobal# x w g s p y) of { 0# -> parError; _ -> y } -parLocal (I# w) (I# g) (I# s) (I# p) x y = case (parLocal# x w g s p y) of { 0# -> parError; _ -> y } - -parAt (I# w) (I# g) (I# s) (I# p) v x y = case (parAt# x v w g s p y) of { 0# -> parError; _ -> y } -parAtAbs (I# w) (I# g) (I# s) (I# p) (I# q) x y = case (parAtAbs# x q w g s p y) of { 0# -> parError; _ -> y } -parAtRel (I# w) (I# g) (I# s) (I# p) (I# q) x y = case (parAtRel# x q w g s p y) of { 0# -> parError; _ -> y } -parAtForNow (I# w) (I# g) (I# s) (I# p) v x y = case (parAtForNow# x v w g s p y) of { 0# -> parError; _ -> y } - -#endif - --- Maybe parIO and the like could be added here later. -\end{code} diff --git a/ghc/lib/concurrent/SampleVar.lhs b/ghc/lib/concurrent/SampleVar.lhs deleted file mode 100644 index 75476b6d58..0000000000 --- a/ghc/lib/concurrent/SampleVar.lhs +++ /dev/null @@ -1,86 +0,0 @@ -% -% (c) The GRASP/AQUA Project, Glasgow University, 1995 -% -\section[SampleVar]{Sample variables} - -Sample variables are slightly different from a normal @MVar@: - -\begin{itemize} -\item Reading an empty @SampleVar@ causes the reader to block. - (same as @takeMVar@ on empty @MVar@) -\item Reading a filled @SampleVar@ empties it and returns value. - (same as @takeMVar@) -\item Writing to an empty @SampleVar@ fills it with a value, and -potentially, wakes up a blocked reader (same as for @putMVar@ on empty @MVar@). -\item Writing to a filled @SampleVar@ overwrites the current value. - (different from @putMVar@ on full @MVar@.) -\end{itemize} - -\begin{code} -module SampleVar - ( - SampleVar, -- :: type _ = - - newEmptySampleVar, -- :: IO (SampleVar a) - newSampleVar, -- :: a -> IO (SampleVar a) - emptySampleVar, -- :: SampleVar a -> IO () - readSampleVar, -- :: SampleVar a -> IO a - writeSampleVar -- :: SampleVar a -> a -> IO () - - ) where - -import PrelConc - - -type SampleVar a - = MVar (Int, -- 1 == full - -- 0 == empty - -- <0 no of readers blocked - MVar a) - --- Initally, a @SampleVar@ is empty/unfilled. - -newEmptySampleVar :: IO (SampleVar a) -newEmptySampleVar = do - v <- newEmptyMVar - newMVar (0,v) - -newSampleVar :: a -> IO (SampleVar a) -newSampleVar a = do - v <- newEmptyMVar - putMVar v a - newMVar (1,v) - -emptySampleVar :: SampleVar a -> IO () -emptySampleVar v = do - (readers, var) <- takeMVar v - if readers >= 0 then - putMVar v (0,var) - else - putMVar v (readers,var) - --- --- filled => make empty and grab sample --- not filled => try to grab value, empty when read val. --- -readSampleVar :: SampleVar a -> IO a -readSampleVar svar = do - (readers,val) <- takeMVar svar - putMVar svar (readers-1,val) - takeMVar val - --- --- filled => overwrite --- not filled => fill, write val --- -writeSampleVar :: SampleVar a -> a -> IO () -writeSampleVar svar v = do - (readers,val) <- takeMVar svar - case readers of - 1 -> - swapMVar val v >> - putMVar svar (1,val) - _ -> - putMVar val v >> - putMVar svar (min 1 (readers+1), val) -\end{code} diff --git a/ghc/lib/concurrent/Semaphore.lhs b/ghc/lib/concurrent/Semaphore.lhs deleted file mode 100644 index 76f847d512..0000000000 --- a/ghc/lib/concurrent/Semaphore.lhs +++ /dev/null @@ -1,111 +0,0 @@ -% -% (c) The GRASP/AQUA Project, Glasgow University, 1995 -% -\section[Semaphore]{Quantity semaphores} - -General/quantity semaphores - -\begin{code} -module Semaphore - ( - {- abstract -} - QSem, - - newQSem, -- :: Int -> IO QSem - waitQSem, -- :: QSem -> IO () - signalQSem, -- :: QSem -> IO () - - {- abstract -} - QSemN, - newQSemN, -- :: Int -> IO QSemN - waitQSemN, -- :: QSemN -> Int -> IO () - signalQSemN -- :: QSemN -> Int -> IO () - - ) where - -import PrelConc -\end{code} - -General semaphores are also implemented readily in terms of shared -@MVar@s, only have to catch the case when the semaphore is tried -waited on when it is empty (==0). Implement this in the same way as -shared variables are implemented - maintaining a list of @MVar@s -representing threads currently waiting. The counter is a shared -variable, ensuring the mutual exclusion on its access. - -\begin{code} -newtype QSem = QSem (MVar (Int, [MVar ()])) - -newQSem :: Int -> IO QSem -newQSem init = do - sem <- newMVar (init,[]) - return (QSem sem) - -waitQSem :: QSem -> IO () -waitQSem (QSem sem) = do - (avail,blocked) <- takeMVar sem -- gain ex. access - if avail > 0 then - putMVar sem (avail-1,[]) - else do - block <- newEmptyMVar - {- - Stuff the reader at the back of the queue, - so as to preserve waiting order. A signalling - process then only have to pick the MVar at the - front of the blocked list. - - The version of waitQSem given in the paper could - lead to starvation. - -} - putMVar sem (0, blocked++[block]) - takeMVar block - -signalQSem :: QSem -> IO () -signalQSem (QSem sem) = do - (avail,blocked) <- takeMVar sem - case blocked of - [] -> putMVar sem (avail+1,[]) - - (block:blocked') -> do - putMVar sem (0,blocked') - putMVar block () - -\end{code} - - -\begin{code} -newtype QSemN = QSemN (MVar (Int,[(Int,MVar ())])) - -newQSemN :: Int -> IO QSemN -newQSemN init = do - sem <- newMVar (init,[]) - return (QSemN sem) - -waitQSemN :: QSemN -> Int -> IO () -waitQSemN (QSemN sem) sz = do - (avail,blocked) <- takeMVar sem -- gain ex. access - if (avail - sz) > 0 then - -- discharging 'sz' still leaves the semaphore - -- in an 'unblocked' state. - putMVar sem (avail-sz,[]) - else do - block <- newEmptyMVar - putMVar sem (avail, blocked++[(sz,block)]) - takeMVar block - -signalQSemN :: QSemN -> Int -> IO () -signalQSemN (QSemN sem) n = do - (avail,blocked) <- takeMVar sem - (avail',blocked') <- free (avail+n) blocked - putMVar sem (avail',blocked') - where - free avail [] = return (avail,[]) - free avail ((req,block):blocked) - | avail >= req = do - putMVar block () - free (avail-req) blocked - | otherwise = do - (avail',blocked') <- free avail blocked - return (avail',(req,block):blocked') - -\end{code} |