summaryrefslogtreecommitdiff
path: root/taskflow/utils/eventlet_utils.py
diff options
context:
space:
mode:
authorIvan A. Melnikov <imelnikov@griddynamics.com>2013-12-19 12:17:46 +0200
committerIvan A. Melnikov <imelnikov@griddynamics.com>2013-12-19 12:28:23 +0200
commit2409a462950dddab32db5ae587ad7fa93c4629bb (patch)
tree8378c6dcb051c1800f2b46fd1d7c9e10a8383eba /taskflow/utils/eventlet_utils.py
parent24172082a9780a38b34e4ac5671b27c9d28c72e4 (diff)
downloadtaskflow-2409a462950dddab32db5ae587ad7fa93c4629bb.tar.gz
Add wait_for_any method to eventlet utils
This change introduces new utility funtion in eventlet utils, named wait_for_any. It waits on one or more futures until at least some of them completes. This is simplified version of concurrent.futures.wait adapted to work with our green futures. Change-Id: I2ab50c4e108ea1e9c81f618bd8fa8a8156bb695b
Diffstat (limited to 'taskflow/utils/eventlet_utils.py')
-rw-r--r--taskflow/utils/eventlet_utils.py56
1 files changed, 55 insertions, 1 deletions
diff --git a/taskflow/utils/eventlet_utils.py b/taskflow/utils/eventlet_utils.py
index 3d81d10..a633122 100644
--- a/taskflow/utils/eventlet_utils.py
+++ b/taskflow/utils/eventlet_utils.py
@@ -19,6 +19,8 @@
import logging
import threading
+from concurrent import futures
+
try:
from eventlet.green import threading as gthreading
from eventlet import greenpool
@@ -28,7 +30,6 @@ try:
except ImportError:
EVENTLET_AVAILABLE = False
-from concurrent import futures
from taskflow.utils import lock_utils
@@ -129,3 +130,56 @@ class GreenExecutor(futures.Executor):
self._work_queue.put(_TOMBSTONE)
if wait:
self._pool.waitall()
+
+
+class _FirstCompletedWaiter(object):
+ """Provides the event that wait_for_any() block on."""
+ def __init__(self, is_green):
+ if is_green:
+ assert EVENTLET_AVAILABLE, 'eventlet is needed to use this feature'
+ self.event = gthreading.Event()
+ else:
+ self.event = threading.Event()
+ self.finished_futures = []
+
+ def add_result(self, future):
+ self.finished_futures.append(future)
+ self.event.set()
+
+ def add_exception(self, future):
+ self.finished_futures.append(future)
+ self.event.set()
+
+ def add_cancelled(self, future):
+ self.finished_futures.append(future)
+ self.event.set()
+
+
+def _done_futures(fs):
+ return set(f for f in fs
+ if f._state in [futures._base.CANCELLED_AND_NOTIFIED,
+ futures._base.FINISHED])
+
+
+def wait_for_any(fs, timeout=None):
+ """Wait for one of the futures to complete.
+
+ Works correctly with both green and non-green futures.
+ Returns pair (done, not_done).
+ """
+ with futures._base._AcquireFutures(fs):
+ done = _done_futures(fs)
+ if done:
+ return done, set(fs) - done
+ is_green = any(isinstance(f, _GreenFuture) for f in fs)
+ waiter = _FirstCompletedWaiter(is_green)
+ for f in fs:
+ f._waiters.append(waiter)
+
+ waiter.event.wait(timeout)
+ for f in fs:
+ f._waiters.remove(waiter)
+
+ with futures._base._AcquireFutures(fs):
+ done = _done_futures(fs)
+ return done, set(fs) - done