blob: 2c2ae7746588ba98d1675fdfaa2d321ab0ef76c1 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
{-
%
% (c) The GRASP/AQUA Project, Glasgow University, 1995
%
\section[Merge]{Mergeing streams}
Avoiding the loss of ref. transparency by attaching the merge to the
IO monad.
\begin{code}
-}
module Merge
(
mergeIO, --:: [a] -> [a] -> IO [a]
nmergeIO --:: [[a]] -> IO [a]
) where
import Semaphore
import GHCbase
import GHCio ( stThen )
import Concurrent ( forkIO )
max_buff_size = 1
mergeIO :: [a] -> [a] -> IO [a]
nmergeIO :: [[a]] -> IO [a]
#ifndef __CONCURRENT_HASKELL__
mergeIO _ _ = return []
nmergeIO _ = return []
#else
mergeIO ls rs
= newEmptyMVar >>= \ tail_node ->
newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
newMVar 2 >>= \ branches_running ->
let
buff = (tail_list,e)
in
forkIO (suckIO branches_running buff ls) >>
forkIO (suckIO branches_running buff rs) >>
takeMVar tail_node >>= \ val ->
signalQSem e >>
return val
type Buffer a
= (MVar (MVar [a]), QSem)
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
suckIO branches_running buff@(tail_list,e) vs
= case vs of
[] -> takeMVar branches_running >>= \ val ->
if val == 1 then
takeMVar tail_list >>= \ node ->
putMVar node [] >>
putMVar tail_list node
else
putMVar branches_running (val-1)
(x:xs) ->
waitQSem e >>
takeMVar tail_list >>= \ node ->
newEmptyMVar >>= \ next_node ->
unsafeInterleavePrimIO ( ioToPrimIO $
takeMVar next_node >>= \ x ->
signalQSem e >>
return x) `stThen` \ next_node_val ->
putMVar node (x:next_node_val) >>
putMVar tail_list next_node >>
suckIO branches_running buff xs
nmergeIO lss
= let
len = length lss
in
newEmptyMVar >>= \ tail_node ->
newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
newMVar len >>= \ branches_running ->
let
buff = (tail_list,e)
in
mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
takeMVar tail_node >>= \ val ->
signalQSem e >>
return val
where
mapIO f xs = accumulate (map f xs)
#endif {- __CONCURRENT_HASKELL__ -}
|