diff options
author | Jiangge Zhang <tonyseek@gmail.com> | 2018-11-21 18:35:18 +0800 |
---|---|---|
committer | Stephen SORRIAUX <stephen.sorriaux@gmail.com> | 2018-11-21 10:35:18 +0000 |
commit | c48f2733f2a6b2c2941738e4208e8cfede676730 (patch) | |
tree | 3326e02522450e21670dc14abda722eb5c66ca88 | |
parent | 03340fbde44053885da25dbd914d4f63d1f76e3e (diff) | |
download | kazoo-c48f2733f2a6b2c2941738e4208e8cfede676730.tar.gz |
fix(recipe): No more memory leak once TreeCache was closed (#524)
fix(recipe): Fix memory leak of TreeCache recipe.
Fix memory leak on idle handler and on closed TreeCache.
Add new memory tests for TreeCache recipe that uses
objgraph and other tests for various handler on TreeCache.
Let TreeCache start in a safe way. The doc now suggest to
close unused TreeCache.
-rw-r--r-- | kazoo/handlers/eventlet.py | 4 | ||||
-rw-r--r-- | kazoo/handlers/gevent.py | 15 | ||||
-rw-r--r-- | kazoo/handlers/threading.py | 1 | ||||
-rw-r--r-- | kazoo/recipe/cache.py | 39 | ||||
-rw-r--r-- | kazoo/tests/test_cache.py | 125 | ||||
-rw-r--r-- | requirements.txt | 1 | ||||
-rw-r--r-- | setup.py | 1 |
7 files changed, 171 insertions, 15 deletions
diff --git a/kazoo/handlers/eventlet.py b/kazoo/handlers/eventlet.py index c87898e..c13f886 100644 --- a/kazoo/handlers/eventlet.py +++ b/kazoo/handlers/eventlet.py @@ -107,6 +107,8 @@ class SequentialEventletHandler(object): except Exception: LOG.warning("Exception in worker completion queue greenlet", exc_info=True) + finally: + del cb # release before possible idle def _process_callback_queue(self): while True: @@ -119,6 +121,8 @@ class SequentialEventletHandler(object): except Exception: LOG.warning("Exception in worker callback queue greenlet", exc_info=True) + finally: + del cb # release before possible idle def start(self): if not self._started: diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py index bc721d9..78d234d 100644 --- a/kazoo/handlers/gevent.py +++ b/kazoo/handlers/gevent.py @@ -69,14 +69,17 @@ class SequentialGeventHandler(object): while True: try: func = queue.get() - if func is _STOP: - break - func() + try: + if func is _STOP: + break + func() + except Exception as exc: + log.warning("Exception in worker greenlet") + log.exception(exc) + finally: + del func # release before possible idle except self.queue_empty: continue - except Exception as exc: - log.warning("Exception in worker greenlet") - log.exception(exc) return gevent.spawn(greenlet_worker) def start(self): diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py index b91f9cb..afd05c5 100644 --- a/kazoo/handlers/threading.py +++ b/kazoo/handlers/threading.py @@ -126,6 +126,7 @@ class SequentialThreadingHandler(object): log.exception("Exception in worker queue thread") finally: queue.task_done() + del func # release before possible idle except self.queue_empty: continue t = self.spawn(_thread_worker) diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py index ed31f72..dc3b8bd 100644 --- a/kazoo/recipe/cache.py +++ b/kazoo/recipe/cache.py @@ -19,6 +19,7 @@ import operator import os from kazoo.exceptions import NoNodeError, KazooException +from kazoo.protocol.paths import _prefix_root from kazoo.protocol.states import KazooState, EventType @@ -57,6 +58,12 @@ class TreeCache(object): After a cache started, all changes of subtree will be synchronized from the ZooKeeper server. Events will be fired for those activity. + Don't forget to call :meth:`close` if a tree was started and you don't + need it anymore, or you will leak the memory of cached nodes, even if + you have released all references to the :class:`TreeCache` instance. + Because there are so many callbacks that have been registered to the + Kazoo client. + See also :meth:`~TreeCache.listen`. .. note:: @@ -75,7 +82,10 @@ class TreeCache(object): self._client.ensure_path(self._root._path) if self._client.connected: - self._root.on_created() + # The on_created and other on_* methods must not be invoked outside + # the background task. This is the key to keep concurrency safe + # without lock. + self._in_background(self._root.on_created) def close(self): """Closes the cache. @@ -95,6 +105,10 @@ class TreeCache(object): self._task_queue.put(self._STOP) self._client.remove_listener(self._session_watcher) with handle_exception(self._error_listeners): + # We must invoke on_deleted outside background queue because: + # 1. The background task has been stopped. + # 2. The on_deleted on closed tree does not communicate with + # ZooKeeper actually. self._root.on_deleted() def listen(self, listener): @@ -185,6 +199,9 @@ class TreeCache(object): func, args, kwargs = cb func(*args, **kwargs) + # release before possible idle + del cb, func, args, kwargs + def _session_watcher(self, state): if state == KazooState.SUSPENDED: self._publish_event(TreeEvent.CONNECTION_SUSPENDED) @@ -241,6 +258,7 @@ class TreeNode(object): old_child.on_deleted() if self._tree._state == self._tree.STATE_CLOSED: + self._reset_watchers() return old_state, self._state = self._state, self.STATE_DEAD @@ -253,10 +271,18 @@ class TreeNode(object): child = self._path[len(self._parent._path) + 1:] if self._parent._children.get(child) is self: del self._parent._children[child] + self._reset_watchers() def _publish_event(self, *args, **kwargs): return self._tree._publish_event(*args, **kwargs) + def _reset_watchers(self): + client = self._tree._client + for _watchers in (client._data_watchers, client._child_watchers): + _path = _prefix_root(client.chroot, self._path) + _watcher = _watchers.get(_path, set()) + _watcher.discard(self._process_watch) + def _refresh(self): self._refresh_data() self._refresh_children() @@ -391,10 +417,11 @@ def handle_exception(listeners): yield except Exception as e: logger.debug('processing error: %r', e) - for listener in listeners: - try: - listener(e) - except: # pragma: no cover - logger.exception('Exception handling exception') # oops + if listeners: + for listener in listeners: + try: + listener(e) + except BaseException: # pragma: no cover + logger.exception('Exception handling exception') # oops else: logger.exception('No listener to process %r', e) diff --git a/kazoo/tests/test_cache.py b/kazoo/tests/test_cache.py index 0cf0227..3a73880 100644 --- a/kazoo/tests/test_cache.py +++ b/kazoo/tests/test_cache.py @@ -1,16 +1,45 @@ +import gc +import importlib import uuid from mock import patch, call, Mock from nose.tools import eq_, ok_, assert_not_equal, raises +from objgraph import count as count_refs_by_type -from kazoo.testing import KazooTestCase +from kazoo.testing import KazooTestHarness from kazoo.exceptions import KazooException from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent -class KazooTreeCacheTests(KazooTestCase): +class KazooAdaptiveHandlerTestCase(KazooTestHarness): + HANDLERS = ( + ('kazoo.handlers.gevent', 'SequentialGeventHandler'), + ('kazoo.handlers.eventlet', 'SequentialEventletHandler'), + ('kazoo.handlers.threading', 'SequentialThreadingHandler'), + ) def setUp(self): + self.handler = self.choose_an_installed_handler() + self.setup_zookeeper(handler=self.handler) + + def tearDown(self): + self.handler = None + self.teardown_zookeeper() + + def choose_an_installed_handler(self): + for handler_module, handler_class in self.HANDLERS: + try: + mod = importlib.import_module(handler_module) + cls = getattr(mod, handler_class) + except ImportError: + continue + else: + return cls() + raise ImportError('No available handler') + + +class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase): + def setUp(self): super(KazooTreeCacheTests, self).setUp() self._event_queue = self.client.handler.queue_impl() self._error_queue = self.client.handler.queue_impl() @@ -18,12 +47,15 @@ class KazooTreeCacheTests(KazooTestCase): self.cache = None def tearDown(self): - super(KazooTreeCacheTests, self).tearDown() if not self._error_queue.empty(): try: raise self._error_queue.get() except FakeException: pass + if self.cache is not None: + self.cache.close() + self.cache = None + super(KazooTreeCacheTests, self).tearDown() def make_cache(self): if self.cache is None: @@ -51,6 +83,29 @@ class KazooTreeCacheTests(KazooTestCase): method = getattr(self.client, method_name) return patch.object(self.client, method_name, wraps=method) + def _wait_gc(self): + # trigger switching on some coroutine handlers + self.client.handler.sleep_func(0.1) + + completion_queue = getattr(self.handler, 'completion_queue', None) + if completion_queue is not None: + while not self.client.handler.completion_queue.empty(): + self.client.handler.sleep_func(0.1) + + for gen in range(3): + gc.collect(gen) + + def count_tree_node(self): + # inspect GC and count tree nodes for checking memory leak + for retry in range(10): + result = set() + for _ in range(5): + self._wait_gc() + result.add(count_refs_by_type('TreeNode')) + if len(result) == 1: + return list(result)[0] + raise RuntimeError('could not count refs exactly') + def test_start(self): self.make_cache() self.wait_cache(since=TreeEvent.INITIALIZED) @@ -74,12 +129,29 @@ class KazooTreeCacheTests(KazooTestCase): self.cache.start() def test_close(self): + eq_(self.count_tree_node(), 0) + self.make_cache() self.wait_cache(since=TreeEvent.INITIALIZED) self.client.create(self.path + '/foo/bar/baz', makepath=True) for _ in range(3): self.wait_cache(TreeEvent.NODE_ADDED) + # setup stub watchers which are outside of tree cache + stub_data_watcher = Mock(spec=lambda event: None) + stub_child_watcher = Mock(spec=lambda event: None) + self.client.get(self.path + '/foo', stub_data_watcher) + self.client.get_children(self.path + '/foo', stub_child_watcher) + + # watchers inside tree cache should be here + root_path = self.client.chroot + self.path + eq_(len(self.client._data_watchers[root_path + '/foo']), 2) + eq_(len(self.client._data_watchers[root_path + '/foo/bar']), 1) + eq_(len(self.client._data_watchers[root_path + '/foo/bar/baz']), 1) + eq_(len(self.client._child_watchers[root_path + '/foo']), 2) + eq_(len(self.client._child_watchers[root_path + '/foo/bar']), 1) + eq_(len(self.client._child_watchers[root_path + '/foo/bar/baz']), 1) + self.cache.close() # nothing should be published since tree closed @@ -93,6 +165,53 @@ class KazooTreeCacheTests(KazooTestCase): # node state should not be changed assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD) + # watchers should be reset + eq_(len(self.client._data_watchers[root_path + '/foo']), 1) + eq_(len(self.client._data_watchers[root_path + '/foo/bar']), 0) + eq_(len(self.client._data_watchers[root_path + '/foo/bar/baz']), 0) + eq_(len(self.client._child_watchers[root_path + '/foo']), 1) + eq_(len(self.client._child_watchers[root_path + '/foo/bar']), 0) + eq_(len(self.client._child_watchers[root_path + '/foo/bar/baz']), 0) + + # outside watchers should not be deleted + eq_(list(self.client._data_watchers[root_path + '/foo'])[0], + stub_data_watcher) + eq_(list(self.client._child_watchers[root_path + '/foo'])[0], + stub_child_watcher) + + # should not be any leaked memory (tree node) here + self.cache = None + eq_(self.count_tree_node(), 0) + + def test_delete_operation(self): + self.make_cache() + self.wait_cache(since=TreeEvent.INITIALIZED) + + eq_(self.count_tree_node(), 1) + + self.client.create(self.path + '/foo/bar/baz', makepath=True) + for _ in range(3): + self.wait_cache(TreeEvent.NODE_ADDED) + + self.client.delete(self.path + '/foo', recursive=True) + for _ in range(3): + self.wait_cache(TreeEvent.NODE_REMOVED) + + # tree should be empty + eq_(self.cache._root._children, {}) + + # watchers should be reset + root_path = self.client.chroot + self.path + eq_(self.client._data_watchers[root_path + '/foo'], set()) + eq_(self.client._data_watchers[root_path + '/foo/bar'], set()) + eq_(self.client._data_watchers[root_path + '/foo/bar/baz'], set()) + eq_(self.client._child_watchers[root_path + '/foo'], set()) + eq_(self.client._child_watchers[root_path + '/foo/bar'], set()) + eq_(self.client._child_watchers[root_path + '/foo/bar/baz'], set()) + + # should not be any leaked memory (tree node) here + eq_(self.count_tree_node(), 1) + def test_children_operation(self): self.make_cache() self.wait_cache(since=TreeEvent.INITIALIZED) diff --git a/requirements.txt b/requirements.txt index d96b210..2d5c0c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ mock==1.0.1 nose==1.3.3 pure-sasl==0.5.1 flake8==2.3.0 +objgraph==3.4.0 @@ -25,6 +25,7 @@ tests_require = install_requires + [ 'nose', 'flake8', 'pure-sasl', + 'objgraph', ] if not (PYTHON3 or PYPY): |