summaryrefslogtreecommitdiff
path: root/ghc/lib/concurrent/Channel.lhs
blob: 18dd20e57c95ea64951d652bfd8bd8a835a06987 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
%
% (c) The GRASP/AQUA Project, Glasgow University, 1995-97
%
\section[Channel]{Unbounded Channels}

Standard, unbounded channel abstraction.

\begin{code}
module Channel
       (
	 {- abstract type defined -}
        Chan,

	 {- creator -}
	newChan,	 -- :: IO (Chan a)

	 {- operators -}
	writeChan,	 -- :: Chan a -> a -> IO ()
	readChan,	 -- :: Chan a -> IO a
	dupChan,	 -- :: Chan a -> IO (Chan a)
	unGetChan,	 -- :: Chan a -> a -> IO ()

	isEmptyChan,     -- :: Chan a -> IO Bool

	 {- stream interface -}
	getChanContents, -- :: Chan a -> IO [a]
	writeList2Chan	 -- :: Chan a -> [a] -> IO ()

       ) where

import Prelude
import PrelConc
import PrelST
import PrelIOBase ( unsafeInterleaveIO )
\end{code}

A channel is represented by two @MVar@s keeping track of the two ends
of the channel contents,i.e.,  the read- and write ends. Empty @MVar@s
are used to handle consumers trying to read from an empty channel.

\begin{code}
data Chan a
 = Chan (MVar (Stream a))
        (MVar (Stream a))

type Stream a = MVar (ChItem a)

data ChItem a = ChItem a (Stream a)
\end{code}

See the Concurrent Haskell paper for a diagram explaining the
how the different channel operations proceed.

@newChan@ sets up the read and write end of a channel by initialising
these two @MVar@s with an empty @MVar@.

\begin{code}
newChan :: IO (Chan a)
newChan = do
   hole  <- newEmptyMVar
   read  <- newMVar hole
   write <- newMVar hole
   return (Chan read write)
\end{code}

To put an element on a channel, a new hole at the write end is created.
What was previously the empty @MVar@ at the back of the channel is then
filled in with a new stream element holding the entered value and the
new hole.

\begin{code}
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _read write) val = do
   new_hole <- newEmptyMVar
   old_hole <- takeMVar write
   putMVar write new_hole
   putMVar old_hole (ChItem val new_hole)

readChan :: Chan a -> IO a
readChan (Chan read _write) = do
  read_end		    <- takeMVar read
  (ChItem val new_read_end) <- takeMVar read_end
  putMVar read new_read_end
  return val


dupChan :: Chan a -> IO (Chan a)
dupChan (Chan _read write) = do
   new_read <- newEmptyMVar
   hole     <- readMVar write
   putMVar new_read hole
   return (Chan new_read write)

unGetChan :: Chan a -> a -> IO ()
unGetChan (Chan read _write) val = do
   new_read_end <- newEmptyMVar
   read_end     <- takeMVar read
   putMVar new_read_end (ChItem val read_end)
   putMVar read new_read_end

isEmptyChan :: Chan a -> IO Bool
isEmptyChan (Chan read write) = do
   r <- takeMVar read
   w <- readMVar write
   let eq = r == w
   eq `seq` putMVar read r
   return eq

\end{code}

Operators for interfacing with functional streams.

\begin{code}
getChanContents :: Chan a -> IO [a]
getChanContents ch
  = unsafeInterleaveIO (do
	x  <- readChan ch
    	xs <- getChanContents ch
    	return (x:xs)
    )

-------------
writeList2Chan :: Chan a -> [a] -> IO ()
writeList2Chan ch ls = sequence_ (map (writeChan ch) ls)

\end{code}