diff options
Diffstat (limited to 'kazoo/recipe/watchers.py')
-rw-r--r-- | kazoo/recipe/watchers.py | 78 |
1 files changed, 50 insertions, 28 deletions
diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py index 96ec4fe..de61777 100644 --- a/kazoo/recipe/watchers.py +++ b/kazoo/recipe/watchers.py @@ -15,11 +15,7 @@ import logging import time import warnings -from kazoo.exceptions import ( - ConnectionClosedError, - NoNodeError, - KazooException -) +from kazoo.exceptions import ConnectionClosedError, NoNodeError, KazooException from kazoo.protocol.states import KazooState from kazoo.retry import KazooRetry @@ -37,6 +33,7 @@ def _ignore_closed(func): return func(*args, **kwargs) except ConnectionClosedError: pass + return wrapper @@ -90,6 +87,7 @@ class DataWatch(object): passed to it and warns that they are no longer respected. """ + def __init__(self, client, path, func=None, *args, **kwargs): """Create a data watcher for a path @@ -110,18 +108,22 @@ class DataWatch(object): self._stopped = False self._run_lock = client.handler.lock_object() self._version = None - self._retry = KazooRetry(max_tries=None, - sleep_func=client.handler.sleep_func) + self._retry = KazooRetry( + max_tries=None, sleep_func=client.handler.sleep_func + ) self._include_event = None self._ever_called = False self._used = False if args or kwargs: - warnings.warn('Passing additional arguments to DataWatch is' - ' deprecated. ignore_missing_node is now assumed ' - ' to be True by default, and the event will be ' - ' sent if the function can handle receiving it', - DeprecationWarning, stacklevel=2) + warnings.warn( + "Passing additional arguments to DataWatch is" + " deprecated. ignore_missing_node is now assumed " + " to be True by default, and the event will be " + " sent if the function can handle receiving it", + DeprecationWarning, + stacklevel=2, + ) # Register our session listener if we're going to resume # across session losses @@ -143,7 +145,8 @@ class DataWatch(object): if self._used: raise KazooException( "A function has already been associated with this " - "DataWatch instance.") + "DataWatch instance." + ) self._func = func @@ -181,15 +184,17 @@ class DataWatch(object): initial_version = self._version try: - data, stat = self._retry(self._client.get, - self._path, self._watcher) + data, stat = self._retry( + self._client.get, self._path, self._watcher + ) except NoNodeError: data = None # This will set 'stat' to None if the node does not yet # exist. - stat = self._retry(self._client.exists, self._path, - self._watcher) + stat = self._retry( + self._client.exists, self._path, self._watcher + ) if stat: self._client.handler.spawn(self._get_data) return @@ -245,8 +250,15 @@ class ChildrenWatch(object): # Above function is called immediately and prints children """ - def __init__(self, client, path, func=None, - allow_session_lost=True, send_event=False): + + def __init__( + self, + client, + path, + func=None, + allow_session_lost=True, + send_event=False, + ): """Create a children watcher for a path :param client: A zookeeper client. @@ -301,7 +313,8 @@ class ChildrenWatch(object): if self._used: raise KazooException( "A function has already been associated with this " - "ChildrenWatch instance.") + "ChildrenWatch instance." + ) self._func = func @@ -318,8 +331,9 @@ class ChildrenWatch(object): return try: - children = self._client.retry(self._client.get_children, - self._path, self._watcher) + children = self._client.retry( + self._client.get_children, self._path, self._watcher + ) except NoNodeError: self._stopped = True return @@ -327,8 +341,10 @@ class ChildrenWatch(object): if not self._watch_established: self._watch_established = True - if self._prior_children is not None and \ - self._prior_children == children: + if ( + self._prior_children is not None + and self._prior_children == children + ): return self._prior_children = children @@ -354,8 +370,11 @@ class ChildrenWatch(object): def _session_watcher(self, state): if state in (KazooState.LOST, KazooState.SUSPENDED): self._watch_established = False - elif (state == KazooState.CONNECTED and - not self._watch_established and not self._stopped): + elif ( + state == KazooState.CONNECTED + and not self._watch_established + and not self._stopped + ): self._client.handler.spawn(self._get_children) @@ -388,6 +407,7 @@ class PatientChildrenWatch(object): checked to see if the children have changed later. """ + def __init__(self, client, path, time_boundary=30): self.client = client self.path = path @@ -412,8 +432,10 @@ class PatientChildrenWatch(object): while True: async_result = self.client.handler.async_result() self.children = self.client.retry( - self.client.get_children, self.path, - partial(self._children_watcher, async_result)) + self.client.get_children, + self.path, + partial(self._children_watcher, async_result), + ) self.client.handler.sleep_func(self.time_boundary) if self.children_changed.is_set(): |