diff options
| author | Ivan A. Melnikov <imelnikov@griddynamics.com> | 2013-12-19 12:17:46 +0200 |
|---|---|---|
| committer | Ivan A. Melnikov <imelnikov@griddynamics.com> | 2013-12-19 12:28:23 +0200 |
| commit | 2409a462950dddab32db5ae587ad7fa93c4629bb (patch) | |
| tree | 8378c6dcb051c1800f2b46fd1d7c9e10a8383eba /taskflow/utils/eventlet_utils.py | |
| parent | 24172082a9780a38b34e4ac5671b27c9d28c72e4 (diff) | |
| download | taskflow-2409a462950dddab32db5ae587ad7fa93c4629bb.tar.gz | |
Add wait_for_any method to eventlet utils
This change introduces new utility funtion in eventlet utils, named
wait_for_any. It waits on one or more futures until at least some of
them completes. This is simplified version of concurrent.futures.wait
adapted to work with our green futures.
Change-Id: I2ab50c4e108ea1e9c81f618bd8fa8a8156bb695b
Diffstat (limited to 'taskflow/utils/eventlet_utils.py')
| -rw-r--r-- | taskflow/utils/eventlet_utils.py | 56 |
1 files changed, 55 insertions, 1 deletions
diff --git a/taskflow/utils/eventlet_utils.py b/taskflow/utils/eventlet_utils.py index 3d81d10..a633122 100644 --- a/taskflow/utils/eventlet_utils.py +++ b/taskflow/utils/eventlet_utils.py @@ -19,6 +19,8 @@ import logging import threading +from concurrent import futures + try: from eventlet.green import threading as gthreading from eventlet import greenpool @@ -28,7 +30,6 @@ try: except ImportError: EVENTLET_AVAILABLE = False -from concurrent import futures from taskflow.utils import lock_utils @@ -129,3 +130,56 @@ class GreenExecutor(futures.Executor): self._work_queue.put(_TOMBSTONE) if wait: self._pool.waitall() + + +class _FirstCompletedWaiter(object): + """Provides the event that wait_for_any() block on.""" + def __init__(self, is_green): + if is_green: + assert EVENTLET_AVAILABLE, 'eventlet is needed to use this feature' + self.event = gthreading.Event() + else: + self.event = threading.Event() + self.finished_futures = [] + + def add_result(self, future): + self.finished_futures.append(future) + self.event.set() + + def add_exception(self, future): + self.finished_futures.append(future) + self.event.set() + + def add_cancelled(self, future): + self.finished_futures.append(future) + self.event.set() + + +def _done_futures(fs): + return set(f for f in fs + if f._state in [futures._base.CANCELLED_AND_NOTIFIED, + futures._base.FINISHED]) + + +def wait_for_any(fs, timeout=None): + """Wait for one of the futures to complete. + + Works correctly with both green and non-green futures. + Returns pair (done, not_done). + """ + with futures._base._AcquireFutures(fs): + done = _done_futures(fs) + if done: + return done, set(fs) - done + is_green = any(isinstance(f, _GreenFuture) for f in fs) + waiter = _FirstCompletedWaiter(is_green) + for f in fs: + f._waiters.append(waiter) + + waiter.event.wait(timeout) + for f in fs: + f._waiters.remove(waiter) + + with futures._base._AcquireFutures(fs): + done = _done_futures(fs) + return done, set(fs) - done |
