1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# Prefer the real threading module; fall back to the API-compatible stub on
# interpreters built without thread support.
# NOTE(review): ``dummy_threading`` was deprecated in Python 3.7 and removed
# in 3.9, where ``threading`` is always importable, so the fallback only
# matters on older interpreters.
try:
    import threading
except ImportError:
    import dummy_threading as threading
class ReadWriteMutex(object):
    """A mutex which allows multiple readers, single writer.

    Any number of threads may hold the read lock at once.  The write lock
    is held by at most one thread, and only while no read lock is held.
    Once a writer has announced itself (even while it is still waiting for
    active readers to finish) new readers and writers are held back until
    the write lock is released.

    Attributes:
        async_: number of read locks currently held.
        current_sync_operation: the thread that holds (or is in the middle
            of acquiring) the write lock, or ``None``.
        condition: the ``threading.Condition`` guarding the two fields
            above and used to park/wake waiting threads.
    """

    def __init__(self):
        # counts how many asynchronous (reader) operations are executing.
        # Spelled ``async_`` because ``async`` is a reserved keyword from
        # Python 3.7 onwards and cannot be used as an attribute name.
        self.async_ = 0

        # pointer to thread that is the current sync (writer) operation
        self.current_sync_operation = None

        # condition object to lock on
        self.condition = threading.Condition(threading.Lock())

    def acquire_read_lock(self, wait=True):
        """Acquire the read lock.

        Args:
            wait: when true (the default), block until no write operation
                is running or pending.  When false, give up immediately
                if one is.

        Returns:
            ``None`` when ``wait`` is true.  When ``wait`` is false,
            ``True`` if the read lock was acquired and ``False`` if not.
        """
        with self.condition:
            # see if a synchronous operation is waiting to start
            # or is already running, in which case we wait (or just
            # give up and return)
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            self.async_ += 1

        if not wait:
            return True

    def release_read_lock(self):
        """Release one previously acquired read lock.

        Wakes up waiting threads when the last reader leaves while a
        writer is pending.

        Raises:
            LockError: if no read lock is currently held.
                NOTE(review): ``LockError`` is not defined in the visible
                part of this module - confirm it is imported or defined
                elsewhere in the file.
        """
        with self.condition:
            # validate before touching the counter so that an unbalanced
            # release cannot leave it negative and corrupt later
            # reader accounting
            if self.async_ <= 0:
                raise LockError("Synchronizer error - too many "
                                "release_read_locks called")

            self.async_ -= 1

            # check if we are the last asynchronous reader thread
            # out the door; if so and a sync operation is waiting,
            # wake it up
            if self.async_ == 0 and self.current_sync_operation is not None:
                self.condition.notify_all()

    def acquire_write_lock(self, wait=True):
        """Acquire the write lock.

        Args:
            wait: when true (the default), block until no other writer is
                active and every reader has released.  When false, give
                up immediately if either condition does not hold.

        Returns:
            ``None`` when ``wait`` is true.  When ``wait`` is false,
            ``True`` if the write lock was acquired and ``False`` if not.
        """
        with self.condition:
            # here, we are not a synchronous reader, and after returning,
            # assuming waiting or immediate availability, we will be.
            if wait:
                # if another sync is working, wait
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                # if another sync is working,
                # we dont want to wait, so forget it
                return False

            # establish ourselves as the current sync
            # this indicates to other read/write operations
            # that they should wait until this is None again
            self.current_sync_operation = threading.current_thread()

            # now wait again for asyncs to finish
            if wait:
                # re-check the predicate after every wake-up, as
                # recommended for condition variables, instead of
                # assuming a single wake-up means all readers are gone
                while self.async_ > 0:
                    self.condition.wait()
            elif self.async_ > 0:
                # we dont want to wait, so forget it
                self.current_sync_operation = None
                return False

        if not wait:
            return True

    def release_write_lock(self):
        """Release the write lock and wake every waiting thread.

        Raises:
            Exception: if the calling thread is not the one recorded as
                holding the write lock.
        """
        with self.condition:
            if self.current_sync_operation is not threading.current_thread():
                raise Exception("Synchronizer error - current thread doesn't "
                                "have the write lock")

            # reset the current sync operation so
            # another can get it
            self.current_sync_operation = None

            # tell everyone to get ready; they actually proceed once the
            # ``with`` block exits and the condition's lock is released
            self.condition.notify_all()
|