summaryrefslogtreecommitdiff
path: root/kazoo/recipe/watchers.py
diff options
context:
space:
mode:
Diffstat (limited to 'kazoo/recipe/watchers.py')
-rw-r--r--kazoo/recipe/watchers.py78
1 files changed, 50 insertions, 28 deletions
diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py
index 96ec4fe..de61777 100644
--- a/kazoo/recipe/watchers.py
+++ b/kazoo/recipe/watchers.py
@@ -15,11 +15,7 @@ import logging
import time
import warnings
-from kazoo.exceptions import (
- ConnectionClosedError,
- NoNodeError,
- KazooException
-)
+from kazoo.exceptions import ConnectionClosedError, NoNodeError, KazooException
from kazoo.protocol.states import KazooState
from kazoo.retry import KazooRetry
@@ -37,6 +33,7 @@ def _ignore_closed(func):
return func(*args, **kwargs)
except ConnectionClosedError:
pass
+
return wrapper
@@ -90,6 +87,7 @@ class DataWatch(object):
passed to it and warns that they are no longer respected.
"""
+
def __init__(self, client, path, func=None, *args, **kwargs):
"""Create a data watcher for a path
@@ -110,18 +108,22 @@ class DataWatch(object):
self._stopped = False
self._run_lock = client.handler.lock_object()
self._version = None
- self._retry = KazooRetry(max_tries=None,
- sleep_func=client.handler.sleep_func)
+ self._retry = KazooRetry(
+ max_tries=None, sleep_func=client.handler.sleep_func
+ )
self._include_event = None
self._ever_called = False
self._used = False
if args or kwargs:
- warnings.warn('Passing additional arguments to DataWatch is'
- ' deprecated. ignore_missing_node is now assumed '
- ' to be True by default, and the event will be '
- ' sent if the function can handle receiving it',
- DeprecationWarning, stacklevel=2)
+ warnings.warn(
+ "Passing additional arguments to DataWatch is"
+ " deprecated. ignore_missing_node is now assumed "
+ " to be True by default, and the event will be "
+ " sent if the function can handle receiving it",
+ DeprecationWarning,
+ stacklevel=2,
+ )
# Register our session listener if we're going to resume
# across session losses
@@ -143,7 +145,8 @@ class DataWatch(object):
if self._used:
raise KazooException(
"A function has already been associated with this "
- "DataWatch instance.")
+ "DataWatch instance."
+ )
self._func = func
@@ -181,15 +184,17 @@ class DataWatch(object):
initial_version = self._version
try:
- data, stat = self._retry(self._client.get,
- self._path, self._watcher)
+ data, stat = self._retry(
+ self._client.get, self._path, self._watcher
+ )
except NoNodeError:
data = None
# This will set 'stat' to None if the node does not yet
# exist.
- stat = self._retry(self._client.exists, self._path,
- self._watcher)
+ stat = self._retry(
+ self._client.exists, self._path, self._watcher
+ )
if stat:
self._client.handler.spawn(self._get_data)
return
@@ -245,8 +250,15 @@ class ChildrenWatch(object):
# Above function is called immediately and prints children
"""
- def __init__(self, client, path, func=None,
- allow_session_lost=True, send_event=False):
+
+ def __init__(
+ self,
+ client,
+ path,
+ func=None,
+ allow_session_lost=True,
+ send_event=False,
+ ):
"""Create a children watcher for a path
:param client: A zookeeper client.
@@ -301,7 +313,8 @@ class ChildrenWatch(object):
if self._used:
raise KazooException(
"A function has already been associated with this "
- "ChildrenWatch instance.")
+ "ChildrenWatch instance."
+ )
self._func = func
@@ -318,8 +331,9 @@ class ChildrenWatch(object):
return
try:
- children = self._client.retry(self._client.get_children,
- self._path, self._watcher)
+ children = self._client.retry(
+ self._client.get_children, self._path, self._watcher
+ )
except NoNodeError:
self._stopped = True
return
@@ -327,8 +341,10 @@ class ChildrenWatch(object):
if not self._watch_established:
self._watch_established = True
- if self._prior_children is not None and \
- self._prior_children == children:
+ if (
+ self._prior_children is not None
+ and self._prior_children == children
+ ):
return
self._prior_children = children
@@ -354,8 +370,11 @@ class ChildrenWatch(object):
def _session_watcher(self, state):
if state in (KazooState.LOST, KazooState.SUSPENDED):
self._watch_established = False
- elif (state == KazooState.CONNECTED and
- not self._watch_established and not self._stopped):
+ elif (
+ state == KazooState.CONNECTED
+ and not self._watch_established
+ and not self._stopped
+ ):
self._client.handler.spawn(self._get_children)
@@ -388,6 +407,7 @@ class PatientChildrenWatch(object):
checked to see if the children have changed later.
"""
+
def __init__(self, client, path, time_boundary=30):
self.client = client
self.path = path
@@ -412,8 +432,10 @@ class PatientChildrenWatch(object):
while True:
async_result = self.client.handler.async_result()
self.children = self.client.retry(
- self.client.get_children, self.path,
- partial(self._children_watcher, async_result))
+ self.client.get_children,
+ self.path,
+ partial(self._children_watcher, async_result),
+ )
self.client.handler.sleep_func(self.time_boundary)
if self.children_changed.is_set():