diff options
-rw-r--r-- | kazoo/handlers/eventlet.py | 4 | ||||
-rw-r--r-- | kazoo/handlers/gevent.py | 15 | ||||
-rw-r--r-- | kazoo/handlers/threading.py | 1 | ||||
-rw-r--r-- | kazoo/recipe/cache.py | 39 | ||||
-rw-r--r-- | kazoo/tests/test_cache.py | 125 | ||||
-rw-r--r-- | requirements.txt | 1 | ||||
-rw-r--r-- | setup.py | 1 |
7 files changed, 171 insertions, 15 deletions
diff --git a/kazoo/handlers/eventlet.py b/kazoo/handlers/eventlet.py index c87898e..c13f886 100644 --- a/kazoo/handlers/eventlet.py +++ b/kazoo/handlers/eventlet.py @@ -107,6 +107,8 @@ class SequentialEventletHandler(object): except Exception: LOG.warning("Exception in worker completion queue greenlet", exc_info=True) + finally: + del cb # release before possible idle def _process_callback_queue(self): while True: @@ -119,6 +121,8 @@ class SequentialEventletHandler(object): except Exception: LOG.warning("Exception in worker callback queue greenlet", exc_info=True) + finally: + del cb # release before possible idle def start(self): if not self._started: diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py index bc721d9..78d234d 100644 --- a/kazoo/handlers/gevent.py +++ b/kazoo/handlers/gevent.py @@ -69,14 +69,17 @@ class SequentialGeventHandler(object): while True: try: func = queue.get() - if func is _STOP: - break - func() + try: + if func is _STOP: + break + func() + except Exception as exc: + log.warning("Exception in worker greenlet") + log.exception(exc) + finally: + del func # release before possible idle except self.queue_empty: continue - except Exception as exc: - log.warning("Exception in worker greenlet") - log.exception(exc) return gevent.spawn(greenlet_worker) def start(self): diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py index b91f9cb..afd05c5 100644 --- a/kazoo/handlers/threading.py +++ b/kazoo/handlers/threading.py @@ -126,6 +126,7 @@ class SequentialThreadingHandler(object): log.exception("Exception in worker queue thread") finally: queue.task_done() + del func # release before possible idle except self.queue_empty: continue t = self.spawn(_thread_worker) diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py index ed31f72..dc3b8bd 100644 --- a/kazoo/recipe/cache.py +++ b/kazoo/recipe/cache.py @@ -19,6 +19,7 @@ import operator import os from kazoo.exceptions import NoNodeError, KazooException +from kazoo.protocol.paths import _prefix_root from kazoo.protocol.states import KazooState, EventType @@ -57,6 +58,12 @@ class TreeCache(object): After a cache started, all changes of subtree will be synchronized from the ZooKeeper server. Events will be fired for those activity. + Don't forget to call :meth:`close` if a tree was started and you don't + need it anymore, or you will leak the memory of cached nodes, even if + you have released all references to the :class:`TreeCache` instance. + Because there are so many callbacks that have been registered to the + Kazoo client. + See also :meth:`~TreeCache.listen`. .. note:: @@ -75,7 +82,10 @@ class TreeCache(object): self._client.ensure_path(self._root._path) if self._client.connected: - self._root.on_created() + # The on_created and other on_* methods must not be invoked outside + # the background task. This is the key to keep concurrency safe + # without lock. + self._in_background(self._root.on_created) def close(self): """Closes the cache. @@ -95,6 +105,10 @@ class TreeCache(object): self._task_queue.put(self._STOP) self._client.remove_listener(self._session_watcher) with handle_exception(self._error_listeners): + # We must invoke on_deleted outside background queue because: + # 1. The background task has been stopped. + # 2. The on_deleted on closed tree does not communicate with + # ZooKeeper actually. self._root.on_deleted() def listen(self, listener): @@ -185,6 +199,9 @@ class TreeCache(object): func, args, kwargs = cb func(*args, **kwargs) + # release before possible idle + del cb, func, args, kwargs + def _session_watcher(self, state): if state == KazooState.SUSPENDED: self._publish_event(TreeEvent.CONNECTION_SUSPENDED) @@ -241,6 +258,7 @@ class TreeNode(object): old_child.on_deleted() if self._tree._state == self._tree.STATE_CLOSED: + self._reset_watchers() return old_state, self._state = self._state, self.STATE_DEAD @@ -253,10 +271,18 @@ class TreeNode(object): child = self._path[len(self._parent._path) + 1:] if self._parent._children.get(child) is self: del self._parent._children[child] + self._reset_watchers() def _publish_event(self, *args, **kwargs): return self._tree._publish_event(*args, **kwargs) + def _reset_watchers(self): + client = self._tree._client + for _watchers in (client._data_watchers, client._child_watchers): + _path = _prefix_root(client.chroot, self._path) + _watcher = _watchers.get(_path, set()) + _watcher.discard(self._process_watch) + def _refresh(self): self._refresh_data() self._refresh_children() @@ -391,10 +417,11 @@ def handle_exception(listeners): yield except Exception as e: logger.debug('processing error: %r', e) - for listener in listeners: - try: - listener(e) - except: # pragma: no cover - logger.exception('Exception handling exception') # oops + if listeners: + for listener in listeners: + try: + listener(e) + except BaseException: # pragma: no cover + logger.exception('Exception handling exception') # oops else: logger.exception('No listener to process %r', e) diff --git a/kazoo/tests/test_cache.py b/kazoo/tests/test_cache.py index 0cf0227..3a73880 100644 --- a/kazoo/tests/test_cache.py +++ b/kazoo/tests/test_cache.py @@ -1,16 +1,45 @@ +import gc +import importlib import uuid from mock import patch, call, Mock from nose.tools import eq_, ok_, assert_not_equal, raises +from objgraph import count as count_refs_by_type -from kazoo.testing import KazooTestCase +from kazoo.testing import KazooTestHarness from kazoo.exceptions import KazooException from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent -class KazooTreeCacheTests(KazooTestCase): +class KazooAdaptiveHandlerTestCase(KazooTestHarness): + HANDLERS = ( + ('kazoo.handlers.gevent', 'SequentialGeventHandler'), + ('kazoo.handlers.eventlet', 'SequentialEventletHandler'), + ('kazoo.handlers.threading', 'SequentialThreadingHandler'), + ) def setUp(self): + self.handler = self.choose_an_installed_handler() + self.setup_zookeeper(handler=self.handler) + + def tearDown(self): + self.handler = None + self.teardown_zookeeper() + + def choose_an_installed_handler(self): + for handler_module, handler_class in self.HANDLERS: + try: + mod = importlib.import_module(handler_module) + cls = getattr(mod, handler_class) + except ImportError: + continue + else: + return cls() + raise ImportError('No available handler') + + +class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase): + def setUp(self): super(KazooTreeCacheTests, self).setUp() self._event_queue = self.client.handler.queue_impl() self._error_queue = self.client.handler.queue_impl() @@ -18,12 +47,15 @@ class KazooTreeCacheTests(KazooTestCase): self.cache = None def tearDown(self): - super(KazooTreeCacheTests, self).tearDown() if not self._error_queue.empty(): try: raise self._error_queue.get() except FakeException: pass + if self.cache is not None: + self.cache.close() + self.cache = None + super(KazooTreeCacheTests, self).tearDown() def make_cache(self): if self.cache is None: @@ -51,6 +83,29 @@ class KazooTreeCacheTests(KazooTestCase): method = getattr(self.client, method_name) return patch.object(self.client, method_name, wraps=method) + def _wait_gc(self): + # trigger switching on some coroutine handlers + self.client.handler.sleep_func(0.1) + + completion_queue = getattr(self.handler, 'completion_queue', None) + if completion_queue is not None: + while not self.client.handler.completion_queue.empty(): + self.client.handler.sleep_func(0.1) + + for gen in range(3): + gc.collect(gen) + + def count_tree_node(self): + # inspect GC and count tree nodes for checking memory leak + for retry in range(10): + result = set() + for _ in range(5): + self._wait_gc() + result.add(count_refs_by_type('TreeNode')) + if len(result) == 1: + return list(result)[0] + raise RuntimeError('could not count refs exactly') + def test_start(self): self.make_cache() self.wait_cache(since=TreeEvent.INITIALIZED) @@ -74,12 +129,29 @@ class KazooTreeCacheTests(KazooTestCase): self.cache.start() def test_close(self): + eq_(self.count_tree_node(), 0) + self.make_cache() self.wait_cache(since=TreeEvent.INITIALIZED) self.client.create(self.path + '/foo/bar/baz', makepath=True) for _ in range(3): self.wait_cache(TreeEvent.NODE_ADDED) + # setup stub watchers which are outside of tree cache + stub_data_watcher = Mock(spec=lambda event: None) + stub_child_watcher = Mock(spec=lambda event: None) + self.client.get(self.path + '/foo', stub_data_watcher) + self.client.get_children(self.path + '/foo', stub_child_watcher) + + # watchers inside tree cache should be here + root_path = self.client.chroot + self.path + eq_(len(self.client._data_watchers[root_path + '/foo']), 2) + eq_(len(self.client._data_watchers[root_path + '/foo/bar']), 1) + eq_(len(self.client._data_watchers[root_path + '/foo/bar/baz']), 1) + eq_(len(self.client._child_watchers[root_path + '/foo']), 2) + eq_(len(self.client._child_watchers[root_path + '/foo/bar']), 1) + eq_(len(self.client._child_watchers[root_path + '/foo/bar/baz']), 1) + self.cache.close() # nothing should be published since tree closed @@ -93,6 +165,53 @@ class KazooTreeCacheTests(KazooTestCase): # node state should not be changed assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD) + # watchers should be reset + eq_(len(self.client._data_watchers[root_path + '/foo']), 1) + eq_(len(self.client._data_watchers[root_path + '/foo/bar']), 0) + eq_(len(self.client._data_watchers[root_path + '/foo/bar/baz']), 0) + eq_(len(self.client._child_watchers[root_path + '/foo']), 1) + eq_(len(self.client._child_watchers[root_path + '/foo/bar']), 0) + eq_(len(self.client._child_watchers[root_path + '/foo/bar/baz']), 0) + + # outside watchers should not be deleted + eq_(list(self.client._data_watchers[root_path + '/foo'])[0], + stub_data_watcher) + eq_(list(self.client._child_watchers[root_path + '/foo'])[0], + stub_child_watcher) + + # should not be any leaked memory (tree node) here + self.cache = None + eq_(self.count_tree_node(), 0) + + def test_delete_operation(self): + self.make_cache() + self.wait_cache(since=TreeEvent.INITIALIZED) + + eq_(self.count_tree_node(), 1) + + self.client.create(self.path + '/foo/bar/baz', makepath=True) + for _ in range(3): + self.wait_cache(TreeEvent.NODE_ADDED) + + self.client.delete(self.path + '/foo', recursive=True) + for _ in range(3): + self.wait_cache(TreeEvent.NODE_REMOVED) + + # tree should be empty + eq_(self.cache._root._children, {}) + + # watchers should be reset + root_path = self.client.chroot + self.path + eq_(self.client._data_watchers[root_path + '/foo'], set()) + eq_(self.client._data_watchers[root_path + '/foo/bar'], set()) + eq_(self.client._data_watchers[root_path + '/foo/bar/baz'], set()) + eq_(self.client._child_watchers[root_path + '/foo'], set()) + eq_(self.client._child_watchers[root_path + '/foo/bar'], set()) + eq_(self.client._child_watchers[root_path + '/foo/bar/baz'], set()) + + # should not be any leaked memory (tree node) here + eq_(self.count_tree_node(), 1) + def test_children_operation(self): self.make_cache() self.wait_cache(since=TreeEvent.INITIALIZED) diff --git a/requirements.txt b/requirements.txt index d96b210..2d5c0c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ mock==1.0.1 nose==1.3.3 pure-sasl==0.5.1 flake8==2.3.0 +objgraph==3.4.0 @@ -25,6 +25,7 @@ tests_require = install_requires + [ 'nose', 'flake8', 'pure-sasl', + 'objgraph', ] if not (PYTHON3 or PYPY): |