summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Bangert <ben@groovie.org>2017-05-31 16:55:30 -0700
committerBen Bangert <ben@groovie.org>2017-05-31 17:21:16 -0700
commit3a657b9f93df418fe437507c259f32752469c9df (patch)
tree58b9293072319fa97aed55dc512b8412abe461b3
parent3cc86ac06e20722d0ba78582abed2b2a63a7a2aa (diff)
downloadkazoo-3a657b9f93df418fe437507c259f32752469c9df.tar.gz
Revert "New recipe proposal: TreeCache"
-rw-r--r--docs/api.rst1
-rw-r--r--docs/api/recipe/cache.rst26
-rw-r--r--kazoo/recipe/cache.py389
-rw-r--r--kazoo/tests/test_cache.py278
4 files changed, 0 insertions, 694 deletions
diff --git a/docs/api.rst b/docs/api.rst
index ff921c0..744b012 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -16,7 +16,6 @@ organized alphabetically by module name.
api/interfaces
api/protocol/states
api/recipe/barrier
- api/recipe/cache
api/recipe/counter
api/recipe/election
api/recipe/lease
diff --git a/docs/api/recipe/cache.rst b/docs/api/recipe/cache.rst
deleted file mode 100644
index fb7f372..0000000
--- a/docs/api/recipe/cache.rst
+++ /dev/null
@@ -1,26 +0,0 @@
-.. _cache_module:
-
-:mod:`kazoo.recipe.cache`
-----------------------------
-
-.. automodule:: kazoo.recipe.cache
-
-Public API
-++++++++++
-
- .. autoclass:: TreeCache
-
- .. automethod:: start
- .. automethod:: close
- .. automethod:: listen
- .. automethod:: listen_fault
- .. automethod:: get_data
- .. automethod:: get_children
-
- .. autoclass:: TreeEvent
- :members:
- :show-inheritance:
-
- .. autoclass:: NodeData
- :members:
- :show-inheritance:
diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py
deleted file mode 100644
index 1e59f28..0000000
--- a/kazoo/recipe/cache.py
+++ /dev/null
@@ -1,389 +0,0 @@
-"""TreeCache
-
-:Maintainer: Jiangge Zhang <tonyseek@gmail.com>
-:Maintainer: Haochuan Guo <guohaochuan@gmail.com>
-:Maintainer: Tianwen Zhang <mail2tevin@gmail.com>
-:Status: Alpha
-
-A port of the Apache Curator's TreeCache recipe. It builds an in-memory cache
-of a subtree in ZooKeeper and keeps it up-to-date.
-
-See also: http://curator.apache.org/curator-recipes/tree-cache.html
-"""
-
-from __future__ import absolute_import
-
-import os
-import logging
-import contextlib
-import functools
-import operator
-
-from kazoo.exceptions import NoNodeError, KazooException
-from kazoo.protocol.states import KazooState, EventType
-
-
-logger = logging.getLogger(__name__)
-
-
-class TreeCache(object):
- """The cache of a ZooKeeper subtree.
-
- :param client: A :class:`~kazoo.client.KazooClient` instance.
- :param path: The root path of subtree.
- """
-
- STATE_LATENT = 0
- STATE_STARTED = 1
- STATE_CLOSED = 2
-
- def __init__(self, client, path):
- self._client = client
- self._root = TreeNode.make_root(self, path)
- self._state = self.STATE_LATENT
- self._outstanding_ops = 0
- self._is_initialized = False
- self._error_listeners = []
- self._event_listeners = []
-
- def start(self):
- """Starts the cache.
-
- The cache is not started automatically. You must call this method.
-
- After a cache started, all changes of subtree will be synchronized
- from the ZooKeeper server. Events will be fired for those activity.
-
- See also :meth:`~TreeCache.listen`.
-
- .. note::
-
- This method is not thread safe.
- """
- if self._state == self.STATE_LATENT:
- self._state = self.STATE_STARTED
- elif self._state == self.STATE_CLOSED:
- raise KazooException('already closed')
- else:
- raise KazooException('already started')
-
- self._client.add_listener(self._session_watcher)
- self._client.ensure_path(self._root._path)
-
- if self._client.connected:
- self._root.on_created()
-
- def close(self):
- """Closes the cache.
-
- A closed cache was detached from ZooKeeper's changes. And all nodes
- will be invalidated.
-
- Once a tree cache was closed, it could not be started again. You should
- only close a tree cache while you want to recycle it.
-
- .. note::
-
- This method is not thread safe.
- """
- if self._state == self.STATE_STARTED:
- self._state = self.STATE_CLOSED
- self._client.remove_listener(self._session_watcher)
- with handle_exception(self._error_listeners):
- self._root.on_deleted()
-
- def listen(self, listener):
- """Registers a function to listen the cache events.
-
- The cache events are changes of local data. They are delivered from
- watching notifications in ZooKeeper session.
-
- This method can be use as a decorator.
-
- :param listener: A callable object which accepting a
- :class:`~kazoo.recipe.cache.TreeEvent` instance as
- its argument.
- """
- self._event_listeners.append(listener)
- return listener
-
- def listen_fault(self, listener):
- """Registers a function to listen the exceptions.
-
- It is possible to meet some exceptions during the cache running. You
- could specific handlers for them.
-
- This method can be use as a decorator.
-
- :param listener: A callable object which accepting an exception as its
- argument.
- """
- self._error_listeners.append(listener)
- return listener
-
- def get_data(self, path, default=None):
- """Gets data of a node from cache.
-
- :param path: The absolute path string.
- :param default: The default value which will be returned if the node
- does not exist.
- :raises ValueError: If the path is outside of this subtree.
- :returns: A :class:`~kazoo.recipe.cache.NodeData` instance.
- """
- node = self._find_node(path)
- return default if node is None else node._data
-
- def get_children(self, path, default=None):
- """Gets node children list from in-memory snapshot.
-
- :param path: The absolute path string.
- :param default: The default value which will be returned if the node
- does not exist.
- :raises ValueError: If the path is outside of this subtree.
- :returns: The :class:`frozenset` which including children names.
- """
- node = self._find_node(path)
- return default if node is None else frozenset(node._children)
-
- def _find_node(self, path):
- if not path.startswith(self._root._path):
- raise ValueError('outside of tree')
- striped_path = path[len(self._root._path):].strip('/')
- splited_path = [p for p in striped_path.split('/') if p]
- current_node = self._root
- for node_name in splited_path:
- if node_name not in current_node._children:
- return
- current_node = current_node._children[node_name]
- return current_node
-
- def _publish_event(self, event_type, event_data=None):
- event = TreeEvent.make(event_type, event_data)
- if self._state != self.STATE_CLOSED:
- logger.debug('public event: %r', event)
- self._in_background(self._do_publish_event, event)
-
- def _do_publish_event(self, event):
- for listener in self._event_listeners:
- with handle_exception(self._error_listeners):
- listener(event)
-
- def _in_background(self, func, *args, **kwargs):
- self._client.handler.callback_queue.put(lambda: func(*args, **kwargs))
-
- def _session_watcher(self, state):
- if state == KazooState.SUSPENDED:
- self._publish_event(TreeEvent.CONNECTION_SUSPENDED)
- elif state == KazooState.CONNECTED:
- with handle_exception(self._error_listeners):
- self._root.on_reconnected()
- self._publish_event(TreeEvent.CONNECTION_RECONNECTED)
- elif state == KazooState.LOST:
- self._is_initialized = False
- self._publish_event(TreeEvent.CONNECTION_LOST)
-
-
-class TreeNode(object):
- """The tree node record.
-
- :param tree: A :class:`~kazoo.recipe.cache.TreeCache` instance.
- :param path: The path of current node.
- :param parent: The parent node reference. ``None`` for root node.
- """
-
- __slots__ = ('_tree', '_path', '_parent', '_depth', '_children', '_state',
- '_data')
-
- STATE_PENDING = 0
- STATE_LIVE = 1
- STATE_DEAD = 2
-
- def __init__(self, tree, path, parent):
- self._tree = tree
- self._path = path
- self._parent = parent
- self._depth = parent._depth + 1 if parent else 0
- self._children = {}
- self._state = self.STATE_PENDING
- self._data = None
-
- @classmethod
- def make_root(cls, tree, path):
- return cls(tree, path, None)
-
- def on_reconnected(self):
- self._refresh()
- for child in self._children.values():
- child.on_reconnected()
-
- def on_created(self):
- self._refresh()
-
- def on_deleted(self):
- old_children, self._children = self._children, {}
- old_data, self._data = self._data, None
-
- for old_child in old_children.values():
- old_child.on_deleted()
-
- if self._tree._state == self._tree.STATE_CLOSED:
- return
-
- old_state, self._state = self._state, self.STATE_DEAD
- if old_state == self.STATE_LIVE:
- self._publish_event(TreeEvent.NODE_REMOVED, old_data)
-
- if self._parent is None:
- self._call_client('exists', self._path) # root node
- else:
- child = self._path[len(self._parent._path) + 1:]
- if self._parent._children.get(child) is self:
- del self._parent._children[child]
-
- def _publish_event(self, *args, **kwargs):
- return self._tree._publish_event(*args, **kwargs)
-
- def _refresh(self):
- self._refresh_data()
- self._refresh_children()
-
- def _refresh_data(self):
- self._call_client('get', self._path)
-
- def _refresh_children(self):
- # TODO max-depth checking support
- self._call_client('get_children', self._path)
-
- def _call_client(self, method_name, path, *args):
- self._tree._outstanding_ops += 1
- callback = functools.partial(
- self._tree._in_background, self._process_result,
- method_name, path)
- kwargs = {'watch': self._process_watch}
- method = getattr(self._tree._client, method_name + '_async')
- method(path, *args, **kwargs).rawlink(callback)
-
- def _process_watch(self, watched_event):
- logger.debug('process_watch: %r', watched_event)
- with handle_exception(self._tree._error_listeners):
- if watched_event.type == EventType.CREATED:
- assert self._parent is None, 'unexpected CREATED on non-root'
- self.on_created()
- elif watched_event.type == EventType.DELETED:
- self.on_deleted()
- elif watched_event.type == EventType.CHANGED:
- self._refresh_data()
- elif watched_event.type == EventType.CHILD:
- self._refresh_children()
-
- def _process_result(self, method_name, path, result):
- logger.debug('process_result: %s %s', method_name, path)
- if method_name == 'exists':
- assert self._parent is None, 'unexpected EXISTS on non-root'
- # the value of result will be set with `None` if node not exists.
- if result.get() is not None:
- if self._state == self.STATE_DEAD:
- self._state = self.STATE_PENDING
- self.on_created()
- elif method_name == 'get_children':
- try:
- children = result.get()
- except NoNodeError:
- self.on_deleted()
- else:
- for child in sorted(children):
- full_path = os.path.join(path, child)
- if child not in self._children:
- node = TreeNode(self._tree, full_path, self)
- self._children[child] = node
- node.on_created()
- elif method_name == 'get':
- try:
- data, stat = result.get()
- except NoNodeError:
- self.on_deleted()
- else:
- old_data, self._data = (
- self._data, NodeData.make(path, data, stat))
-
- old_state, self._state = self._state, self.STATE_LIVE
- if old_state == self.STATE_LIVE:
- if old_data is None or old_data.stat.mzxid != stat.mzxid:
- self._publish_event(TreeEvent.NODE_UPDATED, self._data)
- else:
- self._publish_event(TreeEvent.NODE_ADDED, self._data)
- else: # pragma: no cover
- logger.warning('unknown operation %s', method_name)
- self._tree._outstanding_ops -= 1
- return
-
- self._tree._outstanding_ops -= 1
- if self._tree._outstanding_ops == 0 and not self._tree._is_initialized:
- self._tree._is_initialized = True
- self._publish_event(TreeEvent.INITIALIZED)
-
-
-class TreeEvent(tuple):
- """The immutable event tuple of cache."""
-
- NODE_ADDED = 0
- NODE_UPDATED = 1
- NODE_REMOVED = 2
- CONNECTION_SUSPENDED = 3
- CONNECTION_RECONNECTED = 4
- CONNECTION_LOST = 5
- INITIALIZED = 6
-
- #: An enumerate integer to indicate event type.
- event_type = property(operator.itemgetter(0))
-
- #: A :class:`~kazoo.recipe.cache.NodeData` instance.
- event_data = property(operator.itemgetter(1))
-
- @classmethod
- def make(cls, event_type, event_data):
- """Creates a new TreeEvent tuple.
-
- :returns: A :class:`~kazoo.recipe.cache.TreeEvent` instance.
- """
- assert event_type in (
- cls.NODE_ADDED, cls.NODE_UPDATED, cls.NODE_REMOVED,
- cls.CONNECTION_SUSPENDED, cls.CONNECTION_RECONNECTED,
- cls.CONNECTION_LOST, cls.INITIALIZED)
- return cls((event_type, event_data))
-
-
-class NodeData(tuple):
- """The immutable node data tuple of cache."""
-
- #: The absolute path string of current node.
- path = property(operator.itemgetter(0))
-
- #: The bytes data of current node.
- data = property(operator.itemgetter(1))
-
- #: The stat information of current node.
- stat = property(operator.itemgetter(2))
-
- @classmethod
- def make(cls, path, data, stat):
- """Creates a new NodeData tuple.
-
- :returns: A :class:`~kazoo.recipe.cache.NodeData` instance.
- """
- return cls((path, data, stat))
-
-
-@contextlib.contextmanager
-def handle_exception(listeners):
- try:
- yield
- except Exception as e:
- logger.debug('processing error: %r', e)
- for listener in listeners:
- try:
- listener(e)
- except: # pragma: no cover
- logger.exception('Exception handling exception') # oops
- else:
- logger.exception('No listener to process %r', e)
diff --git a/kazoo/tests/test_cache.py b/kazoo/tests/test_cache.py
deleted file mode 100644
index 1aa03fb..0000000
--- a/kazoo/tests/test_cache.py
+++ /dev/null
@@ -1,278 +0,0 @@
-import uuid
-
-from mock import patch, call, Mock
-from nose.tools import eq_, ok_, assert_not_equal, raises
-
-from kazoo.testing import KazooTestCase
-from kazoo.exceptions import KazooException
-from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent
-
-
-class KazooTreeCacheTests(KazooTestCase):
-
- def setUp(self):
- super(KazooTreeCacheTests, self).setUp()
- self._event_queue = self.client.handler.queue_impl()
- self._error_queue = self.client.handler.queue_impl()
- self.path = None
- self.cache = None
-
- def tearDown(self):
- super(KazooTreeCacheTests, self).tearDown()
- if not self._error_queue.empty():
- try:
- raise self._error_queue.get()
- except FakeException:
- pass
-
- def make_cache(self):
- if self.cache is None:
- self.path = '/' + uuid.uuid4().hex
- self.cache = TreeCache(self.client, self.path)
- self.cache.listen(lambda event: self._event_queue.put(event))
- self.cache.listen_fault(lambda error: self._error_queue.put(error))
- self.cache.start()
- return self.cache
-
- def wait_cache(self, expect=None, since=None, timeout=10):
- started = since is None
- while True:
- event = self._event_queue.get(timeout=timeout)
- if started:
- if expect is not None:
- eq_(event.event_type, expect)
- return event
- if event.event_type == since:
- started = True
- if expect is None:
- return
-
- def spy_client(self, method_name):
- method = getattr(self.client, method_name)
- return patch.object(self.client, method_name, wraps=method)
-
- def test_start(self):
- self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
-
- stat = self.client.exists(self.path)
- eq_(stat.version, 0)
-
- eq_(self.cache._state, TreeCache.STATE_STARTED)
- eq_(self.cache._root._state, TreeNode.STATE_LIVE)
-
- @raises(KazooException)
- def test_start_started(self):
- self.make_cache()
- self.cache.start()
-
- @raises(KazooException)
- def test_start_closed(self):
- self.make_cache()
- self.cache.start()
- self.cache.close()
- self.cache.start()
-
- def test_close(self):
- self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
- self.client.create(self.path + '/foo/bar/baz', makepath=True)
- for _ in range(3):
- self.wait_cache(TreeEvent.NODE_ADDED)
-
- self.cache.close()
-
- # nothing should be published since tree closed
- ok_(self._event_queue.empty())
-
- # tree should be empty
- eq_(self.cache._root._children, {})
- eq_(self.cache._root._data, None)
- eq_(self.cache._state, TreeCache.STATE_CLOSED)
-
- # node state should not be changed
- assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD)
-
- def test_children_operation(self):
- self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
-
- self.client.create(self.path + '/test_children', b'test_children_1')
- event = self.wait_cache(TreeEvent.NODE_ADDED)
- eq_(event.event_type, TreeEvent.NODE_ADDED)
- eq_(event.event_data.path, self.path + '/test_children')
- eq_(event.event_data.data, b'test_children_1')
- eq_(event.event_data.stat.version, 0)
-
- self.client.set(self.path + '/test_children', b'test_children_2')
- event = self.wait_cache(TreeEvent.NODE_UPDATED)
- eq_(event.event_type, TreeEvent.NODE_UPDATED)
- eq_(event.event_data.path, self.path + '/test_children')
- eq_(event.event_data.data, b'test_children_2')
- eq_(event.event_data.stat.version, 1)
-
- self.client.delete(self.path + '/test_children')
- event = self.wait_cache(TreeEvent.NODE_REMOVED)
- eq_(event.event_type, TreeEvent.NODE_REMOVED)
- eq_(event.event_data.path, self.path + '/test_children')
- eq_(event.event_data.data, b'test_children_2')
- eq_(event.event_data.stat.version, 1)
-
- def test_subtree_operation(self):
- self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
-
- self.client.create(self.path + '/foo/bar/baz', makepath=True)
- for relative_path in ('/foo', '/foo/bar', '/foo/bar/baz'):
- event = self.wait_cache(TreeEvent.NODE_ADDED)
- eq_(event.event_type, TreeEvent.NODE_ADDED)
- eq_(event.event_data.path, self.path + relative_path)
- eq_(event.event_data.data, b'')
- eq_(event.event_data.stat.version, 0)
-
- self.client.delete(self.path + '/foo', recursive=True)
- for relative_path in ('/foo/bar/baz', '/foo/bar', '/foo'):
- event = self.wait_cache(TreeEvent.NODE_REMOVED)
- eq_(event.event_type, TreeEvent.NODE_REMOVED)
- eq_(event.event_data.path, self.path + relative_path)
-
- def test_get_data(self):
- cache = self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
- self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
- self.wait_cache(TreeEvent.NODE_ADDED)
- self.wait_cache(TreeEvent.NODE_ADDED)
- self.wait_cache(TreeEvent.NODE_ADDED)
-
- with patch.object(cache, '_client'): # disable any remote operation
- eq_(cache.get_data(self.path).data, b'')
- eq_(cache.get_data(self.path).stat.version, 0)
-
- eq_(cache.get_data(self.path + '/foo').data, b'')
- eq_(cache.get_data(self.path + '/foo').stat.version, 0)
-
- eq_(cache.get_data(self.path + '/foo/bar').data, b'')
- eq_(cache.get_data(self.path + '/foo/bar').stat.version, 0)
-
- eq_(cache.get_data(self.path + '/foo/bar/baz').data, b'@')
- eq_(cache.get_data(self.path + '/foo/bar/baz').stat.version, 0)
-
- def test_get_children(self):
- cache = self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
- self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
- self.wait_cache(TreeEvent.NODE_ADDED)
- self.wait_cache(TreeEvent.NODE_ADDED)
- self.wait_cache(TreeEvent.NODE_ADDED)
-
- with patch.object(cache, '_client'): # disable any remote operation
- eq_(cache.get_children(self.path + '/foo/bar/baz'), frozenset())
- eq_(cache.get_children(self.path + '/foo/bar'), frozenset(['baz']))
- eq_(cache.get_children(self.path + '/foo'), frozenset(['bar']))
- eq_(cache.get_children(self.path), frozenset(['foo']))
-
- @raises(ValueError)
- def test_get_data_out_of_tree(self):
- self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
- self.cache.get_data('/out_of_tree')
-
- @raises(ValueError)
- def test_get_children_out_of_tree(self):
- self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
- self.cache.get_children('/out_of_tree')
-
- def test_get_data_no_node(self):
- cache = self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
-
- with patch.object(cache, '_client'): # disable any remote operation
- eq_(cache.get_data(self.path + '/non_exists'), None)
-
- def test_get_children_no_node(self):
- cache = self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
-
- with patch.object(cache, '_client'): # disable any remote operation
- eq_(cache.get_children(self.path + '/non_exists'), None)
-
- def test_session_reconnected(self):
- self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
-
- self.client.create(self.path + '/foo')
- event = self.wait_cache(TreeEvent.NODE_ADDED)
- eq_(event.event_data.path, self.path + '/foo')
-
- with self.spy_client('get_async') as get_data:
- with self.spy_client('get_children_async') as get_children:
- # session suspended
- self.lose_connection(self.client.handler.event_object)
- self.wait_cache(TreeEvent.CONNECTION_SUSPENDED)
-
- # There are a serial refreshing operation here. But NODE_ADDED
- # events will not be raised because the zxid of nodes are the
- # same during reconnecting.
-
- # connection restore
- self.wait_cache(TreeEvent.CONNECTION_RECONNECTED)
-
- # wait for outstanding operations
- while self.cache._outstanding_ops > 0:
- self.client.handler.sleep_func(0.1)
-
- # inspect in-memory nodes
- _node_root = self.cache._root
- _node_foo = self.cache._root._children['foo']
-
- # make sure that all nodes are refreshed
- get_data.assert_has_calls([
- call(self.path, watch=_node_root._process_watch),
- call(self.path + '/foo', watch=_node_foo._process_watch),
- ], any_order=True)
- get_children.assert_has_calls([
- call(self.path, watch=_node_root._process_watch),
- call(self.path + '/foo', watch=_node_foo._process_watch),
- ], any_order=True)
-
- def test_root_recreated(self):
- self.make_cache()
- self.wait_cache(since=TreeEvent.INITIALIZED)
-
- # remove root node
- self.client.delete(self.path)
- event = self.wait_cache(TreeEvent.NODE_REMOVED)
- eq_(event.event_type, TreeEvent.NODE_REMOVED)
- eq_(event.event_data.data, b'')
- eq_(event.event_data.path, self.path)
- eq_(event.event_data.stat.version, 0)
-
- # re-create root node
- self.client.ensure_path(self.path)
- event = self.wait_cache(TreeEvent.NODE_ADDED)
- eq_(event.event_type, TreeEvent.NODE_ADDED)
- eq_(event.event_data.data, b'')
- eq_(event.event_data.path, self.path)
- eq_(event.event_data.stat.version, 0)
-
- self.assertTrue(
- self.cache._outstanding_ops >= 0,
- 'unexpected outstanding ops %r' % self.cache._outstanding_ops)
-
- def test_exception_handler(self):
- error_value = FakeException()
- error_handler = Mock()
-
- with patch.object(TreeNode, 'on_deleted') as on_deleted:
- on_deleted.side_effect = [error_value]
-
- self.make_cache()
- self.cache.listen_fault(error_handler)
-
- self.cache.close()
- error_handler.assert_called_once_with(error_value)
-
-
-class FakeException(Exception):
- pass