diff options
Diffstat (limited to 'kazoo/recipe/lock.py')
-rw-r--r-- | kazoo/recipe/lock.py | 192 |
1 files changed, 122 insertions, 70 deletions
diff --git a/kazoo/recipe/lock.py b/kazoo/recipe/lock.py index 9bd5dc0..7722a97 100644 --- a/kazoo/recipe/lock.py +++ b/kazoo/recipe/lock.py @@ -14,7 +14,9 @@ changes and re-act appropriately. In the event that a and/or the lease has been lost. """ +import re import sys + try: from time import monotonic as now except ImportError: @@ -27,13 +29,13 @@ from kazoo.exceptions import ( CancelledError, KazooException, LockTimeout, - NoNodeError + NoNodeError, ) from kazoo.protocol.states import KazooState from kazoo.retry import ( ForceRetryError, KazooRetry, - RetryFailedError + RetryFailedError, ) @@ -82,22 +84,38 @@ class Lock(object): # sequence number. Involved in read/write locks. _EXCLUDE_NAMES = ["__lock__"] - def __init__(self, client, path, identifier=None): + def __init__(self, client, path, identifier=None, extra_lock_patterns=()): """Create a Kazoo lock. :param client: A :class:`~kazoo.client.KazooClient` instance. :param path: The lock path to use. - :param identifier: Name to use for this lock contender. This - can be useful for querying to see who the - current lock contenders are. - + :param identifier: Name to use for this lock contender. This can be + useful for querying to see who the current lock + contenders are. + :param extra_lock_patterns: Strings that will be used to + identify other znode in the path + that should be considered contenders + for this lock. + Use this for cross-implementation + compatibility. + + .. versionadded:: 2.7.1 + The extra_lock_patterns option. """ self.client = client self.path = path + self._exclude_names = set( + self._EXCLUDE_NAMES + list(extra_lock_patterns) + ) + self._contenders_re = re.compile( + r"(?:{patterns})(-?\d{{10}})$".format( + patterns="|".join(self._exclude_names) + ) + ) # some data is written to the node. this can be queried via # contenders() to see who is contending for the lock - self.data = str(identifier or "").encode('utf-8') + self.data = str(identifier or "").encode("utf-8") self.node = None self.wake_event = client.handler.event_object() @@ -113,8 +131,9 @@ class Lock(object): self.is_acquired = False self.assured_path = False self.cancelled = False - self._retry = KazooRetry(max_tries=None, - sleep_func=client.handler.sleep_func) + self._retry = KazooRetry( + max_tries=None, sleep_func=client.handler.sleep_func + ) self._lock = client.handler.lock_object() def _ensure_path(self): @@ -171,6 +190,7 @@ class Lock(object): return False if not locked: # Lock acquire doesn't take a timeout, so simulate it... + # XXX: This is not true in Py3 >= 3.2 try: locked = retry(_acquire_lock) except RetryFailedError: @@ -179,9 +199,12 @@ class Lock(object): try: gotten = False try: - gotten = retry(self._inner_acquire, - blocking=blocking, timeout=timeout, - ephemeral=ephemeral) + gotten = retry( + self._inner_acquire, + blocking=blocking, + timeout=timeout, + ephemeral=ephemeral, + ) except RetryFailedError: pass except KazooException: @@ -222,8 +245,9 @@ class Lock(object): self.create_tried = True if not node: - node = self.client.create(self.create_path, self.data, - ephemeral=ephemeral, sequence=True) + node = self.client.create( + self.create_path, self.data, ephemeral=ephemeral, sequence=True + ) # strip off path to node node = node[len(self.path) + 1:] @@ -236,18 +260,8 @@ class Lock(object): if self.cancelled: raise CancelledError() - children = self._get_sorted_children() - - try: - our_index = children.index(node) - except ValueError: # pragma: nocover - # somehow we aren't in the children -- probably we are - # recovering from a session failure and our ephemeral - # node was removed - raise ForceRetryError() - - predecessor = self.predecessor(children, our_index) - if not predecessor: + predecessor = self._get_predecessor(node) + if predecessor is None: return True if not blocking: @@ -263,40 +277,51 @@ class Lock(object): else: self.wake_event.wait(timeout) if not self.wake_event.isSet(): - raise LockTimeout("Failed to acquire lock on %s after " - "%s seconds" % (self.path, timeout)) + raise LockTimeout( + "Failed to acquire lock on %s after %s seconds" + % (self.path, timeout) + ) finally: self.client.remove_listener(self._watch_session) - def predecessor(self, children, index): - for c in reversed(children[:index]): - if any(n in c for n in self._EXCLUDE_NAMES): - return c - return None - def _watch_predecessor(self, event): self.wake_event.set() - def _get_sorted_children(self): + def _get_predecessor(self, node): + """returns `node`'s predecessor or None + + Note: This handle the case where the current lock is not a contender + (e.g. rlock), this and also edge cases where the lock's ephemeral node + is gone. + """ children = self.client.get_children(self.path) + found_self = False + # Filter out the contenders using the computed regex + contender_matches = [] + for child in children: + match = self._contenders_re.search(child) + if match is not None: + contender_matches.append(match) + if child == node: + # Remember the node's match object so we can short circuit + # below. + found_self = match + + if found_self is False: # pragma: nocover + # somehow we aren't in the childrens -- probably we are + # recovering from a session failure and our ephemeral + # node was removed. + raise ForceRetryError() + + predecessor = None + # Sort the contenders using the sequence number extracted by the regex, + # then extract the original string. + for match in sorted(contender_matches, key=lambda m: m.groups()): + if match is found_self: + break + predecessor = match.string - # Node names are prefixed by a type: strip the prefix first, which may - # be one of multiple values in case of a read-write lock, and return - # only the sequence number (as a string since it is padded and will - # sort correctly anyway). - # - # In some cases, the lock path may contain nodes with other prefixes - # (eg. in case of a lease), just sort them last ('~' sorts after all - # ASCII digits). - def _seq(c): - for name in ["__lock__", "__rlock__"]: - idx = c.find(name) - if idx != -1: - return c[idx + len(name):] - # Sort unknown node names eg. "lease_holder" last. - return '~' - children.sort(key=_seq) - return children + return predecessor def _find_node(self): children = self.client.get_children(self.path) @@ -347,16 +372,37 @@ class Lock(object): if not self.assured_path: self._ensure_path() - children = self._get_sorted_children() - - contenders = [] + children = self.client.get_children(self.path) + # We want all contenders, including self (this is especially important + # for r/w locks). This is similar to the logic of `_get_predecessor` + # except we include our own pattern. + all_contenders_re = re.compile( + r"(?:{patterns})(-?\d{{10}})$".format( + patterns="|".join(self._exclude_names | {self._NODE_NAME}) + ) + ) + # Filter out the contenders using the computed regex + contender_matches = [] for child in children: + match = all_contenders_re.search(child) + if match is not None: + contender_matches.append(match) + # Sort the contenders using the sequence number extracted by the regex, + # then extract the original string. + contender_nodes = [ + match.string + for match in sorted(contender_matches, key=lambda m: m.groups()) + ] + # Retrieve all the contender nodes data (preserving order). + contenders = [] + for node in contender_nodes: try: - data, stat = self.client.get(self.path + "/" + child) + data, stat = self.client.get(self.path + "/" + node) if data is not None: - contenders.append(data.decode('utf-8')) + contenders.append(data.decode("utf-8")) except NoNodeError: # pragma: nocover pass + return contenders def __enter__(self): @@ -391,6 +437,7 @@ class WriteLock(Lock): shared lock. """ + _NODE_NAME = "__lock__" _EXCLUDE_NAMES = ["__lock__", "__rlock__"] @@ -420,6 +467,7 @@ class ReadLock(Lock): shared lock. """ + _NODE_NAME = "__rlock__" _EXCLUDE_NAMES = ["__lock__"] @@ -458,6 +506,7 @@ class Semaphore(object): The max_leases check. """ + def __init__(self, client, path, identifier=None, max_leases=1): """Create a Kazoo Lock @@ -483,12 +532,12 @@ class Semaphore(object): # some data is written to the node. this can be queried via # contenders() to see who is contending for the lock - self.data = str(identifier or "").encode('utf-8') + self.data = str(identifier or "").encode("utf-8") self.max_leases = max_leases self.wake_event = client.handler.event_object() self.create_path = self.path + "/" + uuid.uuid4().hex - self.lock_path = path + '-' + '__lock__' + self.lock_path = path + "-" + "__lock__" self.is_acquired = False self.assured_path = False self.cancelled = False @@ -501,7 +550,7 @@ class Semaphore(object): # node did already exist data, _ = self.client.get(self.path) try: - leases = int(data.decode('utf-8')) + leases = int(data.decode("utf-8")) except (ValueError, TypeError): # ignore non-numeric data, maybe the node data is used # for other purposes @@ -509,11 +558,11 @@ class Semaphore(object): else: if leases != self.max_leases: raise ValueError( - "Inconsistent max leases: %s, expected: %s" % - (leases, self.max_leases) + "Inconsistent max leases: %s, expected: %s" + % (leases, self.max_leases) ) else: - self.client.set(self.path, str(self.max_leases).encode('utf-8')) + self.client.set(self.path, str(self.max_leases).encode("utf-8")) def cancel(self): """Cancel a pending semaphore acquire.""" @@ -548,7 +597,8 @@ class Semaphore(object): try: self.is_acquired = self.client.retry( - self._inner_acquire, blocking=blocking, timeout=timeout) + self._inner_acquire, blocking=blocking, timeout=timeout + ) except KazooException: # if we did ultimately fail, attempt to clean up self._best_effort_cleanup() @@ -590,8 +640,9 @@ class Semaphore(object): self.wake_event.wait(w.leftover()) if not self.wake_event.isSet(): raise LockTimeout( - "Failed to acquire semaphore on %s " - "after %s seconds" % (self.path, timeout)) + "Failed to acquire semaphore on %s" + " after %s seconds" % (self.path, timeout) + ) else: return False finally: @@ -612,8 +663,9 @@ class Semaphore(object): # Get a list of the current potential lock holders. If they change, # notify our wake_event object. This is used to unblock a blocking # self._inner_acquire call. - children = self.client.get_children(self.path, - self._watch_lease_change) + children = self.client.get_children( + self.path, self._watch_lease_change + ) # If there are leases available, acquire one if len(children) < self.max_leases: @@ -674,7 +726,7 @@ class Semaphore(object): for child in children: try: data, stat = self.client.get(self.path + "/" + child) - lease_holders.append(data.decode('utf-8')) + lease_holders.append(data.decode("utf-8")) except NoNodeError: # pragma: nocover pass return lease_holders |