summaryrefslogtreecommitdiff
path: root/kazoo/recipe/lock.py
diff options
context:
space:
mode:
Diffstat (limited to 'kazoo/recipe/lock.py')
-rw-r--r--kazoo/recipe/lock.py192
1 files changed, 122 insertions, 70 deletions
diff --git a/kazoo/recipe/lock.py b/kazoo/recipe/lock.py
index 9bd5dc0..7722a97 100644
--- a/kazoo/recipe/lock.py
+++ b/kazoo/recipe/lock.py
@@ -14,7 +14,9 @@ changes and re-act appropriately. In the event that a
and/or the lease has been lost.
"""
+import re
import sys
+
try:
from time import monotonic as now
except ImportError:
@@ -27,13 +29,13 @@ from kazoo.exceptions import (
CancelledError,
KazooException,
LockTimeout,
- NoNodeError
+ NoNodeError,
)
from kazoo.protocol.states import KazooState
from kazoo.retry import (
ForceRetryError,
KazooRetry,
- RetryFailedError
+ RetryFailedError,
)
@@ -82,22 +84,38 @@ class Lock(object):
# sequence number. Involved in read/write locks.
_EXCLUDE_NAMES = ["__lock__"]
- def __init__(self, client, path, identifier=None):
+ def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
"""Create a Kazoo lock.
:param client: A :class:`~kazoo.client.KazooClient` instance.
:param path: The lock path to use.
- :param identifier: Name to use for this lock contender. This
- can be useful for querying to see who the
- current lock contenders are.
-
+ :param identifier: Name to use for this lock contender. This can be
+ useful for querying to see who the current lock
+ contenders are.
+ :param extra_lock_patterns: Strings that will be used to
+ identify other znode in the path
+ that should be considered contenders
+ for this lock.
+ Use this for cross-implementation
+ compatibility.
+
+ .. versionadded:: 2.7.1
+ The extra_lock_patterns option.
"""
self.client = client
self.path = path
+ self._exclude_names = set(
+ self._EXCLUDE_NAMES + list(extra_lock_patterns)
+ )
+ self._contenders_re = re.compile(
+ r"(?:{patterns})(-?\d{{10}})$".format(
+ patterns="|".join(self._exclude_names)
+ )
+ )
# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
- self.data = str(identifier or "").encode('utf-8')
+ self.data = str(identifier or "").encode("utf-8")
self.node = None
self.wake_event = client.handler.event_object()
@@ -113,8 +131,9 @@ class Lock(object):
self.is_acquired = False
self.assured_path = False
self.cancelled = False
- self._retry = KazooRetry(max_tries=None,
- sleep_func=client.handler.sleep_func)
+ self._retry = KazooRetry(
+ max_tries=None, sleep_func=client.handler.sleep_func
+ )
self._lock = client.handler.lock_object()
def _ensure_path(self):
@@ -171,6 +190,7 @@ class Lock(object):
return False
if not locked:
# Lock acquire doesn't take a timeout, so simulate it...
+ # XXX: This is not true in Py3 >= 3.2
try:
locked = retry(_acquire_lock)
except RetryFailedError:
@@ -179,9 +199,12 @@ class Lock(object):
try:
gotten = False
try:
- gotten = retry(self._inner_acquire,
- blocking=blocking, timeout=timeout,
- ephemeral=ephemeral)
+ gotten = retry(
+ self._inner_acquire,
+ blocking=blocking,
+ timeout=timeout,
+ ephemeral=ephemeral,
+ )
except RetryFailedError:
pass
except KazooException:
@@ -222,8 +245,9 @@ class Lock(object):
self.create_tried = True
if not node:
- node = self.client.create(self.create_path, self.data,
- ephemeral=ephemeral, sequence=True)
+ node = self.client.create(
+ self.create_path, self.data, ephemeral=ephemeral, sequence=True
+ )
# strip off path to node
node = node[len(self.path) + 1:]
@@ -236,18 +260,8 @@ class Lock(object):
if self.cancelled:
raise CancelledError()
- children = self._get_sorted_children()
-
- try:
- our_index = children.index(node)
- except ValueError: # pragma: nocover
- # somehow we aren't in the children -- probably we are
- # recovering from a session failure and our ephemeral
- # node was removed
- raise ForceRetryError()
-
- predecessor = self.predecessor(children, our_index)
- if not predecessor:
+ predecessor = self._get_predecessor(node)
+ if predecessor is None:
return True
if not blocking:
@@ -263,40 +277,51 @@ class Lock(object):
else:
self.wake_event.wait(timeout)
if not self.wake_event.isSet():
- raise LockTimeout("Failed to acquire lock on %s after "
- "%s seconds" % (self.path, timeout))
+ raise LockTimeout(
+ "Failed to acquire lock on %s after %s seconds"
+ % (self.path, timeout)
+ )
finally:
self.client.remove_listener(self._watch_session)
- def predecessor(self, children, index):
- for c in reversed(children[:index]):
- if any(n in c for n in self._EXCLUDE_NAMES):
- return c
- return None
-
def _watch_predecessor(self, event):
self.wake_event.set()
- def _get_sorted_children(self):
+ def _get_predecessor(self, node):
+ """returns `node`'s predecessor or None
+
+ Note: This handle the case where the current lock is not a contender
+ (e.g. rlock), this and also edge cases where the lock's ephemeral node
+ is gone.
+ """
children = self.client.get_children(self.path)
+ found_self = False
+ # Filter out the contenders using the computed regex
+ contender_matches = []
+ for child in children:
+ match = self._contenders_re.search(child)
+ if match is not None:
+ contender_matches.append(match)
+ if child == node:
+ # Remember the node's match object so we can short circuit
+ # below.
+ found_self = match
+
+ if found_self is False: # pragma: nocover
+ # somehow we aren't in the childrens -- probably we are
+ # recovering from a session failure and our ephemeral
+ # node was removed.
+ raise ForceRetryError()
+
+ predecessor = None
+ # Sort the contenders using the sequence number extracted by the regex,
+ # then extract the original string.
+ for match in sorted(contender_matches, key=lambda m: m.groups()):
+ if match is found_self:
+ break
+ predecessor = match.string
- # Node names are prefixed by a type: strip the prefix first, which may
- # be one of multiple values in case of a read-write lock, and return
- # only the sequence number (as a string since it is padded and will
- # sort correctly anyway).
- #
- # In some cases, the lock path may contain nodes with other prefixes
- # (eg. in case of a lease), just sort them last ('~' sorts after all
- # ASCII digits).
- def _seq(c):
- for name in ["__lock__", "__rlock__"]:
- idx = c.find(name)
- if idx != -1:
- return c[idx + len(name):]
- # Sort unknown node names eg. "lease_holder" last.
- return '~'
- children.sort(key=_seq)
- return children
+ return predecessor
def _find_node(self):
children = self.client.get_children(self.path)
@@ -347,16 +372,37 @@ class Lock(object):
if not self.assured_path:
self._ensure_path()
- children = self._get_sorted_children()
-
- contenders = []
+ children = self.client.get_children(self.path)
+ # We want all contenders, including self (this is especially important
+ # for r/w locks). This is similar to the logic of `_get_predecessor`
+ # except we include our own pattern.
+ all_contenders_re = re.compile(
+ r"(?:{patterns})(-?\d{{10}})$".format(
+ patterns="|".join(self._exclude_names | {self._NODE_NAME})
+ )
+ )
+ # Filter out the contenders using the computed regex
+ contender_matches = []
for child in children:
+ match = all_contenders_re.search(child)
+ if match is not None:
+ contender_matches.append(match)
+ # Sort the contenders using the sequence number extracted by the regex,
+ # then extract the original string.
+ contender_nodes = [
+ match.string
+ for match in sorted(contender_matches, key=lambda m: m.groups())
+ ]
+ # Retrieve all the contender nodes data (preserving order).
+ contenders = []
+ for node in contender_nodes:
try:
- data, stat = self.client.get(self.path + "/" + child)
+ data, stat = self.client.get(self.path + "/" + node)
if data is not None:
- contenders.append(data.decode('utf-8'))
+ contenders.append(data.decode("utf-8"))
except NoNodeError: # pragma: nocover
pass
+
return contenders
def __enter__(self):
@@ -391,6 +437,7 @@ class WriteLock(Lock):
shared lock.
"""
+
_NODE_NAME = "__lock__"
_EXCLUDE_NAMES = ["__lock__", "__rlock__"]
@@ -420,6 +467,7 @@ class ReadLock(Lock):
shared lock.
"""
+
_NODE_NAME = "__rlock__"
_EXCLUDE_NAMES = ["__lock__"]
@@ -458,6 +506,7 @@ class Semaphore(object):
The max_leases check.
"""
+
def __init__(self, client, path, identifier=None, max_leases=1):
"""Create a Kazoo Lock
@@ -483,12 +532,12 @@ class Semaphore(object):
# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
- self.data = str(identifier or "").encode('utf-8')
+ self.data = str(identifier or "").encode("utf-8")
self.max_leases = max_leases
self.wake_event = client.handler.event_object()
self.create_path = self.path + "/" + uuid.uuid4().hex
- self.lock_path = path + '-' + '__lock__'
+ self.lock_path = path + "-" + "__lock__"
self.is_acquired = False
self.assured_path = False
self.cancelled = False
@@ -501,7 +550,7 @@ class Semaphore(object):
# node did already exist
data, _ = self.client.get(self.path)
try:
- leases = int(data.decode('utf-8'))
+ leases = int(data.decode("utf-8"))
except (ValueError, TypeError):
# ignore non-numeric data, maybe the node data is used
# for other purposes
@@ -509,11 +558,11 @@ class Semaphore(object):
else:
if leases != self.max_leases:
raise ValueError(
- "Inconsistent max leases: %s, expected: %s" %
- (leases, self.max_leases)
+ "Inconsistent max leases: %s, expected: %s"
+ % (leases, self.max_leases)
)
else:
- self.client.set(self.path, str(self.max_leases).encode('utf-8'))
+ self.client.set(self.path, str(self.max_leases).encode("utf-8"))
def cancel(self):
"""Cancel a pending semaphore acquire."""
@@ -548,7 +597,8 @@ class Semaphore(object):
try:
self.is_acquired = self.client.retry(
- self._inner_acquire, blocking=blocking, timeout=timeout)
+ self._inner_acquire, blocking=blocking, timeout=timeout
+ )
except KazooException:
# if we did ultimately fail, attempt to clean up
self._best_effort_cleanup()
@@ -590,8 +640,9 @@ class Semaphore(object):
self.wake_event.wait(w.leftover())
if not self.wake_event.isSet():
raise LockTimeout(
- "Failed to acquire semaphore on %s "
- "after %s seconds" % (self.path, timeout))
+ "Failed to acquire semaphore on %s"
+ " after %s seconds" % (self.path, timeout)
+ )
else:
return False
finally:
@@ -612,8 +663,9 @@ class Semaphore(object):
# Get a list of the current potential lock holders. If they change,
# notify our wake_event object. This is used to unblock a blocking
# self._inner_acquire call.
- children = self.client.get_children(self.path,
- self._watch_lease_change)
+ children = self.client.get_children(
+ self.path, self._watch_lease_change
+ )
# If there are leases available, acquire one
if len(children) < self.max_leases:
@@ -674,7 +726,7 @@ class Semaphore(object):
for child in children:
try:
data, stat = self.client.get(self.path + "/" + child)
- lease_holders.append(data.decode('utf-8'))
+ lease_holders.append(data.decode("utf-8"))
except NoNodeError: # pragma: nocover
pass
return lease_holders