summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkulallwang <45503704+kulallwang@users.noreply.github.com>2018-12-06 15:55:45 +0800
committerStephen SORRIAUX <stephen.sorriaux@gmail.com>2018-12-06 07:55:45 +0000
commit37bcda357463155aba5f2383bc70528413a10f1b (patch)
treefef3ccfc6236f037747502d48e7afa61f10ab5e1
parent2ae392e69c4b2daca5d8e7f0e79b7ce90423e65c (diff)
downloadkazoo-37bcda357463155aba5f2383bc70528413a10f1b.tar.gz
fix(recipe): No more memory leak when ChildrenWatch was stopped (#543)
This ensures that the watcher is removed from the client listener when the func given to ChildrenWatch returns False. Previously, the watcher was never removed so the ChildrenWatch object would endlessly grow in memory. A unit test is added to ensure this case never happen again. Fix #542
-rw-r--r--kazoo/recipe/watchers.py2
-rw-r--r--kazoo/tests/test_watchers.py34
2 files changed, 36 insertions, 0 deletions
diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py
index ea9516c..96ec4fe 100644
--- a/kazoo/recipe/watchers.py
+++ b/kazoo/recipe/watchers.py
@@ -341,6 +341,8 @@ class ChildrenWatch(object):
if result is False:
self._stopped = True
self._func = None
+ if self._allow_session_lost:
+ self._client.remove_listener(self._session_watcher)
except Exception as exc:
log.exception(exc)
raise
diff --git a/kazoo/tests/test_watchers.py b/kazoo/tests/test_watchers.py
index e08751e..35f34b5 100644
--- a/kazoo/tests/test_watchers.py
+++ b/kazoo/tests/test_watchers.py
@@ -418,6 +418,40 @@ class KazooChildrenWatcherTests(KazooTestCase):
update.wait(0.5)
eq_(all_children, ['smith'])
+ def test_child_watcher_remove_session_watcher(self):
+ update = threading.Event()
+ all_children = ['fred']
+
+ fail_through = []
+
+ def changed(children):
+ while all_children:
+ all_children.pop()
+ all_children.extend(children)
+ update.set()
+ if fail_through:
+ return False
+
+ children_watch = self.client.ChildrenWatch(self.path, changed)
+ session_watcher = children_watch._session_watcher
+
+ update.wait(10)
+ eq_(session_watcher in self.client.state_listeners, True)
+ eq_(all_children, [])
+ update.clear()
+
+ fail_through.append(True)
+ self.client.create(self.path + '/' + 'smith')
+ update.wait(10)
+ eq_(session_watcher not in self.client.state_listeners, True)
+ eq_(all_children, ['smith'])
+ update.clear()
+
+ self.client.create(self.path + '/' + 'george')
+ update.wait(10)
+ eq_(session_watcher not in self.client.state_listeners, True)
+ eq_(all_children, ['smith'])
+
def test_child_watch_session_loss(self):
update = threading.Event()
all_children = ['fred']