summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharles-Henri de Boysson <ceache@users.noreply.github.com>2020-04-15 20:58:36 -0400
committerCharles-Henri de Boysson <ceache@users.noreply.github.com>2020-04-24 22:37:57 -0400
commit225eeecbe66c10d46dc7928681783d17f389f13a (patch)
treece820eb18aa1eaf62d69480be18d0abb3962d9a8
parenta4efaac6cf4269a5e322a45c5d650ac2227952d4 (diff)
downloadkazoo-225eeecbe66c10d46dc7928681783d17f389f13a.tar.gz
feat(core): Support additionaal lock contenter patterns
Allows configurable multi-implementations cooperations in locks (e.g. Zookeeper python & go clients contending for the same lock).
-rw-r--r--kazoo/recipe/lock.py84
-rw-r--r--kazoo/tests/test_lock.py108
2 files changed, 125 insertions, 67 deletions
diff --git a/kazoo/recipe/lock.py b/kazoo/recipe/lock.py
index 9470299..982c12e 100644
--- a/kazoo/recipe/lock.py
+++ b/kazoo/recipe/lock.py
@@ -15,6 +15,7 @@ and/or the lease has been lost.
"""
import sys
+
try:
from time import monotonic as now
except ImportError:
@@ -27,13 +28,13 @@ from kazoo.exceptions import (
CancelledError,
KazooException,
LockTimeout,
- NoNodeError
+ NoNodeError,
)
from kazoo.protocol.states import KazooState
from kazoo.retry import (
ForceRetryError,
KazooRetry,
- RetryFailedError
+ RetryFailedError,
)
@@ -80,20 +81,33 @@ class Lock(object):
# Node names which exclude this contender when present at a lower
# sequence number. Involved in read/write locks.
- _EXCLUDE_NAMES = ["__lock__", "-lock-"]
+ _EXCLUDE_NAMES = ["__lock__"]
- def __init__(self, client, path, identifier=None):
+ def __init__(
+ self, client, path, identifier=None, additional_lock_patterns=()
+ ):
"""Create a Kazoo lock.
:param client: A :class:`~kazoo.client.KazooClient` instance.
:param path: The lock path to use.
- :param identifier: Name to use for this lock contender. This
- can be useful for querying to see who the
- current lock contenders are.
-
+ :param identifier: Name to use for this lock contender. This can be
+ useful for querying to see who the current lock
+ contenders are.
+ :param additional_lock_patterns: Strings that will be used to
+ identify other znode in the path
+ that should be considered contenders
+ for this lock.
+ Use this for cross-implementation
+ compatibility.
+
+ .. versionadded:: 2.7.1
+ The additional_lock_patterns option.
"""
self.client = client
self.path = path
+ self._exclude_names = set(
+ self._EXCLUDE_NAMES + list(additional_lock_patterns)
+ )
# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
@@ -113,8 +127,9 @@ class Lock(object):
self.is_acquired = False
self.assured_path = False
self.cancelled = False
- self._retry = KazooRetry(max_tries=None,
- sleep_func=client.handler.sleep_func)
+ self._retry = KazooRetry(
+ max_tries=None, sleep_func=client.handler.sleep_func
+ )
self._lock = client.handler.lock_object()
def _ensure_path(self):
@@ -179,9 +194,12 @@ class Lock(object):
try:
gotten = False
try:
- gotten = retry(self._inner_acquire,
- blocking=blocking, timeout=timeout,
- ephemeral=ephemeral)
+ gotten = retry(
+ self._inner_acquire,
+ blocking=blocking,
+ timeout=timeout,
+ ephemeral=ephemeral,
+ )
except RetryFailedError:
pass
except KazooException:
@@ -222,8 +240,9 @@ class Lock(object):
self.create_tried = True
if not node:
- node = self.client.create(self.create_path, self.data,
- ephemeral=ephemeral, sequence=True)
+ node = self.client.create(
+ self.create_path, self.data, ephemeral=ephemeral, sequence=True
+ )
# strip off path to node
node = node[len(self.path) + 1:]
@@ -263,14 +282,16 @@ class Lock(object):
else:
self.wake_event.wait(timeout)
if not self.wake_event.isSet():
- raise LockTimeout("Failed to acquire lock on %s after "
- "%s seconds" % (self.path, timeout))
+ raise LockTimeout(
+ "Failed to acquire lock on %s after %s seconds"
+ % (self.path, timeout)
+ )
finally:
self.client.remove_listener(self._watch_session)
def predecessor(self, children, index):
for c in reversed(children[:index]):
- if any(n in c for n in self._EXCLUDE_NAMES):
+ if any(n in c for n in self._exclude_names):
return c
return None
@@ -289,12 +310,13 @@ class Lock(object):
# (eg. in case of a lease), just sort them last ('~' sorts after all
# ASCII digits).
def _seq(c):
- for name in ["__lock__", "-lock-", "__rlock__"]:
+ for name in self._exclude_names:
idx = c.find(name)
if idx != -1:
return c[idx + len(name):]
# Sort unknown node names eg. "lease_holder" last.
return '~'
+
children.sort(key=_seq)
return children
@@ -391,8 +413,9 @@ class WriteLock(Lock):
shared lock.
"""
+
_NODE_NAME = "__lock__"
- _EXCLUDE_NAMES = ["__lock__", "-lock-", "__rlock__"]
+ _EXCLUDE_NAMES = ["__lock__", "__rlock__"]
class ReadLock(Lock):
@@ -420,8 +443,9 @@ class ReadLock(Lock):
shared lock.
"""
+
_NODE_NAME = "__rlock__"
- _EXCLUDE_NAMES = ["__lock__", "-lock-"]
+ _EXCLUDE_NAMES = ["__lock__"]
class Semaphore(object):
@@ -458,6 +482,7 @@ class Semaphore(object):
The max_leases check.
"""
+
def __init__(self, client, path, identifier=None, max_leases=1):
"""Create a Kazoo Lock
@@ -509,8 +534,8 @@ class Semaphore(object):
else:
if leases != self.max_leases:
raise ValueError(
- "Inconsistent max leases: %s, expected: %s" %
- (leases, self.max_leases)
+ "Inconsistent max leases: %s, expected: %s"
+ % (leases, self.max_leases)
)
else:
self.client.set(self.path, str(self.max_leases).encode('utf-8'))
@@ -548,7 +573,8 @@ class Semaphore(object):
try:
self.is_acquired = self.client.retry(
- self._inner_acquire, blocking=blocking, timeout=timeout)
+ self._inner_acquire, blocking=blocking, timeout=timeout
+ )
except KazooException:
# if we did ultimately fail, attempt to clean up
self._best_effort_cleanup()
@@ -590,8 +616,9 @@ class Semaphore(object):
self.wake_event.wait(w.leftover())
if not self.wake_event.isSet():
raise LockTimeout(
- "Failed to acquire semaphore on %s "
- "after %s seconds" % (self.path, timeout))
+ "Failed to acquire semaphore on %s"
+ " after %s seconds" % (self.path, timeout)
+ )
else:
return False
finally:
@@ -612,8 +639,9 @@ class Semaphore(object):
# Get a list of the current potential lock holders. If they change,
# notify our wake_event object. This is used to unblock a blocking
# self._inner_acquire call.
- children = self.client.get_children(self.path,
- self._watch_lease_change)
+ children = self.client.get_children(
+ self.path, self._watch_lease_change
+ )
# If there are leases available, acquire one
if len(children) < self.max_leases:
diff --git a/kazoo/tests/test_lock.py b/kazoo/tests/test_lock.py
index 1b6849e..33691e4 100644
--- a/kazoo/tests/test_lock.py
+++ b/kazoo/tests/test_lock.py
@@ -100,8 +100,10 @@ class KazooLockTests(KazooTestCase):
lock_name = uuid.uuid4().hex
lock = self.client.Lock(self.lockpath, lock_name)
event = self.make_event()
- thread = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=(lock_name, lock, event))
+ thread = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=(lock_name, lock, event),
+ )
thread.start()
lock2_name = uuid.uuid4().hex
@@ -134,8 +136,9 @@ class KazooLockTests(KazooTestCase):
for name in names:
e = self.make_event()
l = self.client.Lock(self.lockpath, name)
- t = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=(name, l, e))
+ t = self.make_thread(
+ target=self._thread_lock_acquire_til_event, args=(name, l, e)
+ )
contender_bits[name] = (t, e)
threads.append(t)
@@ -179,9 +182,11 @@ class KazooLockTests(KazooTestCase):
def test_lock_reconnect(self):
event = self.make_event()
- other_lock = self.client.Lock(self.lockpath, 'contender')
- thread = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=('contender', other_lock, event))
+ other_lock = self.client.Lock(self.lockpath, "contender")
+ thread = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=("contender", other_lock, event),
+ )
# acquire the lock ourselves first to make the contender line up
lock = self.client.Lock(self.lockpath, "test")
@@ -191,7 +196,7 @@ class KazooLockTests(KazooTestCase):
# wait for the contender to line up on the lock
wait = self.make_wait()
wait(lambda: len(lock.contenders()) == 2)
- assert lock.contenders() == ['test', 'contender']
+ assert lock.contenders() == ["test", "contender"]
self.expire_session(self.make_event)
@@ -200,7 +205,7 @@ class KazooLockTests(KazooTestCase):
with self.condition:
while not self.active_thread:
self.condition.wait()
- assert self.active_thread == 'contender'
+ assert self.active_thread == "contender"
event.set()
thread.join()
@@ -210,8 +215,10 @@ class KazooLockTests(KazooTestCase):
lock = self.client.Lock(self.lockpath, lock_name)
event = self.make_event()
- thread = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=(lock_name, lock, event))
+ thread = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=(lock_name, lock, event),
+ )
thread.start()
lock1 = self.client.Lock(self.lockpath, lock_name)
@@ -230,8 +237,10 @@ class KazooLockTests(KazooTestCase):
def test_lock_fail_first_call(self):
event1 = self.make_event()
lock1 = self.client.Lock(self.lockpath, "one")
- thread1 = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=("one", lock1, event1))
+ thread1 = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=("one", lock1, event1),
+ )
thread1.start()
# wait for this thread to acquire the lock
@@ -246,8 +255,10 @@ class KazooLockTests(KazooTestCase):
def test_lock_cancel(self):
event1 = self.make_event()
lock1 = self.client.Lock(self.lockpath, "one")
- thread1 = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=("one", lock1, event1))
+ thread1 = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=("one", lock1, event1),
+ )
thread1.start()
# wait for this thread to acquire the lock
@@ -260,8 +271,10 @@ class KazooLockTests(KazooTestCase):
client2.start()
event2 = self.make_event()
lock2 = client2.Lock(self.lockpath, "two")
- thread2 = self.make_thread(target=self._thread_lock_acquire_til_event,
- args=("two", lock2, event2))
+ thread2 = self.make_thread(
+ target=self._thread_lock_acquire_til_event,
+ args=("two", lock2, event2),
+ )
thread2.start()
# this one should block in acquire. check that it is a contender
@@ -364,7 +377,7 @@ class KazooLockTests(KazooTestCase):
client1.start()
lock = client1.Lock(self.lockpath, "ephemeral")
lock.acquire(ephemeral=False)
- znode = self.lockpath + '/' + lock.node
+ znode = self.lockpath + "/" + lock.node
client1.stop()
try:
self.client.get(znode)
@@ -521,28 +534,33 @@ class TestSemaphore(KazooTestCase):
def test_non_blocking(self):
sem1 = self.client.Semaphore(
- self.lockpath, identifier='sem1', max_leases=2)
+ self.lockpath, identifier="sem1", max_leases=2
+ )
sem2 = self.client.Semaphore(
- self.lockpath, identifier='sem2', max_leases=2)
+ self.lockpath, identifier="sem2", max_leases=2
+ )
sem3 = self.client.Semaphore(
- self.lockpath, identifier='sem3', max_leases=2)
+ self.lockpath, identifier="sem3", max_leases=2
+ )
sem1.acquire()
sem2.acquire()
assert not sem3.acquire(blocking=False)
- assert set(sem1.lease_holders()) == set(['sem1', 'sem2'])
+ assert set(sem1.lease_holders()) == set(["sem1", "sem2"])
sem2.release()
# the next line isn't required, but avoids timing issues in tests
sem3.acquire()
- assert set(sem1.lease_holders()) == set(['sem1', 'sem3'])
+ assert set(sem1.lease_holders()) == set(["sem1", "sem3"])
sem1.release()
sem3.release()
def test_non_blocking_release(self):
sem1 = self.client.Semaphore(
- self.lockpath, identifier='sem1', max_leases=1)
+ self.lockpath, identifier="sem1", max_leases=1
+ )
sem2 = self.client.Semaphore(
- self.lockpath, identifier='sem2', max_leases=1)
+ self.lockpath, identifier="sem2", max_leases=1
+ )
sem1.acquire()
sem2.acquire(blocking=False)
@@ -555,7 +573,7 @@ class TestSemaphore(KazooTestCase):
event = self.make_event()
def sema_one():
- with self.client.Semaphore(self.lockpath, 'fred', max_leases=1):
+ with self.client.Semaphore(self.lockpath, "fred", max_leases=1):
started.set()
event.wait()
@@ -564,13 +582,13 @@ class TestSemaphore(KazooTestCase):
started.wait()
sem1 = self.client.Semaphore(self.lockpath)
holders = sem1.lease_holders()
- assert holders == ['fred']
+ assert holders == ["fred"]
event.set()
thread.join()
def test_semaphore_cancel(self):
- sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1)
- sem2 = self.client.Semaphore(self.lockpath, 'george', max_leases=1)
+ sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1)
+ sem2 = self.client.Semaphore(self.lockpath, "george", max_leases=1)
sem1.acquire()
started = self.make_event()
event = self.make_event()
@@ -586,7 +604,7 @@ class TestSemaphore(KazooTestCase):
thread = self.make_thread(target=sema_one, args=())
thread.start()
started.wait()
- assert sem1.lease_holders() == ['fred']
+ assert sem1.lease_holders() == ["fred"]
assert not event.is_set()
sem2.cancel()
event.wait()
@@ -594,7 +612,7 @@ class TestSemaphore(KazooTestCase):
thread.join()
def test_multiple_acquire_and_release(self):
- sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1)
+ sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1)
sem1.acquire()
sem1.acquire()
@@ -602,12 +620,13 @@ class TestSemaphore(KazooTestCase):
assert not sem1.release()
def test_handle_session_loss(self):
- expire_semaphore = self.client.Semaphore(self.lockpath, 'fred',
- max_leases=1)
+ expire_semaphore = self.client.Semaphore(
+ self.lockpath, "fred", max_leases=1
+ )
client = self._get_client()
client.start()
- lh_semaphore = client.Semaphore(self.lockpath, 'george', max_leases=1)
+ lh_semaphore = client.Semaphore(self.lockpath, "george", max_leases=1)
lh_semaphore.acquire()
started = self.make_event()
@@ -624,7 +643,7 @@ class TestSemaphore(KazooTestCase):
thread1.start()
started.wait()
- assert lh_semaphore.lease_holders() == ['george']
+ assert lh_semaphore.lease_holders() == ["george"]
# Fired in a separate thread to make sure we can see the effect
expired = self.make_event()
@@ -642,7 +661,7 @@ class TestSemaphore(KazooTestCase):
client.stop()
event.wait(15)
- assert expire_semaphore.lease_holders() == ['fred']
+ assert expire_semaphore.lease_holders() == ["fred"]
event2.set()
for t in (thread1, thread2):
@@ -661,7 +680,7 @@ class TestSemaphore(KazooTestCase):
sem2 = self.client.Semaphore(self.lockpath, max_leases=2)
self.client.ensure_path(self.lockpath)
- self.client.set(self.lockpath, b'a$')
+ self.client.set(self.lockpath, b"a$")
sem1.acquire()
# sem2 thinks it's ok to have two lease holders
@@ -722,13 +741,24 @@ class TestSemaphore(KazooTestCase):
class TestSequence(unittest.TestCase):
-
def test_get_sorted_children(self):
goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031"
pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032"
- children = [goLock, pyLock]
+ children = ["hello", goLock, "world", pyLock]
client = mock.MagicMock()
client.get_children.return_value = children
lock = Lock(client, "test")
sorted_children = lock._get_sorted_children()
+ assert len(sorted_children) == 4
+ assert sorted_children[0] == pyLock
+
+ def test_get_sorted_children_go(self):
+ goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031"
+ pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032"
+ children = ["hello", goLock, "world", pyLock]
+ client = mock.MagicMock()
+ client.get_children.return_value = children
+ lock = Lock(client, "test", additional_lock_patterns=["-lock-"])
+ sorted_children = lock._get_sorted_children()
+ assert len(sorted_children) == 4
assert sorted_children[0] == goLock