summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJiangge Zhang <tonyseek@gmail.com>2017-12-11 17:10:58 +0800
committerJeff Widman <jeff@jeffwidman.com>2018-03-23 02:01:50 -0700
commitdb0c2d4f8ab5ecfb367b7b2accfd9c52c1c91fcd (patch)
tree4dee950180040aad0c013f758edfbff78bf5b044
parent4456f180735a0f8520bfc42474de9d27fa01bb2c (diff)
downloadkazoo-db0c2d4f8ab5ecfb367b7b2accfd9c52c1c91fcd.tar.gz
fix(recipe): Unexpected exceptions break TreeCache
-rw-r--r--kazoo/recipe/cache.py25
-rw-r--r--kazoo/tests/test_cache.py19
2 files changed, 30 insertions, 14 deletions
diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py
index 65fad85..1d6fad1 100644
--- a/kazoo/recipe/cache.py
+++ b/kazoo/recipe/cache.py
@@ -268,14 +268,14 @@ class TreeNode(object):
# TODO max-depth checking support
self._call_client('get_children', self._path)
- def _call_client(self, method_name, path, *args):
+ def _call_client(self, method_name, path):
+ assert method_name in ('get', 'get_children', 'exists')
self._tree._outstanding_ops += 1
callback = functools.partial(
self._tree._in_background, self._process_result,
method_name, path)
- kwargs = {'watch': self._process_watch}
method = getattr(self._tree._client, method_name + '_async')
- method(path, *args, **kwargs).rawlink(callback)
+ method(path, watch=self._process_watch).rawlink(callback)
def _process_watch(self, watched_event):
logger.debug('process_watch: %r', watched_event)
@@ -294,38 +294,35 @@ class TreeNode(object):
logger.debug('process_result: %s %s', method_name, path)
if method_name == 'exists':
assert self._parent is None, 'unexpected EXISTS on non-root'
- # the value of result will be set with `None` if node not exists.
- if result.get() is not None:
+ # The result will be `None` if the node doesn't exist.
+ if result.successful() and result.get() is not None:
if self._state == self.STATE_DEAD:
self._state = self.STATE_PENDING
self.on_created()
elif method_name == 'get_children':
- try:
+ if result.successful():
children = result.get()
- except NoNodeError:
- self.on_deleted()
- else:
for child in sorted(children):
full_path = os.path.join(path, child)
if child not in self._children:
node = TreeNode(self._tree, full_path, self)
self._children[child] = node
node.on_created()
+ elif isinstance(result.exception, NoNodeError):
+ self.on_deleted()
elif method_name == 'get':
- try:
+ if result.successful():
data, stat = result.get()
- except NoNodeError:
- self.on_deleted()
- else:
old_data, self._data = (
self._data, NodeData.make(path, data, stat))
-
old_state, self._state = self._state, self.STATE_LIVE
if old_state == self.STATE_LIVE:
if old_data is None or old_data.stat.mzxid != stat.mzxid:
self._publish_event(TreeEvent.NODE_UPDATED, self._data)
else:
self._publish_event(TreeEvent.NODE_ADDED, self._data)
+ elif isinstance(result.exception, NoNodeError):
+ self.on_deleted()
else: # pragma: no cover
logger.warning('unknown operation %s', method_name)
self._tree._outstanding_ops -= 1
diff --git a/kazoo/tests/test_cache.py b/kazoo/tests/test_cache.py
index 1aa03fb..0cf0227 100644
--- a/kazoo/tests/test_cache.py
+++ b/kazoo/tests/test_cache.py
@@ -273,6 +273,25 @@ class KazooTreeCacheTests(KazooTestCase):
self.cache.close()
error_handler.assert_called_once_with(error_value)
+ def test_exception_suppressed(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+
+ # stoke up ConnectionClosedError
+ self.client.stop()
+ self.client.close()
+ self.client.handler.start() # keep the async completion
+ self.wait_cache(since=TreeEvent.CONNECTION_LOST)
+
+ with patch.object(TreeNode, 'on_created') as on_created:
+ self.cache._root._call_client('exists', '/')
+ self.cache._root._call_client('get', '/')
+ self.cache._root._call_client('get_children', '/')
+
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+ on_created.assert_not_called()
+ eq_(self.cache._outstanding_ops, 0)
+
class FakeException(Exception):
pass