diff options
author | Julien Danjou <julien@danjou.info> | 2014-10-03 20:53:10 +0200 |
---|---|---|
committer | Julien Danjou <julien@danjou.info> | 2014-10-03 21:53:36 +0200 |
commit | d00413cdfa02749e8e5d7ee83ac957ecc74cb1be (patch) | |
tree | f99adebc2880eb910f6be7b00cafa4b921ac9875 | |
parent | ebe41afc52f4b77f1590f378596cb2c6f90c5efa (diff) | |
download | tooz-d00413cdfa02749e8e5d7ee83ac957ecc74cb1be.tar.gz |
Make lock blocking with no time out by default0.6
The lock_timeout option that has been copied from memcached actually
does not mean the same thing at all. In memcached, it's used to expire
the lock when a client dies, whereas in IPC and ZK it's meant to be a
timeout on acquire().
This means that by default acquire(blocking=True) will have different
behaviours between the different drivers.
We now avoid that by removing lock_timeout from ZK and IPC, and allowing
the blocking argument from acquire() to be a numeric value.
Change-Id: Idb68e2ff945403f7ee24d86ea480531122b19ff8
-rw-r--r-- | tooz/drivers/ipc.py | 27 | ||||
-rw-r--r-- | tooz/drivers/memcached.py | 35 | ||||
-rw-r--r-- | tooz/drivers/zookeeper.py | 16 | ||||
-rw-r--r-- | tooz/locking.py | 5 |
4 files changed, 44 insertions, 39 deletions
diff --git a/tooz/drivers/ipc.py b/tooz/drivers/ipc.py index 2c5727a..c89bdf8 100644 --- a/tooz/drivers/ipc.py +++ b/tooz/drivers/ipc.py @@ -42,10 +42,9 @@ class IPCLock(locking.Lock): """ _LOCK_PROJECT = b'__TOOZ_LOCK_' - def __init__(self, name, timeout): + def __init__(self, name): super(IPCLock, self).__init__(name) self.key = self.ftok(name, self._LOCK_PROJECT) - self.timeout = timeout self._lock = None @staticmethod @@ -62,11 +61,17 @@ class IPCLock(locking.Lock): return (int(h.hexdigest(), 16) % _KEY_RANGE) + sysv_ipc.KEY_MIN def acquire(self, blocking=True): - timeout = self.timeout if blocking else 0 - if ((blocking is False or timeout is not None) + if (blocking is not True and sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED is False): raise tooz.NotImplemented( "This system does not support semaphore timeout") + # Convert blocking argument to a valid timeout value + if blocking is True: + timeout = None + elif blocking is False: + timeout = 0 + else: + timeout = blocking while True: try: self._lock = sysv_ipc.Semaphore(self.key, @@ -104,18 +109,12 @@ class IPCLock(locking.Lock): class IPCDriver(coordination.CoordinationDriver): def __init__(self, member_id, parsed_url, options): - """Initialize the IPC driver. - - :param lock_timeout: how many seconds to wait when trying to acquire - a lock in blocking mode. None means forever, 0 - means don't wait, any other value means wait - this amount of seconds. - """ + """Initialize the IPC driver.""" super(IPCDriver, self).__init__() - self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1]) - def get_lock(self, name): - return IPCLock(name, self.lock_timeout) + @staticmethod + def get_lock(name): + return IPCLock(name) @staticmethod def watch_join_group(group_id, callback): diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index d3bdaba..e0016ad 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -41,10 +41,15 @@ def retry_if_retry_raised(exception): return isinstance(exception, Retry) +_RETRYING_KWARGS = dict( + retry_on_exception=retry_if_retry_raised, + wait='exponential_sleep', + wait_exponential_max=1, +) + + def retry(f): - return retrying.retry( - retry_on_exception=retry_if_retry_raised, - wait='exponential_sleep', wait_exponential_max=1)(f) + return retrying.retry(**_RETRYING_KWARGS)(f) class MemcachedLock(locking.Lock): @@ -57,16 +62,20 @@ class MemcachedLock(locking.Lock): @retry def acquire(self, blocking=True): - if self.coord.client.add( - self.name, - self.coord._member_id, - expire=self.timeout, - noreply=False): - self.coord._acquired_locks.append(self) - return True - if not blocking: - return False - raise Retry + def _acquire(): + if self.coord.client.add( + self.name, + self.coord._member_id, + expire=self.timeout, + noreply=False): + self.coord._acquired_locks.append(self) + return True + if blocking is False: + return False + raise Retry + kwargs = _RETRYING_KWARGS.copy() + kwargs['stop_max_delay'] = blocking + return retrying.Retrying(**kwargs).call(_acquire) def release(self): if self.coord.client.delete(self.name, noreply=False): diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index 504ae04..c4fa4e8 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -28,15 +28,13 @@ from tooz import utils class ZooKeeperLock(locking.Lock): - def __init__(self, name, lock, timeout): + def __init__(self, name, lock): super(ZooKeeperLock, self).__init__(name) self._lock = lock - self.timeout = timeout def acquire(self, blocking=True): - timeout = self.timeout if blocking else None - return self._lock.acquire(blocking=blocking, - timeout=timeout) + return self._lock.acquire(blocking=bool(blocking), + timeout=blocking) def release(self): return self._lock.release() @@ -47,10 +45,6 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): :param timeout: connection timeout to wait when first connecting to the zookeeper server - :param lock_timeout: how many seconds to wait when trying to acquire - a lock in blocking mode. None means forever, 0 - means don't wait, any other value means wait - this amount of seconds. """ _TOOZ_NAMESPACE = b"tooz" @@ -59,7 +53,6 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): super(BaseZooKeeperDriver, self).__init__() self._member_id = member_id self.timeout = int(options.get('timeout', ['10'])[-1]) - self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1]) def start(self): try: @@ -347,8 +340,7 @@ class KazooDriver(BaseZooKeeperDriver): name, self._coord.Lock( self.paths_join(b"/", self._TOOZ_NAMESPACE, b"locks", name), - self._member_id.decode('ascii')), - self.lock_timeout) + self._member_id.decode('ascii'))) def run_watchers(self): ret = [] diff --git a/tooz/locking.py b/tooz/locking.py index 27b193a..10cf16e 100644 --- a/tooz/locking.py +++ b/tooz/locking.py @@ -47,6 +47,11 @@ class Lock(object): def acquire(self, blocking=True): """Attempts to acquire the lock. + :param blocking: If True, blocks until the lock is acquired. If False, + returns right away. Otherwise, the value is used as a + timeout value and the call returns maximum after this + number of seonds. :returns: returns true if acquired (false if not) :rtype: bool + """ |