diff options
author | Ben Bangert <ben@groovie.org> | 2012-07-13 16:10:21 -0700 |
---|---|---|
committer | Ben Bangert <ben@groovie.org> | 2012-07-13 16:10:21 -0700 |
commit | 0033bb8eade15ce6ecb88c83cbfec0894ea6c239 (patch) | |
tree | 1c617b6e646a027acb3c1838eea904ad0b180ac3 /kazoo/recipe/watchers.py | |
parent | 456325be649ba06dd123bed2048e610948603cfa (diff) | |
download | kazoo-0033bb8eade15ce6ecb88c83cbfec0894ea6c239.tar.gz |
Add data watching API.
Diffstat (limited to 'kazoo/recipe/watchers.py')
-rw-r--r-- | kazoo/recipe/watchers.py | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py index 1373138..f6410be 100644 --- a/kazoo/recipe/watchers.py +++ b/kazoo/recipe/watchers.py @@ -9,10 +9,119 @@ from kazoo.client import KazooState log = logging.getLogger(__name__) +class DataWatch(object): + """Watches a node for data updates and calls the specified + function each time it changes + + The function will also be called the very first time its + registered to get the data. + + Returning `False` from the registered function will disable + future data change calls. + + Example with client: + + .. code-block:: python + + @client.DataWatch('/path/to/watch') + def my_func(data, stat): + print "Data is %s" % data + print "Version is %s" % stat.version + + # Above function is called immediately and prints + + """ + def __init__(self, client, path, func=None, + allow_session_lost=True): + """Create a children watcher for a path + + :param client: A zookeeper client + :type client: :class:`~kazoo.client.KazooClient` + :param path: The path to watch for children on + :type path: str + :param func: Function to call initially and every time the + children change. `func` will be called with a + tuple, the value of the node and a + :class:`~kazoo.client.ZnodeStat` instance + :type func: callable + :param allow_session_lost: Whether the watch should be + re-registered if the zookeeper + session is lost. + :type allow_session_lost: bool + + The path must already exist for the children watcher to + run. + + """ + self._client = client + self._path = path + self._func = func + self._stopped = False + self._watch_established = False + self._allow_session_lost = allow_session_lost + self._run_lock = threading.Lock() + + # Register our session listener if we're going to resume + # across session losses + if func: + if allow_session_lost: + self._client.add_listener(self._session_watcher) + self._get_data() + + def __call__(self, func): + """Callable version for use as a decorator + + :param func: Function to call initially and every time the + children change. `func` will be called with a + tuple, the value of the node and a + :class:`~kazoo.client.ZnodeStat` instance + :type func: callable + + """ + self._func = func + + if self._allow_session_lost: + self._client.add_listener(self._session_watcher) + self._get_data() + + def _get_data(self): + with self._run_lock: # Ensure this runs one at a time + if self._stopped: + return + + data, stat = self._client.retry(self._client.get, + self._path, self._watcher) + if not self._watch_established: + self._watch_established = True + + try: + if self._func(data, stat) is False: + self._stopped = True + except Exception as exc: + log.exception(exc) + raise + + def _watcher(self, event): + self._get_data() + + def _session_watcher(self, state): + if state == KazooState.LOST: + self._watch_established = False + elif state == KazooState.CONNECTED and \ + not self._watch_established and not self._stopped: + self._get_data() + + class ChildrenWatch(object): """Watches a node for children updates and calls the specified function each time it changes + The function will also be called the very first time its + registered to get children. + + Returning `False` from the registered function will disable + future children change calls. + Example with client: .. code-block:: python @@ -21,6 +130,8 @@ class ChildrenWatch(object): def my_func(children): print "Children are %s" % children + # Above function is called immediately and prints children + """ def __init__(self, client, path, func=None, allow_session_lost=True): |