summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJiangge Zhang <tonyseek@gmail.com>2017-06-12 15:29:27 +0800
committerJiangge Zhang <tonyseek@gmail.com>2017-06-13 12:43:04 +0800
commitec8b337e6f1a4ff12e669f4b96ca98fb37ee5d8a (patch)
tree9e3365381dc68df7803c64e62773f67dae5f170f
parentb4967d16b3daa4b1a00c0f408888b4f61e5486cc (diff)
downloadkazoo-ec8b337e6f1a4ff12e669f4b96ca98fb37ee5d8a.tar.gz
feat(recipe): Add TreeCache recipe
Kazoo implementation of Apache Curator's TreeCache recipe. See also: http://curator.apache.org/curator-recipes/tree-cache.html
-rw-r--r--docs/api.rst1
-rw-r--r--docs/api/recipe/cache.rst26
-rw-r--r--kazoo/recipe/cache.py389
-rw-r--r--kazoo/tests/test_cache.py278
4 files changed, 694 insertions, 0 deletions
diff --git a/docs/api.rst b/docs/api.rst
index 744b012..ff921c0 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -16,6 +16,7 @@ organized alphabetically by module name.
api/interfaces
api/protocol/states
api/recipe/barrier
+ api/recipe/cache
api/recipe/counter
api/recipe/election
api/recipe/lease
diff --git a/docs/api/recipe/cache.rst b/docs/api/recipe/cache.rst
new file mode 100644
index 0000000..fb7f372
--- /dev/null
+++ b/docs/api/recipe/cache.rst
@@ -0,0 +1,26 @@
+.. _cache_module:
+
+:mod:`kazoo.recipe.cache`
+----------------------------
+
+.. automodule:: kazoo.recipe.cache
+
+Public API
+++++++++++
+
+ .. autoclass:: TreeCache
+
+ .. automethod:: start
+ .. automethod:: close
+ .. automethod:: listen
+ .. automethod:: listen_fault
+ .. automethod:: get_data
+ .. automethod:: get_children
+
+ .. autoclass:: TreeEvent
+ :members:
+ :show-inheritance:
+
+ .. autoclass:: NodeData
+ :members:
+ :show-inheritance:
diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py
new file mode 100644
index 0000000..1e59f28
--- /dev/null
+++ b/kazoo/recipe/cache.py
@@ -0,0 +1,389 @@
+"""TreeCache
+
+:Maintainer: Jiangge Zhang <tonyseek@gmail.com>
+:Maintainer: Haochuan Guo <guohaochuan@gmail.com>
+:Maintainer: Tianwen Zhang <mail2tevin@gmail.com>
+:Status: Alpha
+
+A port of the Apache Curator's TreeCache recipe. It builds an in-memory cache
+of a subtree in ZooKeeper and keeps it up-to-date.
+
+See also: http://curator.apache.org/curator-recipes/tree-cache.html
+"""
+
+from __future__ import absolute_import
+
+import os
+import logging
+import contextlib
+import functools
+import operator
+
+from kazoo.exceptions import NoNodeError, KazooException
+from kazoo.protocol.states import KazooState, EventType
+
+
+logger = logging.getLogger(__name__)
+
+
+class TreeCache(object):
+ """The cache of a ZooKeeper subtree.
+
+ :param client: A :class:`~kazoo.client.KazooClient` instance.
+ :param path: The root path of subtree.
+ """
+
+ STATE_LATENT = 0
+ STATE_STARTED = 1
+ STATE_CLOSED = 2
+
+ def __init__(self, client, path):
+ self._client = client
+ self._root = TreeNode.make_root(self, path)
+ self._state = self.STATE_LATENT
+ self._outstanding_ops = 0
+ self._is_initialized = False
+ self._error_listeners = []
+ self._event_listeners = []
+
+ def start(self):
+ """Starts the cache.
+
+ The cache is not started automatically. You must call this method.
+
+ After a cache started, all changes of subtree will be synchronized
+ from the ZooKeeper server. Events will be fired for those activity.
+
+ See also :meth:`~TreeCache.listen`.
+
+ .. note::
+
+ This method is not thread safe.
+ """
+ if self._state == self.STATE_LATENT:
+ self._state = self.STATE_STARTED
+ elif self._state == self.STATE_CLOSED:
+ raise KazooException('already closed')
+ else:
+ raise KazooException('already started')
+
+ self._client.add_listener(self._session_watcher)
+ self._client.ensure_path(self._root._path)
+
+ if self._client.connected:
+ self._root.on_created()
+
+ def close(self):
+ """Closes the cache.
+
+ A closed cache was detached from ZooKeeper's changes. And all nodes
+ will be invalidated.
+
+ Once a tree cache was closed, it could not be started again. You should
+ only close a tree cache while you want to recycle it.
+
+ .. note::
+
+ This method is not thread safe.
+ """
+ if self._state == self.STATE_STARTED:
+ self._state = self.STATE_CLOSED
+ self._client.remove_listener(self._session_watcher)
+ with handle_exception(self._error_listeners):
+ self._root.on_deleted()
+
+ def listen(self, listener):
+ """Registers a function to listen the cache events.
+
+ The cache events are changes of local data. They are delivered from
+ watching notifications in ZooKeeper session.
+
+ This method can be use as a decorator.
+
+ :param listener: A callable object which accepting a
+ :class:`~kazoo.recipe.cache.TreeEvent` instance as
+ its argument.
+ """
+ self._event_listeners.append(listener)
+ return listener
+
+ def listen_fault(self, listener):
+ """Registers a function to listen the exceptions.
+
+ It is possible to meet some exceptions during the cache running. You
+ could specific handlers for them.
+
+ This method can be use as a decorator.
+
+ :param listener: A callable object which accepting an exception as its
+ argument.
+ """
+ self._error_listeners.append(listener)
+ return listener
+
+ def get_data(self, path, default=None):
+ """Gets data of a node from cache.
+
+ :param path: The absolute path string.
+ :param default: The default value which will be returned if the node
+ does not exist.
+ :raises ValueError: If the path is outside of this subtree.
+ :returns: A :class:`~kazoo.recipe.cache.NodeData` instance.
+ """
+ node = self._find_node(path)
+ return default if node is None else node._data
+
+ def get_children(self, path, default=None):
+ """Gets node children list from in-memory snapshot.
+
+ :param path: The absolute path string.
+ :param default: The default value which will be returned if the node
+ does not exist.
+ :raises ValueError: If the path is outside of this subtree.
+ :returns: The :class:`frozenset` which including children names.
+ """
+ node = self._find_node(path)
+ return default if node is None else frozenset(node._children)
+
+ def _find_node(self, path):
+ if not path.startswith(self._root._path):
+ raise ValueError('outside of tree')
+ striped_path = path[len(self._root._path):].strip('/')
+ splited_path = [p for p in striped_path.split('/') if p]
+ current_node = self._root
+ for node_name in splited_path:
+ if node_name not in current_node._children:
+ return
+ current_node = current_node._children[node_name]
+ return current_node
+
+ def _publish_event(self, event_type, event_data=None):
+ event = TreeEvent.make(event_type, event_data)
+ if self._state != self.STATE_CLOSED:
+ logger.debug('public event: %r', event)
+ self._in_background(self._do_publish_event, event)
+
+ def _do_publish_event(self, event):
+ for listener in self._event_listeners:
+ with handle_exception(self._error_listeners):
+ listener(event)
+
+ def _in_background(self, func, *args, **kwargs):
+ self._client.handler.callback_queue.put(lambda: func(*args, **kwargs))
+
+ def _session_watcher(self, state):
+ if state == KazooState.SUSPENDED:
+ self._publish_event(TreeEvent.CONNECTION_SUSPENDED)
+ elif state == KazooState.CONNECTED:
+ with handle_exception(self._error_listeners):
+ self._root.on_reconnected()
+ self._publish_event(TreeEvent.CONNECTION_RECONNECTED)
+ elif state == KazooState.LOST:
+ self._is_initialized = False
+ self._publish_event(TreeEvent.CONNECTION_LOST)
+
+
+class TreeNode(object):
+ """The tree node record.
+
+ :param tree: A :class:`~kazoo.recipe.cache.TreeCache` instance.
+ :param path: The path of current node.
+ :param parent: The parent node reference. ``None`` for root node.
+ """
+
+ __slots__ = ('_tree', '_path', '_parent', '_depth', '_children', '_state',
+ '_data')
+
+ STATE_PENDING = 0
+ STATE_LIVE = 1
+ STATE_DEAD = 2
+
+ def __init__(self, tree, path, parent):
+ self._tree = tree
+ self._path = path
+ self._parent = parent
+ self._depth = parent._depth + 1 if parent else 0
+ self._children = {}
+ self._state = self.STATE_PENDING
+ self._data = None
+
+ @classmethod
+ def make_root(cls, tree, path):
+ return cls(tree, path, None)
+
+ def on_reconnected(self):
+ self._refresh()
+ for child in self._children.values():
+ child.on_reconnected()
+
+ def on_created(self):
+ self._refresh()
+
+ def on_deleted(self):
+ old_children, self._children = self._children, {}
+ old_data, self._data = self._data, None
+
+ for old_child in old_children.values():
+ old_child.on_deleted()
+
+ if self._tree._state == self._tree.STATE_CLOSED:
+ return
+
+ old_state, self._state = self._state, self.STATE_DEAD
+ if old_state == self.STATE_LIVE:
+ self._publish_event(TreeEvent.NODE_REMOVED, old_data)
+
+ if self._parent is None:
+ self._call_client('exists', self._path) # root node
+ else:
+ child = self._path[len(self._parent._path) + 1:]
+ if self._parent._children.get(child) is self:
+ del self._parent._children[child]
+
+ def _publish_event(self, *args, **kwargs):
+ return self._tree._publish_event(*args, **kwargs)
+
+ def _refresh(self):
+ self._refresh_data()
+ self._refresh_children()
+
+ def _refresh_data(self):
+ self._call_client('get', self._path)
+
+ def _refresh_children(self):
+ # TODO max-depth checking support
+ self._call_client('get_children', self._path)
+
+ def _call_client(self, method_name, path, *args):
+ self._tree._outstanding_ops += 1
+ callback = functools.partial(
+ self._tree._in_background, self._process_result,
+ method_name, path)
+ kwargs = {'watch': self._process_watch}
+ method = getattr(self._tree._client, method_name + '_async')
+ method(path, *args, **kwargs).rawlink(callback)
+
+ def _process_watch(self, watched_event):
+ logger.debug('process_watch: %r', watched_event)
+ with handle_exception(self._tree._error_listeners):
+ if watched_event.type == EventType.CREATED:
+ assert self._parent is None, 'unexpected CREATED on non-root'
+ self.on_created()
+ elif watched_event.type == EventType.DELETED:
+ self.on_deleted()
+ elif watched_event.type == EventType.CHANGED:
+ self._refresh_data()
+ elif watched_event.type == EventType.CHILD:
+ self._refresh_children()
+
+ def _process_result(self, method_name, path, result):
+ logger.debug('process_result: %s %s', method_name, path)
+ if method_name == 'exists':
+ assert self._parent is None, 'unexpected EXISTS on non-root'
+ # the value of result will be set with `None` if node not exists.
+ if result.get() is not None:
+ if self._state == self.STATE_DEAD:
+ self._state = self.STATE_PENDING
+ self.on_created()
+ elif method_name == 'get_children':
+ try:
+ children = result.get()
+ except NoNodeError:
+ self.on_deleted()
+ else:
+ for child in sorted(children):
+ full_path = os.path.join(path, child)
+ if child not in self._children:
+ node = TreeNode(self._tree, full_path, self)
+ self._children[child] = node
+ node.on_created()
+ elif method_name == 'get':
+ try:
+ data, stat = result.get()
+ except NoNodeError:
+ self.on_deleted()
+ else:
+ old_data, self._data = (
+ self._data, NodeData.make(path, data, stat))
+
+ old_state, self._state = self._state, self.STATE_LIVE
+ if old_state == self.STATE_LIVE:
+ if old_data is None or old_data.stat.mzxid != stat.mzxid:
+ self._publish_event(TreeEvent.NODE_UPDATED, self._data)
+ else:
+ self._publish_event(TreeEvent.NODE_ADDED, self._data)
+ else: # pragma: no cover
+ logger.warning('unknown operation %s', method_name)
+ self._tree._outstanding_ops -= 1
+ return
+
+ self._tree._outstanding_ops -= 1
+ if self._tree._outstanding_ops == 0 and not self._tree._is_initialized:
+ self._tree._is_initialized = True
+ self._publish_event(TreeEvent.INITIALIZED)
+
+
+class TreeEvent(tuple):
+ """The immutable event tuple of cache."""
+
+ NODE_ADDED = 0
+ NODE_UPDATED = 1
+ NODE_REMOVED = 2
+ CONNECTION_SUSPENDED = 3
+ CONNECTION_RECONNECTED = 4
+ CONNECTION_LOST = 5
+ INITIALIZED = 6
+
+ #: An enumerate integer to indicate event type.
+ event_type = property(operator.itemgetter(0))
+
+ #: A :class:`~kazoo.recipe.cache.NodeData` instance.
+ event_data = property(operator.itemgetter(1))
+
+ @classmethod
+ def make(cls, event_type, event_data):
+ """Creates a new TreeEvent tuple.
+
+ :returns: A :class:`~kazoo.recipe.cache.TreeEvent` instance.
+ """
+ assert event_type in (
+ cls.NODE_ADDED, cls.NODE_UPDATED, cls.NODE_REMOVED,
+ cls.CONNECTION_SUSPENDED, cls.CONNECTION_RECONNECTED,
+ cls.CONNECTION_LOST, cls.INITIALIZED)
+ return cls((event_type, event_data))
+
+
+class NodeData(tuple):
+ """The immutable node data tuple of cache."""
+
+ #: The absolute path string of current node.
+ path = property(operator.itemgetter(0))
+
+ #: The bytes data of current node.
+ data = property(operator.itemgetter(1))
+
+ #: The stat information of current node.
+ stat = property(operator.itemgetter(2))
+
+ @classmethod
+ def make(cls, path, data, stat):
+ """Creates a new NodeData tuple.
+
+ :returns: A :class:`~kazoo.recipe.cache.NodeData` instance.
+ """
+ return cls((path, data, stat))
+
+
+@contextlib.contextmanager
+def handle_exception(listeners):
+ try:
+ yield
+ except Exception as e:
+ logger.debug('processing error: %r', e)
+ for listener in listeners:
+ try:
+ listener(e)
+ except: # pragma: no cover
+ logger.exception('Exception handling exception') # oops
+ else:
+ logger.exception('No listener to process %r', e)
diff --git a/kazoo/tests/test_cache.py b/kazoo/tests/test_cache.py
new file mode 100644
index 0000000..1aa03fb
--- /dev/null
+++ b/kazoo/tests/test_cache.py
@@ -0,0 +1,278 @@
+import uuid
+
+from mock import patch, call, Mock
+from nose.tools import eq_, ok_, assert_not_equal, raises
+
+from kazoo.testing import KazooTestCase
+from kazoo.exceptions import KazooException
+from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent
+
+
+class KazooTreeCacheTests(KazooTestCase):
+
+ def setUp(self):
+ super(KazooTreeCacheTests, self).setUp()
+ self._event_queue = self.client.handler.queue_impl()
+ self._error_queue = self.client.handler.queue_impl()
+ self.path = None
+ self.cache = None
+
+ def tearDown(self):
+ super(KazooTreeCacheTests, self).tearDown()
+ if not self._error_queue.empty():
+ try:
+ raise self._error_queue.get()
+ except FakeException:
+ pass
+
+ def make_cache(self):
+ if self.cache is None:
+ self.path = '/' + uuid.uuid4().hex
+ self.cache = TreeCache(self.client, self.path)
+ self.cache.listen(lambda event: self._event_queue.put(event))
+ self.cache.listen_fault(lambda error: self._error_queue.put(error))
+ self.cache.start()
+ return self.cache
+
+ def wait_cache(self, expect=None, since=None, timeout=10):
+ started = since is None
+ while True:
+ event = self._event_queue.get(timeout=timeout)
+ if started:
+ if expect is not None:
+ eq_(event.event_type, expect)
+ return event
+ if event.event_type == since:
+ started = True
+ if expect is None:
+ return
+
+ def spy_client(self, method_name):
+ method = getattr(self.client, method_name)
+ return patch.object(self.client, method_name, wraps=method)
+
+ def test_start(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+
+ stat = self.client.exists(self.path)
+ eq_(stat.version, 0)
+
+ eq_(self.cache._state, TreeCache.STATE_STARTED)
+ eq_(self.cache._root._state, TreeNode.STATE_LIVE)
+
+ @raises(KazooException)
+ def test_start_started(self):
+ self.make_cache()
+ self.cache.start()
+
+ @raises(KazooException)
+ def test_start_closed(self):
+ self.make_cache()
+ self.cache.start()
+ self.cache.close()
+ self.cache.start()
+
+ def test_close(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+ self.client.create(self.path + '/foo/bar/baz', makepath=True)
+ for _ in range(3):
+ self.wait_cache(TreeEvent.NODE_ADDED)
+
+ self.cache.close()
+
+ # nothing should be published since tree closed
+ ok_(self._event_queue.empty())
+
+ # tree should be empty
+ eq_(self.cache._root._children, {})
+ eq_(self.cache._root._data, None)
+ eq_(self.cache._state, TreeCache.STATE_CLOSED)
+
+ # node state should not be changed
+ assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD)
+
+ def test_children_operation(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+
+ self.client.create(self.path + '/test_children', b'test_children_1')
+ event = self.wait_cache(TreeEvent.NODE_ADDED)
+ eq_(event.event_type, TreeEvent.NODE_ADDED)
+ eq_(event.event_data.path, self.path + '/test_children')
+ eq_(event.event_data.data, b'test_children_1')
+ eq_(event.event_data.stat.version, 0)
+
+ self.client.set(self.path + '/test_children', b'test_children_2')
+ event = self.wait_cache(TreeEvent.NODE_UPDATED)
+ eq_(event.event_type, TreeEvent.NODE_UPDATED)
+ eq_(event.event_data.path, self.path + '/test_children')
+ eq_(event.event_data.data, b'test_children_2')
+ eq_(event.event_data.stat.version, 1)
+
+ self.client.delete(self.path + '/test_children')
+ event = self.wait_cache(TreeEvent.NODE_REMOVED)
+ eq_(event.event_type, TreeEvent.NODE_REMOVED)
+ eq_(event.event_data.path, self.path + '/test_children')
+ eq_(event.event_data.data, b'test_children_2')
+ eq_(event.event_data.stat.version, 1)
+
+ def test_subtree_operation(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+
+ self.client.create(self.path + '/foo/bar/baz', makepath=True)
+ for relative_path in ('/foo', '/foo/bar', '/foo/bar/baz'):
+ event = self.wait_cache(TreeEvent.NODE_ADDED)
+ eq_(event.event_type, TreeEvent.NODE_ADDED)
+ eq_(event.event_data.path, self.path + relative_path)
+ eq_(event.event_data.data, b'')
+ eq_(event.event_data.stat.version, 0)
+
+ self.client.delete(self.path + '/foo', recursive=True)
+ for relative_path in ('/foo/bar/baz', '/foo/bar', '/foo'):
+ event = self.wait_cache(TreeEvent.NODE_REMOVED)
+ eq_(event.event_type, TreeEvent.NODE_REMOVED)
+ eq_(event.event_data.path, self.path + relative_path)
+
+ def test_get_data(self):
+ cache = self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+ self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
+ self.wait_cache(TreeEvent.NODE_ADDED)
+ self.wait_cache(TreeEvent.NODE_ADDED)
+ self.wait_cache(TreeEvent.NODE_ADDED)
+
+ with patch.object(cache, '_client'): # disable any remote operation
+ eq_(cache.get_data(self.path).data, b'')
+ eq_(cache.get_data(self.path).stat.version, 0)
+
+ eq_(cache.get_data(self.path + '/foo').data, b'')
+ eq_(cache.get_data(self.path + '/foo').stat.version, 0)
+
+ eq_(cache.get_data(self.path + '/foo/bar').data, b'')
+ eq_(cache.get_data(self.path + '/foo/bar').stat.version, 0)
+
+ eq_(cache.get_data(self.path + '/foo/bar/baz').data, b'@')
+ eq_(cache.get_data(self.path + '/foo/bar/baz').stat.version, 0)
+
+ def test_get_children(self):
+ cache = self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+ self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
+ self.wait_cache(TreeEvent.NODE_ADDED)
+ self.wait_cache(TreeEvent.NODE_ADDED)
+ self.wait_cache(TreeEvent.NODE_ADDED)
+
+ with patch.object(cache, '_client'): # disable any remote operation
+ eq_(cache.get_children(self.path + '/foo/bar/baz'), frozenset())
+ eq_(cache.get_children(self.path + '/foo/bar'), frozenset(['baz']))
+ eq_(cache.get_children(self.path + '/foo'), frozenset(['bar']))
+ eq_(cache.get_children(self.path), frozenset(['foo']))
+
+ @raises(ValueError)
+ def test_get_data_out_of_tree(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+ self.cache.get_data('/out_of_tree')
+
+ @raises(ValueError)
+ def test_get_children_out_of_tree(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+ self.cache.get_children('/out_of_tree')
+
+ def test_get_data_no_node(self):
+ cache = self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+
+ with patch.object(cache, '_client'): # disable any remote operation
+ eq_(cache.get_data(self.path + '/non_exists'), None)
+
+ def test_get_children_no_node(self):
+ cache = self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+
+ with patch.object(cache, '_client'): # disable any remote operation
+ eq_(cache.get_children(self.path + '/non_exists'), None)
+
+ def test_session_reconnected(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+
+ self.client.create(self.path + '/foo')
+ event = self.wait_cache(TreeEvent.NODE_ADDED)
+ eq_(event.event_data.path, self.path + '/foo')
+
+ with self.spy_client('get_async') as get_data:
+ with self.spy_client('get_children_async') as get_children:
+ # session suspended
+ self.lose_connection(self.client.handler.event_object)
+ self.wait_cache(TreeEvent.CONNECTION_SUSPENDED)
+
+ # There are a serial refreshing operation here. But NODE_ADDED
+ # events will not be raised because the zxid of nodes are the
+ # same during reconnecting.
+
+ # connection restore
+ self.wait_cache(TreeEvent.CONNECTION_RECONNECTED)
+
+ # wait for outstanding operations
+ while self.cache._outstanding_ops > 0:
+ self.client.handler.sleep_func(0.1)
+
+ # inspect in-memory nodes
+ _node_root = self.cache._root
+ _node_foo = self.cache._root._children['foo']
+
+ # make sure that all nodes are refreshed
+ get_data.assert_has_calls([
+ call(self.path, watch=_node_root._process_watch),
+ call(self.path + '/foo', watch=_node_foo._process_watch),
+ ], any_order=True)
+ get_children.assert_has_calls([
+ call(self.path, watch=_node_root._process_watch),
+ call(self.path + '/foo', watch=_node_foo._process_watch),
+ ], any_order=True)
+
+ def test_root_recreated(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+
+ # remove root node
+ self.client.delete(self.path)
+ event = self.wait_cache(TreeEvent.NODE_REMOVED)
+ eq_(event.event_type, TreeEvent.NODE_REMOVED)
+ eq_(event.event_data.data, b'')
+ eq_(event.event_data.path, self.path)
+ eq_(event.event_data.stat.version, 0)
+
+ # re-create root node
+ self.client.ensure_path(self.path)
+ event = self.wait_cache(TreeEvent.NODE_ADDED)
+ eq_(event.event_type, TreeEvent.NODE_ADDED)
+ eq_(event.event_data.data, b'')
+ eq_(event.event_data.path, self.path)
+ eq_(event.event_data.stat.version, 0)
+
+ self.assertTrue(
+ self.cache._outstanding_ops >= 0,
+ 'unexpected outstanding ops %r' % self.cache._outstanding_ops)
+
+ def test_exception_handler(self):
+ error_value = FakeException()
+ error_handler = Mock()
+
+ with patch.object(TreeNode, 'on_deleted') as on_deleted:
+ on_deleted.side_effect = [error_value]
+
+ self.make_cache()
+ self.cache.listen_fault(error_handler)
+
+ self.cache.close()
+ error_handler.assert_called_once_with(error_value)
+
+
+class FakeException(Exception):
+ pass