summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJiangge Zhang <tonyseek@gmail.com>2017-10-31 15:15:47 +0800
committerJeff Widman <jeff@jeffwidman.com>2018-03-23 02:01:50 -0700
commit4456f180735a0f8520bfc42474de9d27fa01bb2c (patch)
tree530d33fb2cc34ccf687dda30d6d38417a38c1117
parentcbd02f5ccc6ea9e6dd6b5da01a0397ea649143a0 (diff)
downloadkazoo-4456f180735a0f8520bfc42474de9d27fa01bb2c.tar.gz
perf(recipe): Give TreeCache standalone queue
This commit lets TreeCache do not use queue of connection routine any more.
-rw-r--r--kazoo/recipe/cache.py17
1 files changed, 16 insertions, 1 deletions
diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py
index 1a084af..65fad85 100644
--- a/kazoo/recipe/cache.py
+++ b/kazoo/recipe/cache.py
@@ -36,6 +36,8 @@ class TreeCache(object):
STATE_STARTED = 1
STATE_CLOSED = 2
+ _STOP = object()
+
def __init__(self, client, path):
self._client = client
self._root = TreeNode.make_root(self, path)
@@ -44,6 +46,8 @@ class TreeCache(object):
self._is_initialized = False
self._error_listeners = []
self._event_listeners = []
+ self._task_queue = client.handler.queue_impl()
+ self._task_thread = None
def start(self):
"""Starts the cache.
@@ -66,6 +70,7 @@ class TreeCache(object):
else:
raise KazooException('already started')
+ self._task_thread = self._client.handler.spawn(self._do_background)
self._client.add_listener(self._session_watcher)
self._client.ensure_path(self._root._path)
@@ -87,6 +92,7 @@ class TreeCache(object):
"""
if self._state == self.STATE_STARTED:
self._state = self.STATE_CLOSED
+ self._task_queue.put(self._STOP)
self._client.remove_listener(self._session_watcher)
with handle_exception(self._error_listeners):
self._root.on_deleted()
@@ -168,7 +174,16 @@ class TreeCache(object):
listener(event)
def _in_background(self, func, *args, **kwargs):
- self._client.handler.callback_queue.put(lambda: func(*args, **kwargs))
+ self._task_queue.put((func, args, kwargs))
+
+ def _do_background(self):
+ while True:
+ with handle_exception(self._error_listeners):
+ cb = self._task_queue.get()
+ if cb is self._STOP:
+ break
+ func, args, kwargs = cb
+ func(*args, **kwargs)
def _session_watcher(self, state):
if state == KazooState.SUSPENDED: