diff options
Diffstat (limited to 'dogpile/core/readwrite_lock.py')
-rw-r--r-- | dogpile/core/readwrite_lock.py | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/dogpile/core/readwrite_lock.py b/dogpile/core/readwrite_lock.py new file mode 100644 index 0000000..da83215 --- /dev/null +++ b/dogpile/core/readwrite_lock.py @@ -0,0 +1,130 @@ +from util import threading + +import logging +log = logging.getLogger(__name__) + +class LockError(Exception): + pass + +class ReadWriteMutex(object): + """A mutex which allows multiple readers, single writer. + + :class:`.ReadWriteMutex` uses a Python ``threading.Condition`` + to provide this functionality across threads within a process. + + The Beaker package also contained a file-lock based version + of this concept, so that readers/writers could be synchronized + across processes with a common filesystem. A future Dogpile + release may include this additional class at some point. + + """ + + def __init__(self): + # counts how many asynchronous methods are executing + self.async = 0 + + # pointer to thread that is the current sync operation + self.current_sync_operation = None + + # condition object to lock on + self.condition = threading.Condition(threading.Lock()) + + def acquire_read_lock(self, wait = True): + """Acquire the 'read' lock.""" + self.condition.acquire() + try: + # see if a synchronous operation is waiting to start + # or is already running, in which case we wait (or just + # give up and return) + if wait: + while self.current_sync_operation is not None: + self.condition.wait() + else: + if self.current_sync_operation is not None: + return False + + self.async += 1 + log.debug("%s acquired read lock", self) + finally: + self.condition.release() + + if not wait: + return True + + def release_read_lock(self): + """Release the 'read' lock.""" + self.condition.acquire() + try: + self.async -= 1 + + # check if we are the last asynchronous reader thread + # out the door. + if self.async == 0: + # yes. so if a sync operation is waiting, notifyAll to wake + # it up + if self.current_sync_operation is not None: + self.condition.notifyAll() + elif self.async < 0: + raise LockError("Synchronizer error - too many " + "release_read_locks called") + log.debug("%s released read lock", self) + finally: + self.condition.release() + + def acquire_write_lock(self, wait = True): + """Acquire the 'write' lock.""" + self.condition.acquire() + try: + # here, we are not a synchronous reader, and after returning, + # assuming waiting or immediate availability, we will be. + + if wait: + # if another sync is working, wait + while self.current_sync_operation is not None: + self.condition.wait() + else: + # if another sync is working, + # we dont want to wait, so forget it + if self.current_sync_operation is not None: + return False + + # establish ourselves as the current sync + # this indicates to other read/write operations + # that they should wait until this is None again + self.current_sync_operation = threading.currentThread() + + # now wait again for asyncs to finish + if self.async > 0: + if wait: + # wait + self.condition.wait() + else: + # we dont want to wait, so forget it + self.current_sync_operation = None + return False + log.debug("%s acquired write lock", self) + finally: + self.condition.release() + + if not wait: + return True + + def release_write_lock(self): + """Release the 'write' lock.""" + self.condition.acquire() + try: + if self.current_sync_operation is not threading.currentThread(): + raise LockError("Synchronizer error - current thread doesn't " + "have the write lock") + + # reset the current sync operation so + # another can get it + self.current_sync_operation = None + + # tell everyone to get ready + self.condition.notifyAll() + + log.debug("%s released write lock", self) + finally: + # everyone go !! + self.condition.release() |