summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2017-07-05 12:50:16 +0000
committerGerrit Code Review <review@openstack.org>2017-07-05 12:50:16 +0000
commit03dbf70f701c322d422ea65dacdc05d08e83d303 (patch)
tree39825e1516e813d249e86dbfeb6204ddb8b4a3a8
parentd88f0f4c84f2c120dcbde2df8c7066d334a1eda6 (diff)
parent7987f4455a7ff4e75d64c46a42d50017f53bfc9e (diff)
downloadtooz-03dbf70f701c322d422ea65dacdc05d08e83d303.tar.gz
Merge "Make sure Lock.heartbeat() returns True/False"
-rw-r--r--tooz/drivers/etcd.py12
-rw-r--r--tooz/drivers/etcd3.py15
-rw-r--r--tooz/drivers/etcd3gw.py23
-rw-r--r--tooz/drivers/memcached.py10
-rw-r--r--tooz/drivers/redis.py2
-rw-r--r--tooz/tests/test_coordination.py6
6 files changed, 49 insertions, 19 deletions
diff --git a/tooz/drivers/etcd.py b/tooz/drivers/etcd.py
index 1745d62..87c77fe 100644
--- a/tooz/drivers/etcd.py
+++ b/tooz/drivers/etcd.py
@@ -188,12 +188,14 @@ class EtcdLock(locking.Lock):
poked = self.client.put(self._lock_url,
data={"ttl": self.ttl,
"prevExist": "true"}, make_url=False)
- errorcode = poked.get("errorCode")
- if errorcode:
- LOG.warning("Unable to heartbeat by updating key '%s' with "
- "extended expiry of %s seconds: %d, %s", self.name,
- self.ttl, errorcode, poked.get("message"))
self._node = poked['node']
+ errorcode = poked.get("errorCode")
+ if not errorcode:
+ return True
+ LOG.warning("Unable to heartbeat by updating key '%s' with "
+ "extended expiry of %s seconds: %d, %s", self.name,
+ self.ttl, errorcode, poked.get("message"))
+ return False
class EtcdDriver(coordination.CoordinationDriver):
diff --git a/tooz/drivers/etcd3.py b/tooz/drivers/etcd3.py
index 8740b22..7c19673 100644
--- a/tooz/drivers/etcd3.py
+++ b/tooz/drivers/etcd3.py
@@ -13,6 +13,7 @@
# under the License.
from __future__ import absolute_import
+import threading
import etcd3
from etcd3 import exceptions as etcd3_exc
@@ -61,6 +62,7 @@ class Etcd3Lock(locking.Lock):
super(Etcd3Lock, self).__init__(name)
self._coord = coord
self._lock = coord.client.lock(name.decode(), timeout)
+ self._exclusive_access = threading.Lock()
@_translate_failures
def acquire(self, blocking=True, shared=False):
@@ -83,14 +85,19 @@ class Etcd3Lock(locking.Lock):
@_translate_failures
def release(self):
- if self.acquired and self._lock.release():
- self._coord._acquired_locks.discard(self)
- return True
+ with self._exclusive_access:
+ if self.acquired and self._lock.release():
+ self._coord._acquired_locks.discard(self)
+ return True
return False
@_translate_failures
def heartbeat(self):
- self._lock.refresh()
+ with self._exclusive_access:
+ if self.acquired:
+ self._lock.refresh()
+ return True
+ return False
class Etcd3Driver(coordination.CoordinationDriver):
diff --git a/tooz/drivers/etcd3gw.py b/tooz/drivers/etcd3gw.py
index 1370f1c..c3367ca 100644
--- a/tooz/drivers/etcd3gw.py
+++ b/tooz/drivers/etcd3gw.py
@@ -14,6 +14,7 @@
from __future__ import absolute_import
import base64
+import threading
import uuid
import etcd3gw
@@ -68,6 +69,7 @@ class Etcd3Lock(locking.Lock):
self._key_b64 = base64.b64encode(self._key).decode("ascii")
self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii")
self._lease = self._coord.client.lease(self._timeout)
+ self._exclusive_access = threading.Lock()
@_translate_failures
def acquire(self, blocking=True, shared=False):
@@ -126,11 +128,12 @@ class Etcd3Lock(locking.Lock):
}]
}
- result = self._coord.client.transaction(txn)
- success = result.get('succeeded', False)
- if success:
- self._coord._acquired_locks.remove(self)
- return True
+ with self._exclusive_access:
+ result = self._coord.client.transaction(txn)
+ success = result.get('succeeded', False)
+ if success:
+ self._coord._acquired_locks.remove(self)
+ return True
return False
@_translate_failures
@@ -140,9 +143,17 @@ class Etcd3Lock(locking.Lock):
return True
return False
+ @property
+ def acquired(self):
+ return self in self._coord._acquired_locks
+
@_translate_failures
def heartbeat(self):
- self._lease.refresh()
+ with self._exclusive_access:
+ if self.acquired:
+ self._lease.refresh()
+ return True
+ return False
class Etcd3Driver(coordination.CoordinationDriver):
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py
index f2dd108..99fbcd4 100644
--- a/tooz/drivers/memcached.py
+++ b/tooz/drivers/memcached.py
@@ -165,10 +165,12 @@ class MemcachedLock(locking.Lock):
poked = self.coord.client.touch(self.name,
expire=self.timeout,
noreply=False)
- if not poked:
- LOG.warning("Unable to heartbeat by updating key '%s' with "
- "extended expiry of %s seconds", self.name,
- self.timeout)
+ if poked:
+ return True
+ LOG.warning("Unable to heartbeat by updating key '%s' with "
+ "extended expiry of %s seconds", self.name,
+ self.timeout)
+ return False
@_translate_failures
def get_owner(self):
diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py
index c2cf824..8d6be82 100644
--- a/tooz/drivers/redis.py
+++ b/tooz/drivers/redis.py
@@ -109,6 +109,8 @@ class RedisLock(locking.Lock):
if self.acquired:
with _translate_failures():
self._lock.extend(self._lock.timeout)
+ return True
+ return False
@property
def acquired(self):
diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py
index 9535df9..d5db96d 100644
--- a/tooz/tests/test_coordination.py
+++ b/tooz/tests/test_coordination.py
@@ -712,6 +712,12 @@ class TestAPI(tests.TestWithCoordinator):
with lock:
pass
+ def test_heartbeat_lock_not_acquired(self):
+ lock = self._coord.get_lock(tests.get_random_uuid())
+ # Not all locks need heartbeat
+ if hasattr(lock, "heartbeat"):
+ self.assertFalse(lock.heartbeat())
+
def test_get_shared_lock(self):
lock = self._coord.get_lock(tests.get_random_uuid())
self.assertTrue(lock.acquire(shared=True))