1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
|
(***********************************************************************)
(* *)
(* OCaml *)
(* *)
(* David Nowak and Xavier Leroy, projet Cristal, INRIA Rocquencourt *)
(* *)
(* Copyright 1996 Institut National de Recherche en Informatique et *)
(* en Automatique. All rights reserved. This file is distributed *)
(* under the terms of the GNU Library General Public License, with *)
(* the special exception on linking described in file ../../LICENSE. *)
(* *)
(***********************************************************************)
(* $Id: event.ml 12858 2012-08-10 14:45:51Z maranget $ *)
(* Events *)
type 'a basic_event =
{ poll: unit -> bool;
(* If communication can take place immediately, return true. *)
suspend: unit -> unit;
(* Offer the communication on the channel and get ready
to suspend current process. *)
result: unit -> 'a }
(* Return the result of the communication *)
type 'a behavior = int ref -> Condition.t -> int -> 'a basic_event
type 'a event =
Communication of 'a behavior
| Choose of 'a event list
| WrapAbort of 'a event * (unit -> unit)
| Guard of (unit -> 'a event)
(* Communication channels *)
type 'a channel =
{ mutable writes_pending: 'a communication Queue.t;
(* All offers to write on it *)
mutable reads_pending: 'a communication Queue.t }
(* All offers to read from it *)
(* Communication offered *)
and 'a communication =
{ performed: int ref; (* -1 if not performed yet, set to the number *)
(* of the matching communication after rendez-vous. *)
condition: Condition.t; (* To restart the blocked thread. *)
mutable data: 'a option; (* The data sent or received. *)
event_number: int } (* Event number in select *)
(* Create a channel *)
let new_channel () =
{ writes_pending = Queue.create();
reads_pending = Queue.create() }
(* Basic synchronization function *)
let masterlock = Mutex.create()
let do_aborts abort_env genev performed =
if abort_env <> [] then begin
if performed >= 0 then begin
let ids_done = snd genev.(performed) in
List.iter
(fun (id,f) -> if not (List.mem id ids_done) then f ())
abort_env
end else begin
List.iter (fun (_,f) -> f ()) abort_env
end
end
let basic_sync abort_env genev =
let performed = ref (-1) in
let condition = Condition.create() in
let bev = Array.create (Array.length genev)
(fst (genev.(0)) performed condition 0) in
for i = 1 to Array.length genev - 1 do
bev.(i) <- (fst genev.(i)) performed condition i
done;
(* See if any of the events is already activable *)
let rec poll_events i =
if i >= Array.length bev
then false
else bev.(i).poll() || poll_events (i+1) in
Mutex.lock masterlock;
if not (poll_events 0) then begin
(* Suspend on all events *)
for i = 0 to Array.length bev - 1 do bev.(i).suspend() done;
(* Wait until the condition is signalled *)
Condition.wait condition masterlock
end;
Mutex.unlock masterlock;
(* Extract the result *)
if abort_env = [] then
(* Preserve tail recursion *)
bev.(!performed).result()
else begin
let num = !performed in
let result = bev.(num).result() in
(* Handle the aborts and return the result *)
do_aborts abort_env genev num;
result
end
(* Apply a random permutation on an array *)
let scramble_array a =
let len = Array.length a in
if len = 0 then invalid_arg "Event.choose";
for i = len - 1 downto 1 do
let j = Random.int (i + 1) in
let temp = a.(i) in a.(i) <- a.(j); a.(j) <- temp
done;
a
(* Main synchronization function *)
let gensym = let count = ref 0 in fun () -> incr count; !count
let rec flatten_event
(abort_list : int list)
(accu : ('a behavior * int list) list)
(accu_abort : (int * (unit -> unit)) list)
ev =
match ev with
Communication bev -> ((bev,abort_list) :: accu) , accu_abort
| WrapAbort (ev,fn) ->
let id = gensym () in
flatten_event (id :: abort_list) accu ((id,fn)::accu_abort) ev
| Choose evl ->
let rec flatten_list accu' accu_abort'= function
ev :: l ->
let (accu'',accu_abort'') =
flatten_event abort_list accu' accu_abort' ev in
flatten_list accu'' accu_abort'' l
| [] -> (accu',accu_abort') in
flatten_list accu accu_abort evl
| Guard fn -> flatten_event abort_list accu accu_abort (fn ())
let sync ev =
let (evl,abort_env) = flatten_event [] [] [] ev in
basic_sync abort_env (scramble_array(Array.of_list evl))
(* Event polling -- like sync, but non-blocking *)
let basic_poll abort_env genev =
let performed = ref (-1) in
let condition = Condition.create() in
let bev = Array.create(Array.length genev)
(fst genev.(0) performed condition 0) in
for i = 1 to Array.length genev - 1 do
bev.(i) <- fst genev.(i) performed condition i
done;
(* See if any of the events is already activable *)
let rec poll_events i =
if i >= Array.length bev
then false
else bev.(i).poll() || poll_events (i+1) in
Mutex.lock masterlock;
let ready = poll_events 0 in
if ready then begin
(* Extract the result *)
Mutex.unlock masterlock;
let result = Some(bev.(!performed).result()) in
do_aborts abort_env genev !performed; result
end else begin
(* Cancel the communication offers *)
performed := 0;
Mutex.unlock masterlock;
do_aborts abort_env genev (-1);
None
end
let poll ev =
let (evl,abort_env) = flatten_event [] [] [] ev in
basic_poll abort_env (scramble_array(Array.of_list evl))
(* Remove all communication opportunities already synchronized *)
let cleanup_queue q =
let q' = Queue.create() in
Queue.iter (fun c -> if !(c.performed) = -1 then Queue.add c q') q;
q'
(* Event construction *)
let always data =
Communication(fun performed condition evnum ->
{ poll = (fun () -> performed := evnum; true);
suspend = (fun () -> ());
result = (fun () -> data) })
let send channel data =
Communication(fun performed condition evnum ->
let wcomm =
{ performed = performed;
condition = condition;
data = Some data;
event_number = evnum } in
{ poll = (fun () ->
let rec poll () =
let rcomm = Queue.take channel.reads_pending in
if !(rcomm.performed) >= 0 then
poll ()
else begin
rcomm.data <- wcomm.data;
performed := evnum;
rcomm.performed := rcomm.event_number;
Condition.signal rcomm.condition
end in
try
poll();
true
with Queue.Empty ->
false);
suspend = (fun () ->
channel.writes_pending <- cleanup_queue channel.writes_pending;
Queue.add wcomm channel.writes_pending);
result = (fun () -> ()) })
let receive channel =
Communication(fun performed condition evnum ->
let rcomm =
{ performed = performed;
condition = condition;
data = None;
event_number = evnum } in
{ poll = (fun () ->
let rec poll () =
let wcomm = Queue.take channel.writes_pending in
if !(wcomm.performed) >= 0 then
poll ()
else begin
rcomm.data <- wcomm.data;
performed := evnum;
wcomm.performed := wcomm.event_number;
Condition.signal wcomm.condition
end in
try
poll();
true
with Queue.Empty ->
false);
suspend = (fun () ->
channel.reads_pending <- cleanup_queue channel.reads_pending;
Queue.add rcomm channel.reads_pending);
result = (fun () ->
match rcomm.data with
None -> invalid_arg "Event.receive"
| Some res -> res) })
let choose evl = Choose evl
let wrap_abort ev fn = WrapAbort(ev,fn)
let guard fn = Guard fn
let rec wrap ev fn =
match ev with
Communication genev ->
Communication(fun performed condition evnum ->
let bev = genev performed condition evnum in
{ poll = bev.poll;
suspend = bev.suspend;
result = (fun () -> fn(bev.result())) })
| Choose evl ->
Choose(List.map (fun ev -> wrap ev fn) evl)
| WrapAbort (ev, f') ->
WrapAbort (wrap ev fn, f')
| Guard gu ->
Guard(fun () -> wrap (gu()) fn)
(* Convenience functions *)
let select evl = sync(Choose evl)
|