summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kazoo/handlers/eventlet.py4
-rw-r--r--kazoo/handlers/gevent.py15
-rw-r--r--kazoo/handlers/threading.py1
-rw-r--r--kazoo/recipe/cache.py39
-rw-r--r--kazoo/tests/test_cache.py125
-rw-r--r--requirements.txt1
-rw-r--r--setup.py1
7 files changed, 171 insertions, 15 deletions
diff --git a/kazoo/handlers/eventlet.py b/kazoo/handlers/eventlet.py
index c87898e..c13f886 100644
--- a/kazoo/handlers/eventlet.py
+++ b/kazoo/handlers/eventlet.py
@@ -107,6 +107,8 @@ class SequentialEventletHandler(object):
except Exception:
LOG.warning("Exception in worker completion queue greenlet",
exc_info=True)
+ finally:
+ del cb # release before possible idle
def _process_callback_queue(self):
while True:
@@ -119,6 +121,8 @@ class SequentialEventletHandler(object):
except Exception:
LOG.warning("Exception in worker callback queue greenlet",
exc_info=True)
+ finally:
+ del cb # release before possible idle
def start(self):
if not self._started:
diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py
index bc721d9..78d234d 100644
--- a/kazoo/handlers/gevent.py
+++ b/kazoo/handlers/gevent.py
@@ -69,14 +69,17 @@ class SequentialGeventHandler(object):
while True:
try:
func = queue.get()
- if func is _STOP:
- break
- func()
+ try:
+ if func is _STOP:
+ break
+ func()
+ except Exception as exc:
+ log.warning("Exception in worker greenlet")
+ log.exception(exc)
+ finally:
+ del func # release before possible idle
except self.queue_empty:
continue
- except Exception as exc:
- log.warning("Exception in worker greenlet")
- log.exception(exc)
return gevent.spawn(greenlet_worker)
def start(self):
diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py
index b91f9cb..afd05c5 100644
--- a/kazoo/handlers/threading.py
+++ b/kazoo/handlers/threading.py
@@ -126,6 +126,7 @@ class SequentialThreadingHandler(object):
log.exception("Exception in worker queue thread")
finally:
queue.task_done()
+ del func # release before possible idle
except self.queue_empty:
continue
t = self.spawn(_thread_worker)
diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py
index ed31f72..dc3b8bd 100644
--- a/kazoo/recipe/cache.py
+++ b/kazoo/recipe/cache.py
@@ -19,6 +19,7 @@ import operator
import os
from kazoo.exceptions import NoNodeError, KazooException
+from kazoo.protocol.paths import _prefix_root
from kazoo.protocol.states import KazooState, EventType
@@ -57,6 +58,12 @@ class TreeCache(object):
After a cache started, all changes of subtree will be synchronized
from the ZooKeeper server. Events will be fired for those activity.
+ Don't forget to call :meth:`close` if a tree was started and you don't
+ need it anymore, or you will leak the memory of cached nodes, even if
+ you have released all references to the :class:`TreeCache` instance.
+ Because there are so many callbacks that have been registered to the
+ Kazoo client.
+
See also :meth:`~TreeCache.listen`.
.. note::
@@ -75,7 +82,10 @@ class TreeCache(object):
self._client.ensure_path(self._root._path)
if self._client.connected:
- self._root.on_created()
+ # The on_created and other on_* methods must not be invoked outside
+ # the background task. This is the key to keep concurrency safe
+ # without lock.
+ self._in_background(self._root.on_created)
def close(self):
"""Closes the cache.
@@ -95,6 +105,10 @@ class TreeCache(object):
self._task_queue.put(self._STOP)
self._client.remove_listener(self._session_watcher)
with handle_exception(self._error_listeners):
+ # We must invoke on_deleted outside background queue because:
+ # 1. The background task has been stopped.
+ # 2. The on_deleted on closed tree does not communicate with
+ # ZooKeeper actually.
self._root.on_deleted()
def listen(self, listener):
@@ -185,6 +199,9 @@ class TreeCache(object):
func, args, kwargs = cb
func(*args, **kwargs)
+ # release before possible idle
+ del cb, func, args, kwargs
+
def _session_watcher(self, state):
if state == KazooState.SUSPENDED:
self._publish_event(TreeEvent.CONNECTION_SUSPENDED)
@@ -241,6 +258,7 @@ class TreeNode(object):
old_child.on_deleted()
if self._tree._state == self._tree.STATE_CLOSED:
+ self._reset_watchers()
return
old_state, self._state = self._state, self.STATE_DEAD
@@ -253,10 +271,18 @@ class TreeNode(object):
child = self._path[len(self._parent._path) + 1:]
if self._parent._children.get(child) is self:
del self._parent._children[child]
+ self._reset_watchers()
def _publish_event(self, *args, **kwargs):
return self._tree._publish_event(*args, **kwargs)
+ def _reset_watchers(self):
+ client = self._tree._client
+ for _watchers in (client._data_watchers, client._child_watchers):
+ _path = _prefix_root(client.chroot, self._path)
+ _watcher = _watchers.get(_path, set())
+ _watcher.discard(self._process_watch)
+
def _refresh(self):
self._refresh_data()
self._refresh_children()
@@ -391,10 +417,11 @@ def handle_exception(listeners):
yield
except Exception as e:
logger.debug('processing error: %r', e)
- for listener in listeners:
- try:
- listener(e)
- except: # pragma: no cover
- logger.exception('Exception handling exception') # oops
+ if listeners:
+ for listener in listeners:
+ try:
+ listener(e)
+ except BaseException: # pragma: no cover
+ logger.exception('Exception handling exception') # oops
else:
logger.exception('No listener to process %r', e)
diff --git a/kazoo/tests/test_cache.py b/kazoo/tests/test_cache.py
index 0cf0227..3a73880 100644
--- a/kazoo/tests/test_cache.py
+++ b/kazoo/tests/test_cache.py
@@ -1,16 +1,45 @@
+import gc
+import importlib
import uuid
from mock import patch, call, Mock
from nose.tools import eq_, ok_, assert_not_equal, raises
+from objgraph import count as count_refs_by_type
-from kazoo.testing import KazooTestCase
+from kazoo.testing import KazooTestHarness
from kazoo.exceptions import KazooException
from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent
-class KazooTreeCacheTests(KazooTestCase):
+class KazooAdaptiveHandlerTestCase(KazooTestHarness):
+ HANDLERS = (
+ ('kazoo.handlers.gevent', 'SequentialGeventHandler'),
+ ('kazoo.handlers.eventlet', 'SequentialEventletHandler'),
+ ('kazoo.handlers.threading', 'SequentialThreadingHandler'),
+ )
def setUp(self):
+ self.handler = self.choose_an_installed_handler()
+ self.setup_zookeeper(handler=self.handler)
+
+ def tearDown(self):
+ self.handler = None
+ self.teardown_zookeeper()
+
+ def choose_an_installed_handler(self):
+ for handler_module, handler_class in self.HANDLERS:
+ try:
+ mod = importlib.import_module(handler_module)
+ cls = getattr(mod, handler_class)
+ except ImportError:
+ continue
+ else:
+ return cls()
+ raise ImportError('No available handler')
+
+
+class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
+ def setUp(self):
super(KazooTreeCacheTests, self).setUp()
self._event_queue = self.client.handler.queue_impl()
self._error_queue = self.client.handler.queue_impl()
@@ -18,12 +47,15 @@ class KazooTreeCacheTests(KazooTestCase):
self.cache = None
def tearDown(self):
- super(KazooTreeCacheTests, self).tearDown()
if not self._error_queue.empty():
try:
raise self._error_queue.get()
except FakeException:
pass
+ if self.cache is not None:
+ self.cache.close()
+ self.cache = None
+ super(KazooTreeCacheTests, self).tearDown()
def make_cache(self):
if self.cache is None:
@@ -51,6 +83,29 @@ class KazooTreeCacheTests(KazooTestCase):
method = getattr(self.client, method_name)
return patch.object(self.client, method_name, wraps=method)
+ def _wait_gc(self):
+ # trigger switching on some coroutine handlers
+ self.client.handler.sleep_func(0.1)
+
+ completion_queue = getattr(self.handler, 'completion_queue', None)
+ if completion_queue is not None:
+ while not self.client.handler.completion_queue.empty():
+ self.client.handler.sleep_func(0.1)
+
+ for gen in range(3):
+ gc.collect(gen)
+
+ def count_tree_node(self):
+ # inspect GC and count tree nodes for checking memory leak
+ for retry in range(10):
+ result = set()
+ for _ in range(5):
+ self._wait_gc()
+ result.add(count_refs_by_type('TreeNode'))
+ if len(result) == 1:
+ return list(result)[0]
+ raise RuntimeError('could not count refs exactly')
+
def test_start(self):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
@@ -74,12 +129,29 @@ class KazooTreeCacheTests(KazooTestCase):
self.cache.start()
def test_close(self):
+ eq_(self.count_tree_node(), 0)
+
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
self.client.create(self.path + '/foo/bar/baz', makepath=True)
for _ in range(3):
self.wait_cache(TreeEvent.NODE_ADDED)
+ # setup stub watchers which are outside of tree cache
+ stub_data_watcher = Mock(spec=lambda event: None)
+ stub_child_watcher = Mock(spec=lambda event: None)
+ self.client.get(self.path + '/foo', stub_data_watcher)
+ self.client.get_children(self.path + '/foo', stub_child_watcher)
+
+ # watchers inside tree cache should be here
+ root_path = self.client.chroot + self.path
+ eq_(len(self.client._data_watchers[root_path + '/foo']), 2)
+ eq_(len(self.client._data_watchers[root_path + '/foo/bar']), 1)
+ eq_(len(self.client._data_watchers[root_path + '/foo/bar/baz']), 1)
+ eq_(len(self.client._child_watchers[root_path + '/foo']), 2)
+ eq_(len(self.client._child_watchers[root_path + '/foo/bar']), 1)
+ eq_(len(self.client._child_watchers[root_path + '/foo/bar/baz']), 1)
+
self.cache.close()
# nothing should be published since tree closed
@@ -93,6 +165,53 @@ class KazooTreeCacheTests(KazooTestCase):
# node state should not be changed
assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD)
+ # watchers should be reset
+ eq_(len(self.client._data_watchers[root_path + '/foo']), 1)
+ eq_(len(self.client._data_watchers[root_path + '/foo/bar']), 0)
+ eq_(len(self.client._data_watchers[root_path + '/foo/bar/baz']), 0)
+ eq_(len(self.client._child_watchers[root_path + '/foo']), 1)
+ eq_(len(self.client._child_watchers[root_path + '/foo/bar']), 0)
+ eq_(len(self.client._child_watchers[root_path + '/foo/bar/baz']), 0)
+
+ # outside watchers should not be deleted
+ eq_(list(self.client._data_watchers[root_path + '/foo'])[0],
+ stub_data_watcher)
+ eq_(list(self.client._child_watchers[root_path + '/foo'])[0],
+ stub_child_watcher)
+
+ # should not be any leaked memory (tree node) here
+ self.cache = None
+ eq_(self.count_tree_node(), 0)
+
+ def test_delete_operation(self):
+ self.make_cache()
+ self.wait_cache(since=TreeEvent.INITIALIZED)
+
+ eq_(self.count_tree_node(), 1)
+
+ self.client.create(self.path + '/foo/bar/baz', makepath=True)
+ for _ in range(3):
+ self.wait_cache(TreeEvent.NODE_ADDED)
+
+ self.client.delete(self.path + '/foo', recursive=True)
+ for _ in range(3):
+ self.wait_cache(TreeEvent.NODE_REMOVED)
+
+ # tree should be empty
+ eq_(self.cache._root._children, {})
+
+ # watchers should be reset
+ root_path = self.client.chroot + self.path
+ eq_(self.client._data_watchers[root_path + '/foo'], set())
+ eq_(self.client._data_watchers[root_path + '/foo/bar'], set())
+ eq_(self.client._data_watchers[root_path + '/foo/bar/baz'], set())
+ eq_(self.client._child_watchers[root_path + '/foo'], set())
+ eq_(self.client._child_watchers[root_path + '/foo/bar'], set())
+ eq_(self.client._child_watchers[root_path + '/foo/bar/baz'], set())
+
+ # should not be any leaked memory (tree node) here
+ eq_(self.count_tree_node(), 1)
+
def test_children_operation(self):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
diff --git a/requirements.txt b/requirements.txt
index d96b210..2d5c0c6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,3 +3,4 @@ mock==1.0.1
nose==1.3.3
pure-sasl==0.5.1
flake8==2.3.0
+objgraph==3.4.0
diff --git a/setup.py b/setup.py
index 8676e29..ee055d5 100644
--- a/setup.py
+++ b/setup.py
@@ -25,6 +25,7 @@ tests_require = install_requires + [
'nose',
'flake8',
'pure-sasl',
+ 'objgraph',
]
if not (PYTHON3 or PYPY):