summaryrefslogtreecommitdiff
path: root/dogpile/readwrite_lock.py
blob: 51498f6a61428b2dfb4644cffb660b30df3dbe86 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
try:
    import threading
except ImportError:
    import dummy_threading as threading

class LockError(Exception):
    """Raised when a ReadWriteMutex lock is released by a non-holder."""


class ReadWriteMutex(object):
    """A mutex which allows multiple readers, single writer.

    Any number of threads may hold the read lock at once.  The write
    lock is exclusive: acquiring it first blocks new readers and other
    writers, then waits for the readers already inside to leave.

    The write lock is not reentrant; a thread that already holds it and
    calls ``acquire_write_lock()`` with ``wait=True`` blocks on itself.

    Attributes:
        async_: number of read locks currently held.  (Spelled with a
            trailing underscore because ``async`` is a reserved word
            as of Python 3.7.)
        current_sync_operation: the thread object that holds, or is in
            the process of acquiring, the write lock; ``None`` if no
            writer is active.
        condition: the ``threading.Condition`` guarding the two fields
            above and used to park/wake waiting threads.
    """

    def __init__(self):
        # counts how many readers are currently inside
        self.async_ = 0

        # thread that is the current (or pending) writer
        self.current_sync_operation = None

        # condition object to lock on
        self.condition = threading.Condition(threading.Lock())

    def acquire_read_lock(self, wait=True):
        """Acquire the shared 'read' lock.

        :param wait: if true (the default), block until no writer is
         active or pending.  If false, give up immediately when a
         writer is present.
        :return: ``None`` when ``wait`` is true (the call only returns
         once the lock is held); ``True``/``False`` indicating success
         when ``wait`` is false.
        """
        with self.condition:
            # a writer that is waiting to start, or already running,
            # takes priority: wait for it, or just give up
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            self.async_ += 1

        if not wait:
            return True

    def release_read_lock(self):
        """Release one hold on the 'read' lock.

        :raises LockError: if no read lock is currently held.  The
         reader count is left untouched in that case, so the mutex
         stays usable.
        """
        with self.condition:
            # validate *before* decrementing: a negative count would let
            # a later writer run concurrently with a real reader
            if self.async_ <= 0:
                raise LockError("Synchronizer error - too many "
                                "release_read_locks called")

            self.async_ -= 1

            # last reader out the door wakes up a pending writer, if any
            if self.async_ == 0 and self.current_sync_operation is not None:
                self.condition.notify_all()

    def acquire_write_lock(self, wait=True):
        """Acquire the exclusive 'write' lock.

        :param wait: if true (the default), block until no other writer
         is active and all current readers have released.  If false,
         give up immediately if either condition does not hold.
        :return: ``None`` when ``wait`` is true (the call only returns
         once the lock is held); ``True``/``False`` indicating success
         when ``wait`` is false.
        """
        with self.condition:
            # phase 1: there may be only one writer at a time
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            # establish ourselves as the current writer; this tells
            # other read/write operations to wait until it is None again
            self.current_sync_operation = threading.current_thread()

            # phase 2: let the readers that are already inside drain
            if wait:
                # re-check the predicate after every wakeup rather than
                # trusting a single notification
                while self.async_ > 0:
                    self.condition.wait()
            elif self.async_ > 0:
                # not willing to wait: withdraw our claim.  Nobody can
                # have started waiting on it, since the condition's lock
                # was held the whole time, so no notify is needed.
                self.current_sync_operation = None
                return False

        if not wait:
            return True

    def release_write_lock(self):
        """Release the 'write' lock and wake every waiting thread.

        :raises LockError: if the calling thread is not the one that
         holds the write lock.
        """
        with self.condition:
            if self.current_sync_operation is not threading.current_thread():
                raise LockError("Synchronizer error - current thread doesn't "
                                "have the write lock")

            # reset the current writer so another can get it
            self.current_sync_operation = None

            # wake all waiters; they proceed once the ``with`` block
            # releases the condition's lock
            self.condition.notify_all()