1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
%
% (c) The AQUA Project, Glasgow University, 1994-1996
%
\section[Concurrent]{Concurrent Haskell constructs}
A common interface to a collection of useful concurrency abstractions.
Currently, the collection contains only the abstractions found in the
{\em Concurrent Haskell} paper (presented at the Haskell Workshop
1995; a draft is available via \tr{ftp} from
\tr{ftp.dcs.gla.ac.uk/pub/glasgow-fp/drafts}), plus a couple of
others. See the paper and the individual files containing the module
definitions for an explanation of what they do.
\begin{code}
module Concurrent (
module ChannelVar,
module Channel,
module Semaphore,
module SampleVar
, ThreadId
-- Forking and suchlike
, forkIO -- :: IO () -> IO ThreadId
, myThreadId -- :: IO ThreadId
, killThread -- :: ThreadId -> IO ()
, raiseInThread -- :: ThreadId -> Exception -> IO ()
, par -- :: a -> b -> b
, seq -- :: a -> b -> b
, fork -- :: a -> b -> b
, yield -- :: IO ()
{-threadDelay, threadWaitRead, threadWaitWrite,-}
-- MVars
, MVar -- abstract
, newMVar -- :: a -> IO (MVar a)
, newEmptyMVar -- :: IO (MVar a)
, takeMVar -- :: MVar a -> IO a
, putMVar -- :: MVar a -> a -> IO ()
, readMVar -- :: MVar a -> IO a
, swapMVar -- :: MVar a -> a -> IO a
, isEmptyMVar -- :: MVar a -> IO Bool
-- merging of streams
, mergeIO -- :: [a] -> [a] -> IO [a]
, nmergeIO -- :: [[a]] -> IO [a]
) where
import Parallel
import ChannelVar
import Channel
import Semaphore
import SampleVar
import PrelConc
import PrelHandle ( topHandler )
import PrelException
import PrelIOBase ( IO(..) )
import IO
import PrelAddr ( Addr )
import PrelArr ( ByteArray )
import PrelPack ( packString )
import PrelIOBase ( unsafePerformIO , unsafeInterleaveIO )
import PrelBase ( fork# )
infixr 0 `fork`
\end{code}
\begin{code}
-- Start a new thread that runs the given action and return the
-- ThreadId of that thread.
--
-- The action is not handed to fork# directly: it is first wrapped with
-- catchException so that an exception raised inside the child is passed
-- to topHandler. The False flag is, per the note in the original
-- code, "don't quit on exception raised".
forkIO :: IO () -> IO ThreadId
forkIO action = IO spawn
  where
    -- The child's action with the top-level exception handler installed.
    guarded = catchException action (topHandler False)

    -- State-passing worker: fork the guarded action and box the raw
    -- thread identifier returned by the primitive.
    spawn s0 =
      case fork# guarded s0 of
        (# s1, tid #) -> (# s1, ThreadId tid #)
-- fork x y: start a thread that evaluates x (with seq, i.e. only to
-- weak head normal form) and then yield y.
--
-- The thread is created from pure code through unsafePerformIO; the
-- resulting ThreadId is forced with seq so that the forkIO really runs
-- before y is returned, and is then discarded.
{-# INLINE fork #-}
fork :: a -> b -> b
fork x y = child `seq` y
  where
    child = unsafePerformIO (forkIO (x `seq` return ()))
\end{code}
\begin{code}
-- Initial quantity of the QSem created (via newQSem) for the buffer
-- that mergeIO and nmergeIO share with their producer threads.
-- Producers call waitQSem before appending an element, and one unit is
-- given back with signalQSem each time the consumer takes a cell, so
-- this value governs how far the producers may run ahead of the
-- consumer.
max_buff_size :: Int
max_buff_size = 1
-- mergeIO ls rs merges two lists into a single lazily produced list.
--
-- One thread is forked per input list (running suckIO); each thread
-- appends the elements of its list to a shared output list, so the
-- elements appear in whatever order the two producers get to append
-- them. The result is the head of that shared list.
mergeIO :: [a] -> [a] -> IO [a]
-- Signature only; the equations for nmergeIO appear later in this module.
nmergeIO :: [[a]] -> IO [a]

mergeIO ls rs = do
  -- First (still empty) cell of the output list.
  head_cell <- newEmptyMVar
  -- Always holds the cell that the next producer has to fill.
  tail_ref  <- newMVar head_cell
  -- Producers wait on this semaphore before appending an element.
  slots     <- newQSem max_buff_size
  -- Number of producer threads that have not finished yet.
  live      <- newMVar 2
  let buff = (tail_ref, slots)
  forkIO (suckIO live buff ls)
  forkIO (suckIO live buff rs)
  -- Block until the first cell has been filled, then give back one
  -- unit to the semaphore, just as suckIO's deferred reads do for
  -- every later cell.
  merged <- takeMVar head_cell
  signalQSem slots
  return merged
-- State shared between the merge producers:
--
--   * an MVar holding the (still empty) MVar cell at the tail of the
--     output list, i.e. the cell the next producer must fill; taking
--     the outer MVar also serialises the producers;
--   * the QSem on which producers wait before appending an element.
type Buffer a = (MVar (MVar [a]), QSem)
-- Producer loop used by mergeIO and nmergeIO: append the elements of
-- one input list, one at a time, to the shared output list.
--
--   * First argument: counter of producers that are still running.
--   * Second argument: the shared buffer (tail pointer and semaphore).
--   * Third argument: the input list still to be appended.
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()

-- Input exhausted: this producer is done.
suckIO branches_running (tail_list, _) [] = do
  remaining <- takeMVar branches_running
  if remaining == 1
    then do
      -- Last producer standing: terminate the output list by filling
      -- the tail cell with [] and put the tail pointer back. The
      -- counter is deliberately left empty in this branch.
      last_cell <- takeMVar tail_list
      putMVar last_cell []
      putMVar tail_list last_cell
    else
      -- Other producers are still running; just record one fewer.
      putMVar branches_running (remaining - 1)

-- At least one element left: append it and carry on with the rest.
suckIO branches_running buff@(tail_list, e) (v : rest) = do
  -- Wait for a unit of the semaphore before producing.
  waitQSem e
  -- Taking the tail pointer also keeps other producers out until it
  -- is put back below.
  cell      <- takeMVar tail_list
  next_cell <- newEmptyMVar
  -- The remainder of the output list is read lazily: only when the
  -- consumer demands it do we take the next cell (blocking until some
  -- producer has filled it) and give one unit back to the semaphore.
  later <- unsafeInterleaveIO $ do
    items <- takeMVar next_cell
    signalQSem e
    return items
  putMVar cell (v : later)
  -- The fresh empty cell becomes the new tail.
  putMVar tail_list next_cell
  suckIO branches_running buff rest
-- nmergeIO lss merges any number of lists into a single lazily
-- produced list.
--
-- One thread is forked per input list (running suckIO); each thread
-- appends the elements of its list to a shared output list, so the
-- elements appear in whatever order the producers get to append them.
-- The result is the head of that shared list.
--
-- With no input lists there is no producer to fill the first cell of
-- the output list, so waiting on it would block forever; the merge of
-- zero lists is therefore answered directly with the empty list.
nmergeIO [] = return []
nmergeIO lss = do
  -- First (still empty) cell of the output list.
  head_cell <- newEmptyMVar
  -- Always holds the cell that the next producer has to fill.
  tail_ref  <- newMVar head_cell
  -- Producers wait on this semaphore before appending an element.
  slots     <- newQSem max_buff_size
  -- Number of producer threads that have not finished yet.
  live      <- newMVar (length lss)
  let buff = (tail_ref, slots)
  -- One producer per input list; the ThreadIds are not needed.
  mapM_ (\ xs -> forkIO (suckIO live buff xs)) lss
  -- Block until the first cell has been filled, then give back one
  -- unit to the semaphore, just as suckIO's deferred reads do for
  -- every later cell.
  merged <- takeMVar head_cell
  signalQSem slots
  return merged
\end{code}
|