summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Jiangge Zhang <tonyseek@gmail.com>, 2018-11-21 18:35:18 +0800
committer: Stephen SORRIAUX <stephen.sorriaux@gmail.com>, 2018-11-21 10:35:18 +0000
commit: c48f2733f2a6b2c2941738e4208e8cfede676730 (patch)
tree: 3326e02522450e21670dc14abda722eb5c66ca88
parent: 03340fbde44053885da25dbd914d4f63d1f76e3e (diff)
download: kazoo-c48f2733f2a6b2c2941738e4208e8cfede676730.tar.gz
fix(recipe): No more memory leak once TreeCache was closed (#524)
fix(recipe): Fix memory leak of the TreeCache recipe. Fix memory leaks on idle handlers and on a closed TreeCache. Add new memory tests for the TreeCache recipe that use objgraph, and other tests for various handlers on TreeCache. Let TreeCache start in a safe way. The docs now suggest closing an unused TreeCache.
-rw-r--r-- kazoo/handlers/eventlet.py | 4
-rw-r--r-- kazoo/handlers/gevent.py | 15
-rw-r--r-- kazoo/handlers/threading.py | 1
-rw-r--r-- kazoo/recipe/cache.py | 39
-rw-r--r-- kazoo/tests/test_cache.py | 125
-rw-r--r-- requirements.txt | 1
-rw-r--r-- setup.py | 1
7 files changed, 171 insertions, 15 deletions
diff --git a/kazoo/handlers/eventlet.py b/kazoo/handlers/eventlet.py
index c87898e..c13f886 100644
--- a/kazoo/handlers/eventlet.py
+++ b/kazoo/handlers/eventlet.py
@@ -107,6 +107,8 @@ class SequentialEventletHandler(object):
except Exception:
LOG.warning("Exception in worker completion queue greenlet",
exc_info=True)
+ finally:
+ del cb # release before possible idle
def _process_callback_queue(self):
while True:
@@ -119,6 +121,8 @@ class SequentialEventletHandler(object):
except Exception:
LOG.warning("Exception in worker callback queue greenlet",
exc_info=True)
+ finally:
+ del cb # release before possible idle
def start(self):
if not self._started:
diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py
index bc721d9..78d234d 100644
--- a/kazoo/handlers/gevent.py
+++ b/kazoo/handlers/gevent.py
@@ -69,14 +69,17 @@ class SequentialGeventHandler(object):
while True:
try:
func = queue.get()
- if func is _STOP:
- break
- func()
+ try:
+ if func is _STOP:
+ break
+ func()
+ except Exception as exc:
+ log.warning("Exception in worker greenlet")
+ log.exception(exc)
+ finally:
+ del func # release before possible idle
except self.queue_empty:
continue
- except Exception as exc:
- log.warning("Exception in worker greenlet")
- log.exception(exc)
return gevent.spawn(greenlet_worker)
def start(self):
diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py
index b91f9cb..afd05c5 100644
--- a/kazoo/handlers/threading.py
+++ b/kazoo/handlers/threading.py
@@ -126,6 +126,7 @@ class SequentialThreadingHandler(object):
log.exception("Exception in worker queue thread")
finally:
queue.task_done()
+ del func # release before possible idle
except self.queue_empty:
continue
t = self.spawn(_thread_worker)
diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py
index ed31f72..dc3b8bd 100644
--- a/kazoo/recipe/cache.py
+++ b/kazoo/recipe/cache.py
@@ -19,6 +19,7 @@ import operator
import os
from kazoo.exceptions import NoNodeError, KazooException
+from kazoo.protocol.paths import _prefix_root
from kazoo.protocol.states import KazooState, EventType
@@ -57,6 +58,12 @@ class TreeCache(object):
After a cache started, all changes of subtree will be synchronized
from the ZooKeeper server. Events will be fired for those activity.
+ Don't forget to call :meth:`close` if a tree was started and you don't
+ need it anymore; otherwise you will leak the memory of cached nodes, even
+ if you have released all references to the :class:`TreeCache` instance,
+ because many callbacks registered by the tree remain attached to the
+ Kazoo client.
+
See also :meth:`~TreeCache.listen`.
.. note::
@@ -75,7 +82,10 @@ class TreeCache(object):
self._client.ensure_path(self._root._path)
if self._client.connected:
- self._root.on_created()
+ # The on_created and other on_* methods must not be invoked outside
+ # the background task. This is the key to keep concurrency safe
+ # without lock.
+ self._in_background(self._root.on_created)
def close(self):
"""Closes the cache.
@@ -95,6 +105,10 @@ class TreeCache(object):
self._task_queue.put(self._STOP)
self._client.remove_listener(self._session_watcher)
with handle_exception(self._error_listeners):
+ # We must invoke on_deleted outside background queue because:
+ # 1. The background task has been stopped.
+ # 2. The on_deleted on closed tree does not communicate with
+ # ZooKeeper actually.
self._root.on_deleted()
def listen(self, listener):
@@ -185,6 +199,9 @@ class TreeCache(object):
func, args, kwargs = cb
func(*args, **kwargs)
+ # release before possible idle
+ del cb, func, args, kwargs
+
def _session_watcher(self, state):
if state == KazooState.SUSPENDED:
self._publish_event(TreeEvent.CONNECTION_SUSPENDED)
@@ -241,6 +258,7 @@ class TreeNode(object):
old_child.on_deleted()
if self._tree._state == self._tree.STATE_CLOSED:
+ self._reset_watchers()
return
old_state, self._state = self._state, self.STATE_DEAD
@@ -253,10 +271,18 @@ class TreeNode(object):
child = self._path[len(self._parent._path) + 1:]
if self._parent._children.get(child) is self:
del self._parent._children[child]
+ self._reset_watchers()
def _publish_event(self, *args, **kwargs):
return self._tree._publish_event(*args, **kwargs)
+ def _reset_watchers(self):
+ client = self._tree._client
+ for _watchers in (client._data_watchers, client._child_watchers):
+ _path = _prefix_root(client.chroot, self._path)
+ _watcher = _watchers.get(_path, set())
+ _watcher.discard(self._process_watch)
+
def _refresh(self):
self._refresh_data()
self._refresh_children()
@@ -391,10 +417,11 @@ def handle_exception(listeners):
yield
except Exception as e:
logger.debug('processing error: %r', e)
- for listener in listeners:
- try:
- listener(e)
- except: # pragma: no cover
- logger.exception('Exception handling exception') # oops
+ if listeners:
+ for listener in listeners:
+ try:
+ listener(e)
+ except BaseException: # pragma: no cover
+ logger.exception('Exception handling exception') # oops
else:
logger.exception('No listener to process %r', e)
diff --git a/kazoo/tests/test_cache.py b/kazoo/tests/test_cache.py
index 0cf0227..3a73880 100644
--- a/kazoo/tests/test_cache.py
+++ b/kazoo/tests/test_cache.py
@@ -1,16 +1,45 @@
+import gc
+import importlib
import uuid
from mock import patch, call, Mock
from nose.tools import eq_, ok_, assert_not_equal, raises
+from objgraph import count as count_refs_by_type
-from kazoo.testing import KazooTestCase
+from kazoo.testing import KazooTestHarness
from kazoo.exceptions import KazooException
from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent
-class KazooTreeCacheTests(KazooTestCase):
+class KazooAdaptiveHandlerTestCase(KazooTestHarness):
+ HANDLERS = (
+ ('kazoo.handlers.gevent', 'SequentialGeventHandler'),
+ ('kazoo.handlers.eventlet', 'SequentialEventletHandler'),
+ ('kazoo.handlers.threading', 'SequentialThreadingHandler'),
+ )
def setUp(self):
+ self.handler = self.choose_an_installed_handler()
+ self.setup_zookeeper(handler=self.handler)
+
+ def tearDown(self):
+ self.handler = None
+ self.teardown_zookeeper()
+
+ def choose_an_installed_handler(self):
+ for handler_module, handler_class in self.HANDLERS:
+ try:
+ mod = importlib.import_module(handler_module)
+ cls = getattr(mod, handler_class)
+ except ImportError:
+ continue
+ else:
+ return cls()
+ raise ImportError('No available handler')
+
+
+class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
+ def setUp(self):
super(KazooTreeCacheTests, self).setUp()
self._event_queue = self.client.handler.queue_impl()
self._error_queue = self.client.handler.queue_impl()
@@ -18,12 +47,15 @@ class KazooTreeCacheTests(KazooTestCase):
self.cache = None
def tearDown(self):
- super(KazooTreeCacheTests, self).tearDown()
if not self._error_queue.empty():
try:
raise self._error_queue.get()
except FakeException:
pass
+ if self.cache is not None:
+ self.cache.close()
+ self.cache = None
+ super(KazooTreeCacheTests, self).tearDown()
def make_cache(self):
if self.cache is None:
@@ -51,6 +83,29 @@ class KazooTreeCacheTests(KazooTestCase):
method = getattr(self.client, method_name)
return patch.object(self.client, method_name, wraps=method)
+ def _wait_gc(self):
+ # trigger switching on some coroutine handlers
+ self.client.handler.sleep_func(0.1)
+
+ completion_queue = getattr(self.handler, 'completion_queue', None)
+ if completion_queue is not None:
+ while not self.client.handler.completion_queue.empty():
+ self.client.handler.sleep_func(0.1)
+
+ for gen in range(3):
+ gc.collect(gen)
+
+ def count_tree_node(self):
+ # inspect GC and count tree nodes for checking memory leak
+ for retry in range(10):
+ result = set()
+ for _ in range(5):
+ self._wait_gc()
+ result.add(count_refs_by_type('TreeNode'))
+ if len(result) == 1:
+ return list(result)[0]
+ raise RuntimeError('could not count refs exactly')
+
def test_start(self):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
@@ -74,12 +129,29 @@ class KazooTreeCacheTests(KazooTestCase):
self.cache.start()
def test_close(self):
+ eq_(self.count_tree_node(), 0)
+
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
self.client.create(self.path + '/foo/bar/baz', makepath=True)
for _ in range(3):
self.wait_cache(TreeEvent.NODE_ADDED)
+ # setup stub watchers which are outside of tree cache
+ stub_data_watcher = Mock(spec=lambda event: None)
+ stub_child_watcher = Mock(spec=lambda event: None)
+ self.client.get(self.path + '/foo', stub_data_watcher)
+ self.client.get_children(self.path + '/foo', stub_child_watcher)
+
+ # watchers inside tree cache should be here
+ root_path = self.client.chroot + self.path
+ eq_(len(self.client._data_watchers[root_path + '/foo']), 2)
+ eq_(len(self.client._data_watchers[root_path + '/foo/bar']), 1)
+ eq_(len(self.client._data_watchers[root_path + '/foo/bar/baz']), 1)
+ eq_(len(self.client._child_watchers[root_path + '/foo']), 2)
+ eq_(len(self.client._child_watchers[root_path + '/foo/bar']), 1)
+ eq_(len(self.client._child_watchers[root_path + '/foo/bar/baz']), 1)
+
self.cache.close()
# nothing should be published since tree closed
@@ -93,6 +165,53 @@ class KazooTreeCacheTests(KazooTestCase):
# node state should not be changed
assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD)
+ # watchers should be reset
+ eq_(len(self.client._data_watchers[root_path + '/foo']), 1)
+ eq_(len(self.client._data_watchers[root_path + '/foo/bar']), 0)
+ eq_(len(self.client._data_watchers[root_path + '/foo/bar/baz']), 0)
+ eq_(len(self.client._child_watchers[root_path + '/foo']), 1)
+ eq_(len(self.client._child_watchers[root_path + '/foo/bar']), 0)
+ eq_(len(self.client._child_watchers[root_path + '/foo/bar/baz']), 0)
+
+ # outside watchers should not be deleted
+ eq_(list(self.client._data_watchers[root_path + '/foo'])[0],
+ stub_data_watcher)
+ eq_(list(self.client._child_watchers[root_path + '/foo'])[0],
+ stub_child_watcher)
+
+ # should not be any leaked memory (tree node) here
+ self.cache = None
+ eq_(self.count_tree_node(), 0)
+
+ def test_delete_operation(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+
+ eq_(self.count_tree_node(), 1)
+
+ self.client.create(self.path + '/foo/bar/baz', makepath=True)
+ for _ in range(3):
+ self.wait_cache(TreeEvent.NODE_ADDED)
+
+ self.client.delete(self.path + '/foo', recursive=True)
+ for _ in range(3):
+ self.wait_cache(TreeEvent.NODE_REMOVED)
+
+ # tree should be empty
+ eq_(self.cache._root._children, {})
+
+ # watchers should be reset
+ root_path = self.client.chroot + self.path
+ eq_(self.client._data_watchers[root_path + '/foo'], set())
+ eq_(self.client._data_watchers[root_path + '/foo/bar'], set())
+ eq_(self.client._data_watchers[root_path + '/foo/bar/baz'], set())
+ eq_(self.client._child_watchers[root_path + '/foo'], set())
+ eq_(self.client._child_watchers[root_path + '/foo/bar'], set())
+ eq_(self.client._child_watchers[root_path + '/foo/bar/baz'], set())
+
+ # should not be any leaked memory (tree node) here
+ eq_(self.count_tree_node(), 1)
+
def test_children_operation(self):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
diff --git a/requirements.txt b/requirements.txt
index d96b210..2d5c0c6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,3 +3,4 @@ mock==1.0.1
nose==1.3.3
pure-sasl==0.5.1
flake8==2.3.0
+objgraph==3.4.0
diff --git a/setup.py b/setup.py
index 8676e29..ee055d5 100644
--- a/setup.py
+++ b/setup.py
@@ -25,6 +25,7 @@ tests_require = install_requires + [
'nose',
'flake8',
'pure-sasl',
+ 'objgraph',
]
if not (PYTHON3 or PYPY):