summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharles-Henri de Boysson <ceache@users.noreply.github.com>2020-04-29 08:52:48 -0400
committerGitHub <noreply@github.com>2020-04-29 08:52:48 -0400
commitb880ae66b9f921394e2e756f2466b5e110e040bb (patch)
tree633d91caef9002e2e263dcc6cd261c33c21c25ac
parentb20c929421ff72421ad3770fb3436d583df5e5b0 (diff)
parent75f62a0dd40a9f9bdefcb04cea5ee5fcd3438c0e (diff)
downloadkazoo-b880ae66b9f921394e2e756f2466b5e110e040bb.tar.gz
Merge pull request #599 from pmazzini/lock
feat(core): interoperate with Go client
-rw-r--r--.flake82
-rw-r--r--.hound.yml1
-rw-r--r--kazoo/recipe/lock.py192
-rw-r--r--kazoo/tests/test_lock.py123
4 files changed, 209 insertions, 109 deletions
diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..20d6db4
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,2 @@
+[flake8]
+ignore = BLK100
diff --git a/.hound.yml b/.hound.yml
index 5de48c8..5f00754 100644
--- a/.hound.yml
+++ b/.hound.yml
@@ -1,3 +1,4 @@
fail_on_violations: true
python:
enabled: true
+ config_file: .flake8
diff --git a/kazoo/recipe/lock.py b/kazoo/recipe/lock.py
index 9bd5dc0..7722a97 100644
--- a/kazoo/recipe/lock.py
+++ b/kazoo/recipe/lock.py
@@ -14,7 +14,9 @@ changes and re-act appropriately. In the event that a
and/or the lease has been lost.
"""
+import re
import sys
+
try:
from time import monotonic as now
except ImportError:
@@ -27,13 +29,13 @@ from kazoo.exceptions import (
CancelledError,
KazooException,
LockTimeout,
- NoNodeError
+ NoNodeError,
)
from kazoo.protocol.states import KazooState
from kazoo.retry import (
ForceRetryError,
KazooRetry,
- RetryFailedError
+ RetryFailedError,
)
@@ -82,22 +84,38 @@ class Lock(object):
# sequence number. Involved in read/write locks.
_EXCLUDE_NAMES = ["__lock__"]
- def __init__(self, client, path, identifier=None):
+ def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
"""Create a Kazoo lock.
:param client: A :class:`~kazoo.client.KazooClient` instance.
:param path: The lock path to use.
- :param identifier: Name to use for this lock contender. This
- can be useful for querying to see who the
- current lock contenders are.
-
+ :param identifier: Name to use for this lock contender. This can be
+ useful for querying to see who the current lock
+ contenders are.
+ :param extra_lock_patterns: Strings that will be used to
+ identify other znode in the path
+ that should be considered contenders
+ for this lock.
+ Use this for cross-implementation
+ compatibility.
+
+ .. versionadded:: 2.7.1
+ The extra_lock_patterns option.
"""
self.client = client
self.path = path
+ self._exclude_names = set(
+ self._EXCLUDE_NAMES + list(extra_lock_patterns)
+ )
+ self._contenders_re = re.compile(
+ r"(?:{patterns})(-?\d{{10}})$".format(
+ patterns="|".join(self._exclude_names)
+ )
+ )
# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
- self.data = str(identifier or "").encode('utf-8')
+ self.data = str(identifier or "").encode("utf-8")
self.node = None
self.wake_event = client.handler.event_object()
@@ -113,8 +131,9 @@ class Lock(object):
self.is_acquired = False
self.assured_path = False
self.cancelled = False
- self._retry = KazooRetry(max_tries=None,
- sleep_func=client.handler.sleep_func)
+ self._retry = KazooRetry(
+ max_tries=None, sleep_func=client.handler.sleep_func
+ )
self._lock = client.handler.lock_object()
def _ensure_path(self):
@@ -171,6 +190,7 @@ class Lock(object):
return False
if not locked:
# Lock acquire doesn't take a timeout, so simulate it...
+ # XXX: This is not true in Py3 >= 3.2
try:
locked = retry(_acquire_lock)
except RetryFailedError:
@@ -179,9 +199,12 @@ class Lock(object):
try:
gotten = False
try:
- gotten = retry(self._inner_acquire,
- blocking=blocking, timeout=timeout,
- ephemeral=ephemeral)
+ gotten = retry(
+ self._inner_acquire,
+ blocking=blocking,
+ timeout=timeout,
+ ephemeral=ephemeral,
+ )
except RetryFailedError:
pass
except KazooException:
@@ -222,8 +245,9 @@ class Lock(object):
self.create_tried = True
if not node:
- node = self.client.create(self.create_path, self.data,
- ephemeral=ephemeral, sequence=True)
+ node = self.client.create(
+ self.create_path, self.data, ephemeral=ephemeral, sequence=True
+ )
# strip off path to node
node = node[len(self.path) + 1:]
@@ -236,18 +260,8 @@ class Lock(object):
if self.cancelled:
raise CancelledError()
- children = self._get_sorted_children()
-
- try:
- our_index = children.index(node)
- except ValueError: # pragma: nocover
- # somehow we aren't in the children -- probably we are
- # recovering from a session failure and our ephemeral
- # node was removed
- raise ForceRetryError()
-
- predecessor = self.predecessor(children, our_index)
- if not predecessor:
+ predecessor = self._get_predecessor(node)
+ if predecessor is None:
return True
if not blocking:
@@ -263,40 +277,51 @@ class Lock(object):
else:
self.wake_event.wait(timeout)
if not self.wake_event.isSet():
- raise LockTimeout("Failed to acquire lock on %s after "
- "%s seconds" % (self.path, timeout))
+ raise LockTimeout(
+ "Failed to acquire lock on %s after %s seconds"
+ % (self.path, timeout)
+ )
finally:
self.client.remove_listener(self._watch_session)
- def predecessor(self, children, index):
- for c in reversed(children[:index]):
- if any(n in c for n in self._EXCLUDE_NAMES):
- return c
- return None
-
def _watch_predecessor(self, event):
self.wake_event.set()
- def _get_sorted_children(self):
+ def _get_predecessor(self, node):
+ """returns `node`'s predecessor or None
+
+ Note: This handle the case where the current lock is not a contender
+ (e.g. rlock), this and also edge cases where the lock's ephemeral node
+ is gone.
+ """
children = self.client.get_children(self.path)
+ found_self = False
+ # Filter out the contenders using the computed regex
+ contender_matches = []
+ for child in children:
+ match = self._contenders_re.search(child)
+ if match is not None:
+ contender_matches.append(match)
+ if child == node:
+ # Remember the node's match object so we can short circuit
+ # below.
+ found_self = match
+
+ if found_self is False: # pragma: nocover
+ # somehow we aren't in the childrens -- probably we are
+ # recovering from a session failure and our ephemeral
+ # node was removed.
+ raise ForceRetryError()
+
+ predecessor = None
+ # Sort the contenders using the sequence number extracted by the regex,
+ # then extract the original string.
+ for match in sorted(contender_matches, key=lambda m: m.groups()):
+ if match is found_self:
+ break
+ predecessor = match.string
- # Node names are prefixed by a type: strip the prefix first, which may
- # be one of multiple values in case of a read-write lock, and return
- # only the sequence number (as a string since it is padded and will
- # sort correctly anyway).
- #
- # In some cases, the lock path may contain nodes with other prefixes
- # (eg. in case of a lease), just sort them last ('~' sorts after all
- # ASCII digits).
- def _seq(c):
- for name in ["__lock__", "__rlock__"]:
- idx = c.find(name)
- if idx != -1:
- return c[idx + len(name):]
- # Sort unknown node names eg. "lease_holder" last.
- return '~'
- children.sort(key=_seq)
- return children
+ return predecessor
def _find_node(self):
children = self.client.get_children(self.path)
@@ -347,16 +372,37 @@ class Lock(object):
if not self.assured_path:
self._ensure_path()
- children = self._get_sorted_children()
-
- contenders = []
+ children = self.client.get_children(self.path)
+ # We want all contenders, including self (this is especially important
+ # for r/w locks). This is similar to the logic of `_get_predecessor`
+ # except we include our own pattern.
+ all_contenders_re = re.compile(
+ r"(?:{patterns})(-?\d{{10}})$".format(
+ patterns="|".join(self._exclude_names | {self._NODE_NAME})
+ )
+ )
+ # Filter out the contenders using the computed regex
+ contender_matches = []
for child in children:
+ match = all_contenders_re.search(child)
+ if match is not None:
+ contender_matches.append(match)
+ # Sort the contenders using the sequence number extracted by the regex,
+ # then extract the original string.
+ contender_nodes = [
+ match.string
+ for match in sorted(contender_matches, key=lambda m: m.groups())
+ ]
+ # Retrieve all the contender nodes data (preserving order).
+ contenders = []
+ for node in contender_nodes:
try:
- data, stat = self.client.get(self.path + "/" + child)
+ data, stat = self.client.get(self.path + "/" + node)
if data is not None:
- contenders.append(data.decode('utf-8'))
+ contenders.append(data.decode("utf-8"))
except NoNodeError: # pragma: nocover
pass
+
return contenders
def __enter__(self):
@@ -391,6 +437,7 @@ class WriteLock(Lock):
shared lock.
"""
+
_NODE_NAME = "__lock__"
_EXCLUDE_NAMES = ["__lock__", "__rlock__"]
@@ -420,6 +467,7 @@ class ReadLock(Lock):
shared lock.
"""
+
_NODE_NAME = "__rlock__"
_EXCLUDE_NAMES = ["__lock__"]
@@ -458,6 +506,7 @@ class Semaphore(object):
The max_leases check.
"""
+
def __init__(self, client, path, identifier=None, max_leases=1):
"""Create a Kazoo Lock
@@ -483,12 +532,12 @@ class Semaphore(object):
# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
- self.data = str(identifier or "").encode('utf-8')
+ self.data = str(identifier or "").encode("utf-8")
self.max_leases = max_leases
self.wake_event = client.handler.event_object()
self.create_path = self.path + "/" + uuid.uuid4().hex
- self.lock_path = path + '-' + '__lock__'
+ self.lock_path = path + "-" + "__lock__"
self.is_acquired = False
self.assured_path = False
self.cancelled = False
@@ -501,7 +550,7 @@ class Semaphore(object):
# node did already exist
data, _ = self.client.get(self.path)
try:
- leases = int(data.decode('utf-8'))
+ leases = int(data.decode("utf-8"))
except (ValueError, TypeError):
# ignore non-numeric data, maybe the node data is used
# for other purposes
@@ -509,11 +558,11 @@ class Semaphore(object):
else:
if leases != self.max_leases:
raise ValueError(
- "Inconsistent max leases: %s, expected: %s" %
- (leases, self.max_leases)
+ "Inconsistent max leases: %s, expected: %s"
+ % (leases, self.max_leases)
)
else:
- self.client.set(self.path, str(self.max_leases).encode('utf-8'))
+ self.client.set(self.path, str(self.max_leases).encode("utf-8"))
def cancel(self):
"""Cancel a pending semaphore acquire."""
@@ -548,7 +597,8 @@ class Semaphore(object):
try:
self.is_acquired = self.client.retry(
- self._inner_acquire, blocking=blocking, timeout=timeout)
+ self._inner_acquire, blocking=blocking, timeout=timeout
+ )
except KazooException:
# if we did ultimately fail, attempt to clean up
self._best_effort_cleanup()
@@ -590,8 +640,9 @@ class Semaphore(object):
self.wake_event.wait(w.leftover())
if not self.wake_event.isSet():
raise LockTimeout(
- "Failed to acquire semaphore on %s "
- "after %s seconds" % (self.path, timeout))
+ "Failed to acquire semaphore on %s"
+ " after %s seconds" % (self.path, timeout)
+ )
else:
return False
finally:
@@ -612,8 +663,9 @@ class Semaphore(object):
# Get a list of the current potential lock holders. If they change,
# notify our wake_event object. This is used to unblock a blocking
# self._inner_acquire call.
- children = self.client.get_children(self.path,
- self._watch_lease_change)
+ children = self.client.get_children(
+ self.path, self._watch_lease_change
+ )
# If there are leases available, acquire one
if len(children) < self.max_leases:
@@ -674,7 +726,7 @@ class Semaphore(object):
for child in children:
try:
data, stat = self.client.get(self.path + "/" + child)
- lease_holders.append(data.decode('utf-8'))
+ lease_holders.append(data.decode("utf-8"))
except NoNodeError: # pragma: nocover
pass
return lease_holders
diff --git a/kazoo/tests/test_lock.py b/kazoo/tests/test_lock.py
index ee0ff3d..0e16949 100644
--- a/kazoo/tests/test_lock.py
+++ b/kazoo/tests/test_lock.py
@@ -1,5 +1,7 @@
import collections
+import mock
import threading
+import unittest
import uuid
import pytest
@@ -7,6 +9,7 @@ import pytest
from kazoo.exceptions import CancelledError
from kazoo.exceptions import LockTimeout
from kazoo.exceptions import NoNodeError
+from kazoo.recipe.lock import Lock
from kazoo.testing import KazooTestCase
from kazoo.tests import util as test_util
@@ -97,8 +100,10 @@ class KazooLockTests(KazooTestCase):
lock_name = uuid.uuid4().hex
lock = self.client.Lock(self.lockpath, lock_name)
event = self.make_event()
- thread = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=(lock_name, lock, event))
+ thread = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=(lock_name, lock, event),
+ )
thread.start()
lock2_name = uuid.uuid4().hex
@@ -131,8 +136,9 @@ class KazooLockTests(KazooTestCase):
for name in names:
e = self.make_event()
l = self.client.Lock(self.lockpath, name)
- t = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=(name, l, e))
+ t = self.make_thread(
+ target=self._thread_lock_acquire_til_event, args=(name, l, e)
+ )
contender_bits[name] = (t, e)
threads.append(t)
@@ -176,9 +182,11 @@ class KazooLockTests(KazooTestCase):
def test_lock_reconnect(self):
event = self.make_event()
- other_lock = self.client.Lock(self.lockpath, 'contender')
- thread = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=('contender', other_lock, event))
+ other_lock = self.client.Lock(self.lockpath, "contender")
+ thread = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=("contender", other_lock, event),
+ )
# acquire the lock ourselves first to make the contender line up
lock = self.client.Lock(self.lockpath, "test")
@@ -188,7 +196,7 @@ class KazooLockTests(KazooTestCase):
# wait for the contender to line up on the lock
wait = self.make_wait()
wait(lambda: len(lock.contenders()) == 2)
- assert lock.contenders() == ['test', 'contender']
+ assert lock.contenders() == ["test", "contender"]
self.expire_session(self.make_event)
@@ -197,7 +205,7 @@ class KazooLockTests(KazooTestCase):
with self.condition:
while not self.active_thread:
self.condition.wait()
- assert self.active_thread == 'contender'
+ assert self.active_thread == "contender"
event.set()
thread.join()
@@ -207,8 +215,10 @@ class KazooLockTests(KazooTestCase):
lock = self.client.Lock(self.lockpath, lock_name)
event = self.make_event()
- thread = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=(lock_name, lock, event))
+ thread = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=(lock_name, lock, event),
+ )
thread.start()
lock1 = self.client.Lock(self.lockpath, lock_name)
@@ -227,8 +237,10 @@ class KazooLockTests(KazooTestCase):
def test_lock_fail_first_call(self):
event1 = self.make_event()
lock1 = self.client.Lock(self.lockpath, "one")
- thread1 = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=("one", lock1, event1))
+ thread1 = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=("one", lock1, event1),
+ )
thread1.start()
# wait for this thread to acquire the lock
@@ -243,8 +255,10 @@ class KazooLockTests(KazooTestCase):
def test_lock_cancel(self):
event1 = self.make_event()
lock1 = self.client.Lock(self.lockpath, "one")
- thread1 = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=("one", lock1, event1))
+ thread1 = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=("one", lock1, event1),
+ )
thread1.start()
# wait for this thread to acquire the lock
@@ -257,8 +271,10 @@ class KazooLockTests(KazooTestCase):
client2.start()
event2 = self.make_event()
lock2 = client2.Lock(self.lockpath, "two")
- thread2 = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=("two", lock2, event2))
+ thread2 = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=("two", lock2, event2),
+ )
thread2.start()
# this one should block in acquire. check that it is a contender
@@ -361,7 +377,7 @@ class KazooLockTests(KazooTestCase):
client1.start()
lock = client1.Lock(self.lockpath, "ephemeral")
lock.acquire(ephemeral=False)
- znode = self.lockpath + '/' + lock.node
+ znode = self.lockpath + "/" + lock.node
client1.stop()
try:
self.client.get(znode)
@@ -418,7 +434,6 @@ class KazooLockTests(KazooTestCase):
# and that it's still not reentrant.
gotten = lock.acquire(blocking=False)
assert gotten is False
-
# Test that a second client we can share the same read lock
client2 = self._get_client()
client2.start()
@@ -428,7 +443,6 @@ class KazooLockTests(KazooTestCase):
assert lock2.is_acquired is True
gotten = lock2.acquire(blocking=False)
assert gotten is False
-
# Test that a writer is unable to share it
client3 = self._get_client()
client3.start()
@@ -518,28 +532,33 @@ class TestSemaphore(KazooTestCase):
def test_non_blocking(self):
sem1 = self.client.Semaphore(
- self.lockpath, identifier='sem1', max_leases=2)
+ self.lockpath, identifier="sem1", max_leases=2
+ )
sem2 = self.client.Semaphore(
- self.lockpath, identifier='sem2', max_leases=2)
+ self.lockpath, identifier="sem2", max_leases=2
+ )
sem3 = self.client.Semaphore(
- self.lockpath, identifier='sem3', max_leases=2)
+ self.lockpath, identifier="sem3", max_leases=2
+ )
sem1.acquire()
sem2.acquire()
assert not sem3.acquire(blocking=False)
- assert set(sem1.lease_holders()) == set(['sem1', 'sem2'])
+ assert set(sem1.lease_holders()) == set(["sem1", "sem2"])
sem2.release()
# the next line isn't required, but avoids timing issues in tests
sem3.acquire()
- assert set(sem1.lease_holders()) == set(['sem1', 'sem3'])
+ assert set(sem1.lease_holders()) == set(["sem1", "sem3"])
sem1.release()
sem3.release()
def test_non_blocking_release(self):
sem1 = self.client.Semaphore(
- self.lockpath, identifier='sem1', max_leases=1)
+ self.lockpath, identifier="sem1", max_leases=1
+ )
sem2 = self.client.Semaphore(
- self.lockpath, identifier='sem2', max_leases=1)
+ self.lockpath, identifier="sem2", max_leases=1
+ )
sem1.acquire()
sem2.acquire(blocking=False)
@@ -552,7 +571,7 @@ class TestSemaphore(KazooTestCase):
event = self.make_event()
def sema_one():
- with self.client.Semaphore(self.lockpath, 'fred', max_leases=1):
+ with self.client.Semaphore(self.lockpath, "fred", max_leases=1):
started.set()
event.wait()
@@ -561,13 +580,13 @@ class TestSemaphore(KazooTestCase):
started.wait()
sem1 = self.client.Semaphore(self.lockpath)
holders = sem1.lease_holders()
- assert holders == ['fred']
+ assert holders == ["fred"]
event.set()
thread.join()
def test_semaphore_cancel(self):
- sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1)
- sem2 = self.client.Semaphore(self.lockpath, 'george', max_leases=1)
+ sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1)
+ sem2 = self.client.Semaphore(self.lockpath, "george", max_leases=1)
sem1.acquire()
started = self.make_event()
event = self.make_event()
@@ -583,7 +602,7 @@ class TestSemaphore(KazooTestCase):
thread = self.make_thread(target=sema_one, args=())
thread.start()
started.wait()
- assert sem1.lease_holders() == ['fred']
+ assert sem1.lease_holders() == ["fred"]
assert not event.is_set()
sem2.cancel()
event.wait()
@@ -591,7 +610,7 @@ class TestSemaphore(KazooTestCase):
thread.join()
def test_multiple_acquire_and_release(self):
- sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1)
+ sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1)
sem1.acquire()
sem1.acquire()
@@ -599,12 +618,13 @@ class TestSemaphore(KazooTestCase):
assert not sem1.release()
def test_handle_session_loss(self):
- expire_semaphore = self.client.Semaphore(self.lockpath, 'fred',
- max_leases=1)
+ expire_semaphore = self.client.Semaphore(
+ self.lockpath, "fred", max_leases=1
+ )
client = self._get_client()
client.start()
- lh_semaphore = client.Semaphore(self.lockpath, 'george', max_leases=1)
+ lh_semaphore = client.Semaphore(self.lockpath, "george", max_leases=1)
lh_semaphore.acquire()
started = self.make_event()
@@ -621,7 +641,7 @@ class TestSemaphore(KazooTestCase):
thread1.start()
started.wait()
- assert lh_semaphore.lease_holders() == ['george']
+ assert lh_semaphore.lease_holders() == ["george"]
# Fired in a separate thread to make sure we can see the effect
expired = self.make_event()
@@ -639,7 +659,7 @@ class TestSemaphore(KazooTestCase):
client.stop()
event.wait(15)
- assert expire_semaphore.lease_holders() == ['fred']
+ assert expire_semaphore.lease_holders() == ["fred"]
event2.set()
for t in (thread1, thread2):
@@ -658,7 +678,7 @@ class TestSemaphore(KazooTestCase):
sem2 = self.client.Semaphore(self.lockpath, max_leases=2)
self.client.ensure_path(self.lockpath)
- self.client.set(self.lockpath, b'a$')
+ self.client.set(self.lockpath, b"a$")
sem1.acquire()
# sem2 thinks it's ok to have two lease holders
@@ -716,3 +736,28 @@ class TestSemaphore(KazooTestCase):
# Cleanup
t.join()
client2.stop()
+
+
+class TestSequence(unittest.TestCase):
+ def test_get_predecessor(self):
+ """Validate selection of predecessors.
+ """
+ goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031"
+ pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032"
+ children = ["hello", goLock, "world", pyLock]
+ client = mock.MagicMock()
+ client.get_children.return_value = children
+ lock = Lock(client, "test")
+ assert lock._get_predecessor(pyLock) is None
+
+ def test_get_predecessor_go(self):
+ """Test selection of predecessor when instructed to consider go-zk
+ locks.
+ """
+ goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031"
+ pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032"
+ children = ["hello", goLock, "world", pyLock]
+ client = mock.MagicMock()
+ client.get_children.return_value = children
+ lock = Lock(client, "test", extra_lock_patterns=["-lock-"])
+ assert lock._get_predecessor(pyLock) == goLock