diff options
author | Jiangge Zhang <tonyseek@gmail.com> | 2017-10-31 15:15:47 +0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-03-23 02:01:50 -0700 |
commit | 4456f180735a0f8520bfc42474de9d27fa01bb2c (patch) | |
tree | 530d33fb2cc34ccf687dda30d6d38417a38c1117 | |
parent | cbd02f5ccc6ea9e6dd6b5da01a0397ea649143a0 (diff) | |
download | kazoo-4456f180735a0f8520bfc42474de9d27fa01bb2c.tar.gz |
perf(recipe): Give TreeCache standalone queue
This commit lets TreeCache do not use queue of connection routine any more.
-rw-r--r-- | kazoo/recipe/cache.py | 17 |
1 files changed, 16 insertions, 1 deletions
diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py index 1a084af..65fad85 100644 --- a/kazoo/recipe/cache.py +++ b/kazoo/recipe/cache.py @@ -36,6 +36,8 @@ class TreeCache(object): STATE_STARTED = 1 STATE_CLOSED = 2 + _STOP = object() + def __init__(self, client, path): self._client = client self._root = TreeNode.make_root(self, path) @@ -44,6 +46,8 @@ class TreeCache(object): self._is_initialized = False self._error_listeners = [] self._event_listeners = [] + self._task_queue = client.handler.queue_impl() + self._task_thread = None def start(self): """Starts the cache. @@ -66,6 +70,7 @@ class TreeCache(object): else: raise KazooException('already started') + self._task_thread = self._client.handler.spawn(self._do_background) self._client.add_listener(self._session_watcher) self._client.ensure_path(self._root._path) @@ -87,6 +92,7 @@ class TreeCache(object): """ if self._state == self.STATE_STARTED: self._state = self.STATE_CLOSED + self._task_queue.put(self._STOP) self._client.remove_listener(self._session_watcher) with handle_exception(self._error_listeners): self._root.on_deleted() @@ -168,7 +174,16 @@ class TreeCache(object): listener(event) def _in_background(self, func, *args, **kwargs): - self._client.handler.callback_queue.put(lambda: func(*args, **kwargs)) + self._task_queue.put((func, args, kwargs)) + + def _do_background(self): + while True: + with handle_exception(self._error_listeners): + cb = self._task_queue.get() + if cb is self._STOP: + break + func, args, kwargs = cb + func(*args, **kwargs) def _session_watcher(self, state): if state == KazooState.SUSPENDED: |