summaryrefslogtreecommitdiff
path: root/kazoo/handlers/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'kazoo/handlers/utils.py')
-rw-r--r--kazoo/handlers/utils.py33
1 files changed, 18 insertions, 15 deletions
diff --git a/kazoo/handlers/utils.py b/kazoo/handlers/utils.py
index 2517390..bd1b92e 100644
--- a/kazoo/handlers/utils.py
+++ b/kazoo/handlers/utils.py
@@ -46,20 +46,14 @@ class AsyncResult(object):
with self._condition:
self.value = value
self._exception = None
- for callback in self._callbacks:
- self._handler.completion_queue.put(
- functools.partial(callback, self)
- )
+ self._do_callbacks()
self._condition.notify_all()
def set_exception(self, exception):
"""Store the exception. Wake up the waiters."""
with self._condition:
self._exception = exception
- for callback in self._callbacks:
- self._handler.completion_queue.put(
- functools.partial(callback, self)
- )
+ self._do_callbacks()
self._condition.notify_all()
def get(self, block=True, timeout=None):
@@ -102,16 +96,13 @@ class AsyncResult(object):
"""Register a callback to call when a value or an exception is
set"""
with self._condition:
- # Are we already set? Dispatch it now
- if self.ready():
- self._handler.completion_queue.put(
- functools.partial(callback, self)
- )
- return
-
if callback not in self._callbacks:
self._callbacks.append(callback)
+ # Are we already set? Dispatch it now
+ if self.ready():
+ self._do_callbacks()
+
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
with self._condition:
@@ -122,6 +113,18 @@ class AsyncResult(object):
if callback in self._callbacks:
self._callbacks.remove(callback)
+ def _do_callbacks(self):
+ """Execute the callbacks that were registered by :meth:`rawlink`.
+ If the handler is in running state this method only schedules
+ the calls to be performed by the handler. If it's stopped,
+ the callbacks are called right away."""
+
+ for callback in self._callbacks:
+ if self._handler.running:
+ self._handler.completion_queue.put(
+ functools.partial(callback, self))
+ else:
+ functools.partial(callback, self)()
def _set_fd_cloexec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD)