summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJulien Danjou <julien@danjou.info>2014-10-03 20:53:10 +0200
committerJulien Danjou <julien@danjou.info>2014-10-03 21:53:36 +0200
commitd00413cdfa02749e8e5d7ee83ac957ecc74cb1be (patch)
treef99adebc2880eb910f6be7b00cafa4b921ac9875
parentebe41afc52f4b77f1590f378596cb2c6f90c5efa (diff)
downloadtooz-d00413cdfa02749e8e5d7ee83ac957ecc74cb1be.tar.gz
Make lock blocking with no time out by default0.6
The lock_timeout option that has been copied from memcached actually does not mean the same thing at all. In memcached, it's used to expire the lock when a client dies, whereas in IPC and ZK it's meant to be a timeout on acquire(). This means that by default acquire(blocking=True) will have different behaviours between the different drivers. We now avoid that by removing lock_timeout from ZK and IPC, and allowing the blocking argument from acquire() to be a numeric value. Change-Id: Idb68e2ff945403f7ee24d86ea480531122b19ff8
-rw-r--r--tooz/drivers/ipc.py27
-rw-r--r--tooz/drivers/memcached.py35
-rw-r--r--tooz/drivers/zookeeper.py16
-rw-r--r--tooz/locking.py5
4 files changed, 44 insertions, 39 deletions
diff --git a/tooz/drivers/ipc.py b/tooz/drivers/ipc.py
index 2c5727a..c89bdf8 100644
--- a/tooz/drivers/ipc.py
+++ b/tooz/drivers/ipc.py
@@ -42,10 +42,9 @@ class IPCLock(locking.Lock):
"""
_LOCK_PROJECT = b'__TOOZ_LOCK_'
- def __init__(self, name, timeout):
+ def __init__(self, name):
super(IPCLock, self).__init__(name)
self.key = self.ftok(name, self._LOCK_PROJECT)
- self.timeout = timeout
self._lock = None
@staticmethod
@@ -62,11 +61,17 @@ class IPCLock(locking.Lock):
return (int(h.hexdigest(), 16) % _KEY_RANGE) + sysv_ipc.KEY_MIN
def acquire(self, blocking=True):
- timeout = self.timeout if blocking else 0
- if ((blocking is False or timeout is not None)
+ if (blocking is not True
and sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED is False):
raise tooz.NotImplemented(
"This system does not support semaphore timeout")
+ # Convert blocking argument to a valid timeout value
+ if blocking is True:
+ timeout = None
+ elif blocking is False:
+ timeout = 0
+ else:
+ timeout = blocking
while True:
try:
self._lock = sysv_ipc.Semaphore(self.key,
@@ -104,18 +109,12 @@ class IPCLock(locking.Lock):
class IPCDriver(coordination.CoordinationDriver):
def __init__(self, member_id, parsed_url, options):
- """Initialize the IPC driver.
-
- :param lock_timeout: how many seconds to wait when trying to acquire
- a lock in blocking mode. None means forever, 0
- means don't wait, any other value means wait
- this amount of seconds.
- """
+ """Initialize the IPC driver."""
super(IPCDriver, self).__init__()
- self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1])
- def get_lock(self, name):
- return IPCLock(name, self.lock_timeout)
+ @staticmethod
+ def get_lock(name):
+ return IPCLock(name)
@staticmethod
def watch_join_group(group_id, callback):
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py
index d3bdaba..e0016ad 100644
--- a/tooz/drivers/memcached.py
+++ b/tooz/drivers/memcached.py
@@ -41,10 +41,15 @@ def retry_if_retry_raised(exception):
return isinstance(exception, Retry)
+_RETRYING_KWARGS = dict(
+ retry_on_exception=retry_if_retry_raised,
+ wait='exponential_sleep',
+ wait_exponential_max=1,
+)
+
+
def retry(f):
- return retrying.retry(
- retry_on_exception=retry_if_retry_raised,
- wait='exponential_sleep', wait_exponential_max=1)(f)
+ return retrying.retry(**_RETRYING_KWARGS)(f)
class MemcachedLock(locking.Lock):
@@ -57,16 +62,20 @@ class MemcachedLock(locking.Lock):
@retry
def acquire(self, blocking=True):
- if self.coord.client.add(
- self.name,
- self.coord._member_id,
- expire=self.timeout,
- noreply=False):
- self.coord._acquired_locks.append(self)
- return True
- if not blocking:
- return False
- raise Retry
+ def _acquire():
+ if self.coord.client.add(
+ self.name,
+ self.coord._member_id,
+ expire=self.timeout,
+ noreply=False):
+ self.coord._acquired_locks.append(self)
+ return True
+ if blocking is False:
+ return False
+ raise Retry
+ kwargs = _RETRYING_KWARGS.copy()
+ kwargs['stop_max_delay'] = blocking
+ return retrying.Retrying(**kwargs).call(_acquire)
def release(self):
if self.coord.client.delete(self.name, noreply=False):
diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py
index 504ae04..c4fa4e8 100644
--- a/tooz/drivers/zookeeper.py
+++ b/tooz/drivers/zookeeper.py
@@ -28,15 +28,13 @@ from tooz import utils
class ZooKeeperLock(locking.Lock):
- def __init__(self, name, lock, timeout):
+ def __init__(self, name, lock):
super(ZooKeeperLock, self).__init__(name)
self._lock = lock
- self.timeout = timeout
def acquire(self, blocking=True):
- timeout = self.timeout if blocking else None
- return self._lock.acquire(blocking=blocking,
- timeout=timeout)
+ return self._lock.acquire(blocking=bool(blocking),
+ timeout=blocking)
def release(self):
return self._lock.release()
@@ -47,10 +45,6 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
:param timeout: connection timeout to wait when first connecting to the
zookeeper server
- :param lock_timeout: how many seconds to wait when trying to acquire
- a lock in blocking mode. None means forever, 0
- means don't wait, any other value means wait
- this amount of seconds.
"""
_TOOZ_NAMESPACE = b"tooz"
@@ -59,7 +53,6 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
super(BaseZooKeeperDriver, self).__init__()
self._member_id = member_id
self.timeout = int(options.get('timeout', ['10'])[-1])
- self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1])
def start(self):
try:
@@ -347,8 +340,7 @@ class KazooDriver(BaseZooKeeperDriver):
name,
self._coord.Lock(
self.paths_join(b"/", self._TOOZ_NAMESPACE, b"locks", name),
- self._member_id.decode('ascii')),
- self.lock_timeout)
+ self._member_id.decode('ascii')))
def run_watchers(self):
ret = []
diff --git a/tooz/locking.py b/tooz/locking.py
index 27b193a..10cf16e 100644
--- a/tooz/locking.py
+++ b/tooz/locking.py
@@ -47,6 +47,11 @@ class Lock(object):
def acquire(self, blocking=True):
"""Attempts to acquire the lock.
+ :param blocking: If True, blocks until the lock is acquired. If False,
+ returns right away. Otherwise, the value is used as a
+ timeout value and the call returns maximum after this
+ number of seonds.
:returns: returns true if acquired (false if not)
:rtype: bool
+
"""