summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrian.quinlan <devnull@localhost>2009-05-08 19:44:02 +0000
committerbrian.quinlan <devnull@localhost>2009-05-08 19:44:02 +0000
commit0a24721b25c89aed65030b940b19ac5fc2c27aaa (patch)
treedb2f4d1b5f8167c60684d1cbaa3dfd6da1952275
parent18bb9b9bdb0889127e2ffeb6a6893bf0e5f38bf1 (diff)
downloadfutures-0a24721b25c89aed65030b940b19ac5fc2c27aaa.tar.gz
Tests work again and wait works correctly if some futures are already complete.
-rw-r--r--futures/__init__.py5
-rw-r--r--futures/_base.py258
-rw-r--r--futures/process.py28
-rw-r--r--futures/thread.py46
-rw-r--r--test_futures.py433
5 files changed, 524 insertions, 246 deletions
diff --git a/futures/__init__.py b/futures/__init__.py
index 33a3e38..5f599ad 100644
--- a/futures/__init__.py
+++ b/futures/__init__.py
@@ -1,3 +1,6 @@
-from futures._base import CancelledException, TimeoutException, Future, FutureList, FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED, RETURN_IMMEDIATELY
+from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION,
+ ALL_COMPLETED, RETURN_IMMEDIATELY,
+ CancelledError, TimeoutError,
+ Future, FutureList)
from futures.thread import ThreadPoolExecutor
from futures.process import ProcessPoolExecutor
diff --git a/futures/_base.py b/futures/_base.py
index d7d3337..1debfbb 100644
--- a/futures/_base.py
+++ b/futures/_base.py
@@ -1,26 +1,61 @@
+import functools
+import logging
import threading
+import time
FIRST_COMPLETED = 0
FIRST_EXCEPTION = 1
ALL_COMPLETED = 2
RETURN_IMMEDIATELY = 3
+# Possible future states
PENDING = 0
RUNNING = 1
-CANCELLED = 2
-FINISHED = 3
+CANCELLED = 2 # The future was cancelled...
+CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called.
+FINISHED = 4
+
+FUTURE_STATES = [
+ PENDING,
+ RUNNING,
+ CANCELLED,
+ CANCELLED_AND_NOTIFIED,
+ FINISHED
+]
_STATE_TO_DESCRIPTION_MAP = {
PENDING: "pending",
RUNNING: "running",
CANCELLED: "cancelled",
+ CANCELLED_AND_NOTIFIED: "cancelled",
FINISHED: "finished"
}
-class CancelledException(Exception):
+LOGGER = logging.getLogger("futures")
+_handler = logging.StreamHandler()
+LOGGER.addHandler(_handler)
+del _handler
+
+def set_future_exception(future, event_sink, exception):
+ with future._condition:
+ future._exception = exception
+ with event_sink._condition:
+ future._state = FINISHED
+ event_sink.add_exception()
+ future._condition.notify_all()
+
+def set_future_result(future, event_sink, result):
+ with future._condition:
+ future._result = result
+ with event_sink._condition:
+ future._state = FINISHED
+ event_sink.add_result()
+ future._condition.notify_all()
+
+class CancelledError(Exception):
pass
-class TimeoutException(Exception):
+class TimeoutError(Exception):
pass
class Future(object):
@@ -48,16 +83,18 @@ class Future(object):
if self._state in [RUNNING, FINISHED]:
return False
- self._state = CANCELLED
+ if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ self._state = CANCELLED
+ self._condition.notify_all()
return True
def cancelled(self):
with self._condition:
- return self._state == CANCELLED
+ return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
def done(self):
with self._condition:
- return self._state in [CANCELLED, FINISHED]
+ return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
def __get_result(self):
if self._exception:
@@ -67,45 +104,35 @@ class Future(object):
def result(self, timeout=None):
with self._condition:
- if self._state == CANCELLED:
- raise CancelledException()
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
self._condition.wait(timeout)
- if self._state == CANCELLED:
- raise CancelledException()
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
else:
- raise TimeoutException()
+ raise TimeoutError()
def exception(self, timeout=None):
with self._condition:
- if self._state == CANCELLED:
- raise CancelledException()
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
elif self._state == FINISHED:
return self._exception
self._condition.wait(timeout)
- if self._state == CANCELLED:
- raise CancelledException()
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
elif self._state == FINISHED:
return self._exception
else:
- raise TimeoutException()
-
-class _NullWaitTracker(object):
- def add_result(self):
- pass
-
- def add_exception(self):
- pass
-
- def add_cancelled(self):
- pass
+ raise TimeoutError()
class _FirstCompletedWaitTracker(object):
def __init__(self):
@@ -122,9 +149,9 @@ class _FirstCompletedWaitTracker(object):
class _AllCompletedWaitTracker(object):
def __init__(self, pending_calls, stop_on_exception):
- self.event = threading.Event()
self.pending_calls = pending_calls
self.stop_on_exception = stop_on_exception
+ self.event = threading.Event()
def add_result(self):
self.pending_calls -= 1
@@ -132,9 +159,10 @@ class _AllCompletedWaitTracker(object):
self.event.set()
def add_exception(self):
- self.add_result()
if self.stop_on_exception:
self.event.set()
+ else:
+ self.add_result()
def add_cancelled(self):
self.add_result()
@@ -147,85 +175,108 @@ class ThreadEventSink(object):
def add(self, e):
self._waiters.append(e)
+ def remove(self, e):
+ self._waiters.remove(e)
+
def add_result(self):
- with self._condition:
- for waiter in self._waiters:
- waiter.add_result()
+ for waiter in self._waiters:
+ waiter.add_result()
def add_exception(self):
- with self._condition:
- for waiter in self._waiters:
- waiter.add_exception()
+ for waiter in self._waiters:
+ waiter.add_exception()
def add_cancelled(self):
- with self._condition:
- for waiter in self._waiters:
- waiter.add_cancelled()
+ for waiter in self._waiters:
+ waiter.add_cancelled()
class FutureList(object):
def __init__(self, futures, event_sink):
self._futures = futures
self._event_sink = event_sink
- def wait(self, timeout=None, run_until=ALL_COMPLETED):
+ def wait(self, timeout=None, return_when=ALL_COMPLETED):
+ if return_when == RETURN_IMMEDIATELY:
+ return
+
with self._event_sink._condition:
- if all(f.done() for f in self):
+ # Make a quick exit if every future is already done. This check is
+ # necessary because, if every future is in the
+ # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
+ # never receive any
+ if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
+ for f in self):
return
- if run_until == FIRST_COMPLETED:
- m = _FirstCompletedWaitTracker()
- elif run_until == FIRST_EXCEPTION:
- m = _AllCompletedWaitTracker(len(self), stop_on_exception=True)
- elif run_until == ALL_COMPLETED:
- m = _AllCompletedWaitTracker(len(self), stop_on_exception=False)
- elif run_until == RETURN_IMMEDIATELY:
- m = _NullWaitTracker()
+ if return_when == FIRST_COMPLETED:
+ completed_tracker = _FirstCompletedWaitTracker()
else:
- raise ValueError()
-
- self._event_sink.add(m)
-
- if run_until != RETURN_IMMEDIATELY:
- m.event.wait(timeout)
+ # Calculate how many events are expected before every future
+ # is complete. This can be done without holding the futures'
+ # locks because a future cannot transition itself into either
+ # of the states being looked for.
+ pending_count = sum(
+ f._state not in [CANCELLED_AND_NOTIFIED, FINISHED]
+ for f in self)
+
+ if return_when == FIRST_EXCEPTION:
+ completed_tracker = _AllCompletedWaitTracker(
+ pending_count, stop_on_exception=True)
+ elif return_when == ALL_COMPLETED:
+ completed_tracker = _AllCompletedWaitTracker(
+ pending_count, stop_on_exception=False)
+
+ self._event_sink.add(completed_tracker)
+
+ try:
+ completed_tracker.event.wait(timeout)
+ finally:
+ self._event_sink.remove(completed_tracker)
def cancel(self, timeout=None):
for f in self:
f.cancel()
- self.wait(timeout=timeout, run_until=ALL_COMPLETED)
+ self.wait(timeout=timeout, return_when=ALL_COMPLETED)
if any(not f.done() for f in self):
- raise TimeoutException()
+ raise TimeoutError()
def has_running_futures(self):
- return bool(self.running_futures())
+ return any(self.running_futures())
def has_cancelled_futures(self):
- return bool(self.cancelled_futures())
+ return any(self.cancelled_futures())
def has_done_futures(self):
- return bool(self.done_futures())
+ return any(self.done_futures())
def has_successful_futures(self):
- return bool(self.successful_futures())
+ return any(self.successful_futures())
def has_exception_futures(self):
- return bool(self.exception_futures())
-
- def running_futures(self):
- return [f for f in self if not f.done() and not f.cancelled()]
-
+ return any(self.exception_futures())
+
+ def has_running_futures(self):
+ return any(self.running_futures())
+
def cancelled_futures(self):
- return [f for f in self if f.cancelled()]
+ return (f for f in self
+ if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED])
def done_futures(self):
- return [f for f in self if f.done()]
+ return (f for f in self
+ if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
def successful_futures(self):
- return [f for f in self
- if f.done() and not f.cancelled() and f.exception() is None]
+ return (f for f in self
+ if f._state == FINISHED and f._exception is None)
def exception_futures(self):
- return [f for f in self if f.done() and f.exception() is not None]
+ return (f for f in self
+ if f._state == FINISHED and f._exception is not None)
+ def running_futures(self):
+ return (f for f in self if f._state == RUNNING)
+
def __getitem__(self, i):
return self._futures[i]
@@ -239,23 +290,64 @@ class FutureList(object):
return f in self._futures
def __repr__(self):
+ states = {state: 0 for state in FUTURE_STATES}
+ for f in self:
+ states[f._state] += 1
+
return ('<FutureList #futures=%d '
- '[#success=%d #exception=%d #cancelled=%d]>' % (
+ '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % (
len(self),
- len(self.successful_futures()),
- len(self.exception_futures()),
- len(self.cancelled_futures())))
+ states[PENDING],
+ states[CANCELLED] + states[CANCELLED_AND_NOTIFIED],
+ states[RUNNING],
+ states[FINISHED]))
-import functools
class Executor(object):
- def map(self, fn, iter):
+ def run(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ raise NotImplementedError()
+
+ def runXXX(self, calls, timeout=None):
+ """Execute the given calls and
+
+ Arguments:
+ calls: A sequence of functions that will be called without arguments
+ and whose results with be returned.
+ timeout: The maximum number of seconds to wait for the complete results.
+ None indicates that there is no timeout.
+
+ Yields:
+ The results of the given calls in the order that they are given.
+
+ Exceptions:
+ TimeoutError: if it takes more than timeout
+ """
+ if timeout is not None:
+ end_time = timeout + time.time()
+
+ fs = self.run(calls, return_when=RETURN_IMMEDIATELY)
+
+ try:
+ for future in fs:
+ if timeout is None:
+ yield future.result()
+ else:
+ yield future.result(end_time - time.time())
+ finally:
+ try:
+ fs.cancel(timeout=0)
+ except TimeoutError:
+ pass
+
+ def map(self, fn, iter, timeout=None):
calls = [functools.partial(fn, a) for a in iter]
- return self.runXXX(calls)
+ return self.runXXX(calls, timeout)
- def runXXX(self, calls):
- fs = self.run(calls, timeout=None, run_until=FIRST_EXCEPTION)
+ def shutdown(self):
+ raise NotImplementedError()
- if fs.has_exception_futures():
- raise fs.exception_futures()[0].exception()
- else:
- return [f.result() for f in fs]
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.shutdown()
+ return False
diff --git a/futures/process.py b/futures/process.py
index 499eeb6..06a8c25 100644
--- a/futures/process.py
+++ b/futures/process.py
@@ -1,6 +1,12 @@
#!/usr/bin/env python
-from futures._base import RUNNING, FINISHED, Executor, ALL_COMPLETED, ThreadEventSink, Future, FutureList
+from futures._base import (PENDING, RUNNING, CANCELLED,
+ CANCELLED_AND_NOTIFIED, FINISHED,
+ ALL_COMPLETED,
+ LOGGER,
+ set_future_exception, set_future_result,
+ Executor, Future, FutureList,ThreadEventSink)
+
import queue
import multiprocessing
import threading
@@ -98,17 +104,13 @@ class ProcessPoolExecutor(Executor):
del self._pending_work_items[result_item.work_id]
if result_item.exception:
- with work_item.future._condition:
- work_item.future._exception = result_item.exception
- work_item.future._state = FINISHED
- work_item.future._condition.notify_all()
- work_item.completion_tracker.add_exception()
+ set_future_exception(work_item.future,
+ work_item.completion_tracker,
+ result_item.exception)
else:
- with work_item.future._condition:
- work_item.future._result = result_item.result
- work_item.future._state = FINISHED
- work_item.future._condition.notify_all()
- work_item.completion_tracker.add_result()
+ set_future_result(work_item.future,
+ work_item.completion_tracker,
+ result_item.result)
def _adjust_process_count(self):
if self._queue_management_thread is None:
@@ -127,7 +129,7 @@ class ProcessPoolExecutor(Executor):
p.start()
self._processes.add(p)
- def run(self, calls, timeout=None, run_until=ALL_COMPLETED):
+ def run(self, calls, timeout=None, return_when=ALL_COMPLETED):
with self._lock:
if self._shutdown:
raise RuntimeError()
@@ -145,7 +147,7 @@ class ProcessPoolExecutor(Executor):
self._adjust_process_count()
fl = FutureList(futures, event_sink)
- fl.wait(timeout=timeout, run_until=run_until)
+ fl.wait(timeout=timeout, return_when=return_when)
return fl
def shutdown(self):
diff --git a/futures/thread.py b/futures/thread.py
index 3f1b807..d1c1409 100644
--- a/futures/thread.py
+++ b/futures/thread.py
@@ -1,6 +1,11 @@
#!/usr/bin/env python
-from futures._base import RUNNING, FINISHED, Executor, ALL_COMPLETED, ThreadEventSink, Future, FutureList
+from futures._base import (PENDING, RUNNING, CANCELLED,
+ CANCELLED_AND_NOTIFIED, FINISHED,
+ ALL_COMPLETED,
+ LOGGER,
+ set_future_exception, set_future_result,
+ Executor, Future, FutureList,ThreadEventSink)
import queue
import threading
@@ -11,29 +16,26 @@ class _WorkItem(object):
self.completion_tracker = completion_tracker
def run(self):
- if self.future.cancelled():
- with self.future._condition:
- self.future._condition.notify_all()
- self.completion_tracker.add_cancelled()
- return
-
with self.future._condition:
- self.future._state = RUNNING
+ if self.future._state == PENDING:
+ self.future._state = RUNNING
+ elif self.future._state == CANCELLED:
+ with self.completion_tracker._condition:
+ self.future._state = CANCELLED_AND_NOTIFIED
+ self.completion_tracker.add_cancelled()
+ return
+ else:
+ LOGGER.critical('Future %s in unexpected state: %d',
+ id(self.future),
+ self.future._state)
+ return
try:
- r = self.call()
+ result = self.call()
except BaseException as e:
- with self.future._condition:
- self.future._exception = e
- self.future._state = FINISHED
- self.future._condition.notify_all()
- self.completion_tracker.add_exception()
+ set_future_exception(self.future, self.completion_tracker, e)
else:
- with self.future._condition:
- self.future._result = r
- self.future._state = FINISHED
- self.future._condition.notify_all()
- self.completion_tracker.add_result()
+ set_future_result(self.future, self.completion_tracker, result)
class ThreadPoolExecutor(Executor):
def __init__(self, max_threads):
@@ -55,7 +57,7 @@ class ThreadPoolExecutor(Executor):
else:
work_item.run()
except BaseException as e:
- print('Out e:', e)
+ LOGGER.critical('Exception in worker', exc_info=True)
def _adjust_thread_count(self):
for _ in range(len(self._threads),
@@ -65,7 +67,7 @@ class ThreadPoolExecutor(Executor):
t.start()
self._threads.add(t)
- def run(self, calls, timeout=None, run_until=ALL_COMPLETED):
+ def run(self, calls, timeout=None, return_when=ALL_COMPLETED):
with self._lock:
if self._shutdown:
raise RuntimeError()
@@ -80,7 +82,7 @@ class ThreadPoolExecutor(Executor):
self._adjust_thread_count()
fl = FutureList(futures, event_sink)
- fl.wait(timeout=timeout, run_until=run_until)
+ fl.wait(timeout=timeout, return_when=return_when)
return fl
def shutdown(self):
diff --git a/test_futures.py b/test_futures.py
index 91ac955..0093b0e 100644
--- a/test_futures.py
+++ b/test_futures.py
@@ -7,6 +7,24 @@ import time
import multiprocessing
import futures
+import futures._base
+from futures._base import (
+ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+
+def create_future(state=PENDING, exception=None, result=None):
+ f = Future()
+ f._state = state
+ f._exception = exception
+ f._result = result
+ return f
+
+PENDING_FUTURE = create_future(state=PENDING)
+RUNNING_FUTURE = create_future(state=RUNNING)
+CANCELLED_FUTURE = create_future(state=CANCELLED)
+CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
+EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
+SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
+
class Call(object):
def __init__(self, manual_finish=False):
@@ -40,21 +58,6 @@ class ExceptionCall(Call):
self._can_finished.wait()
raise ZeroDivisionError()
-class FutureStub(object):
- def __init__(self, cancelled, done, exception=None):
- self._cancelled = cancelled
- self._done = done
- self._exception = exception
-
- def cancelled(self):
- return self._cancelled
-
- def done(self):
- return self._done
-
- def exception(self):
- return self._exception
-
class ShutdownTest(unittest.TestCase):
def test_run_after_shutdown(self):
self.executor = futures.ThreadPoolExecutor(max_threads=1)
@@ -73,7 +76,7 @@ class ShutdownTest(unittest.TestCase):
call3 = Call(manual_finish=True)
self.executor.run([call1, call2, call3],
- run_until=futures.RETURN_IMMEDIATELY)
+ return_when=futures.RETURN_IMMEDIATELY)
call1.wait_on_called()
call2.wait_on_called()
@@ -91,26 +94,33 @@ class ShutdownTest(unittest.TestCase):
class WaitsTest(unittest.TestCase):
def test(self):
- def aaa():
- fs.wait(run_until=futures.ALL_COMPLETED)
+ def wait_for_ALL_COMPLETED():
+ fs.wait(return_when=futures.ALL_COMPLETED)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
self.assertTrue(f3.done())
self.assertTrue(f4.done())
+ all_completed.release()
- def bbb():
- fs.wait(run_until=futures.FIRST_COMPLETED)
+ def wait_for_FIRST_COMPLETED():
+ fs.wait(return_when=futures.FIRST_COMPLETED)
self.assertTrue(f1.done())
self.assertFalse(f2.done())
self.assertFalse(f3.done())
self.assertFalse(f4.done())
+ first_completed.release()
- def ccc():
- fs.wait(run_until=futures.FIRST_EXCEPTION)
+ def wait_for_FIRST_EXCEPTION():
+ fs.wait(return_when=futures.FIRST_EXCEPTION)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
self.assertFalse(f3.done())
self.assertFalse(f4.done())
+ first_exception.release()
+
+ all_completed = threading.Semaphore(0)
+ first_completed = threading.Semaphore(0)
+ first_exception = threading.Semaphore(0)
call1 = Call(manual_finish=True)
call2 = ExceptionCall(manual_finish=True)
@@ -118,26 +128,26 @@ class WaitsTest(unittest.TestCase):
call4 = Call()
fs = self.executor.run([call1, call2, call3, call4],
- run_until=futures.RETURN_IMMEDIATELY)
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
threads = []
- for call in [aaa, bbb, ccc] * 3:
+ for call in [wait_for_ALL_COMPLETED,
+ wait_for_FIRST_COMPLETED,
+ wait_for_FIRST_EXCEPTION]:
t = threading.Thread(target=call)
t.start()
threads.append(t)
- time.sleep(1)
+ time.sleep(1) # give threads enough time to execute wait
call1.set_can()
- time.sleep(1)
+ first_completed.acquire()
call2.set_can()
- time.sleep(1)
+ first_exception.acquire()
call3.set_can()
- time.sleep(1)
call4.set_can()
+ all_completed.acquire()
- for t in threads:
- t.join()
self.executor.shutdown()
class ThreadPoolWaitTests(WaitsTest):
@@ -156,7 +166,7 @@ class CancelTests(unittest.TestCase):
call4 = Call()
fs = executor.run([call1, call2, call3, call4],
- run_until=futures.RETURN_IMMEDIATELY)
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
call1.wait_on_called()
@@ -173,13 +183,13 @@ class CancelTests(unittest.TestCase):
self.assertEqual(f4.done(), True)
call1.set_can()
- fs.wait(run_until=futures.ALL_COMPLETED)
+ fs.wait(return_when=futures.ALL_COMPLETED)
self.assertEqual(f1.result(), 42)
- self.assertRaises(futures.CancelledException, f2.result)
- self.assertRaises(futures.CancelledException, f2.exception)
+ self.assertRaises(futures.CancelledError, f2.result)
+ self.assertRaises(futures.CancelledError, f2.exception)
self.assertEqual(f3.result(), 42)
- self.assertRaises(futures.CancelledException, f4.result)
- self.assertRaises(futures.CancelledException, f4.exception)
+ self.assertRaises(futures.CancelledError, f4.result)
+ self.assertRaises(futures.CancelledError, f4.exception)
self.assertEqual(call2.called(), False)
self.assertEqual(call4.called(), False)
@@ -196,17 +206,38 @@ class CancelTests(unittest.TestCase):
call1 = Call(manual_finish=True)
call2 = Call()
- fs = executor.run([call1, call2], run_until=futures.RETURN_IMMEDIATELY)
+ fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY)
f1, f2 = fs
call1.wait_on_called()
t = threading.Thread(target=end_call)
t.start()
- self.assertRaises(futures.CancelledException, f2.result)
- self.assertRaises(futures.CancelledException, f2.exception)
+ self.assertRaises(futures.CancelledError, f2.result)
+ self.assertRaises(futures.CancelledError, f2.exception)
t.join()
executor.shutdown()
+ def test_wait_with_already_cancelled_futures(self):
+ executor = futures.ThreadPoolExecutor(max_threads=1)
+
+ call1 = Call(manual_finish=True)
+ call2 = Call()
+ call3 = Call()
+ call4 = Call()
+
+ fs = executor.run([call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
+ f1, f2, f3, f4 = fs
+
+ call1.wait_on_called()
+ self.assertTrue(f2.cancel())
+ self.assertTrue(f3.cancel())
+ call1.set_can()
+ time.sleep(0.1)
+
+ fs.wait(return_when=futures.ALL_COMPLETED)
+ executor.shutdown()
+
def test_cancel_all(self):
executor = futures.ThreadPoolExecutor(max_threads=1)
@@ -216,11 +247,11 @@ class CancelTests(unittest.TestCase):
call4 = Call()
fs = executor.run([call1, call2, call3, call4],
- run_until=futures.RETURN_IMMEDIATELY)
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
call1.wait_on_called()
- self.assertRaises(futures.TimeoutException, fs.cancel, timeout=0)
+ self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0)
call1.set_can()
fs.cancel()
@@ -236,7 +267,7 @@ class CancelTests(unittest.TestCase):
call1 = Call(manual_finish=True)
call2 = Call()
- fs = executor.run([call1, call2], run_until=futures.RETURN_IMMEDIATELY)
+ fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY)
f1, f2 = fs
call1.wait_on_called()
@@ -245,138 +276,286 @@ class CancelTests(unittest.TestCase):
self.assertEqual(repr(f2), '<Future state=cancelled>')
executor.shutdown()
-class FutureListTests(unittest.TestCase):
- def test_cancel_states(self):
- f1 = FutureStub(cancelled=False, done=False)
- f2 = FutureStub(cancelled=False, done=True)
- f3 = FutureStub(cancelled=False, done=True, exception=IOError())
- f4 = FutureStub(cancelled=True, done=True)
+class FutureTests(unittest.TestCase):
+ def test_repr(self):
+ self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>')
+ self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>')
+ self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>')
+ self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE),
+ '<Future state=cancelled>')
+ self.assertEqual(repr(EXCEPTION_FUTURE),
+ '<Future state=finished raised IOError>')
+ self.assertEqual(repr(SUCCESSFUL_FUTURE),
+ '<Future state=finished returned int>')
+
+ create_future
+
+ def test_cancel(self):
+ f1 = create_future(state=PENDING)
+ f2 = create_future(state=RUNNING)
+ f3 = create_future(state=CANCELLED)
+ f4 = create_future(state=CANCELLED_AND_NOTIFIED)
+ f5 = create_future(state=FINISHED, exception=IOError())
+ f6 = create_future(state=FINISHED, result=5)
+
+ self.assertTrue(f1.cancel())
+ self.assertEquals(f1._state, CANCELLED)
+
+ self.assertFalse(f2.cancel())
+ self.assertEquals(f2._state, RUNNING)
+
+ self.assertTrue(f3.cancel())
+ self.assertEquals(f3._state, CANCELLED)
+
+ self.assertTrue(f4.cancel())
+ self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED)
+
+ self.assertFalse(f5.cancel())
+ self.assertEquals(f5._state, FINISHED)
+
+ self.assertFalse(f6.cancel())
+ self.assertEquals(f6._state, FINISHED)
+
+ def test_cancelled(self):
+ self.assertFalse(PENDING_FUTURE.cancelled())
+ self.assertFalse(RUNNING_FUTURE.cancelled())
+ self.assertTrue(CANCELLED_FUTURE.cancelled())
+ self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
+ self.assertFalse(EXCEPTION_FUTURE.cancelled())
+ self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
+
+ def test_done(self):
+ self.assertFalse(PENDING_FUTURE.done())
+ self.assertFalse(RUNNING_FUTURE.done())
+ self.assertTrue(CANCELLED_FUTURE.done())
+ self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
+ self.assertTrue(EXCEPTION_FUTURE.done())
+ self.assertTrue(SUCCESSFUL_FUTURE.done())
+
+ def test_result_with_timeout(self):
+ self.assertRaises(futures.TimeoutError,
+ PENDING_FUTURE.result, timeout=0)
+ self.assertRaises(futures.TimeoutError,
+ RUNNING_FUTURE.result, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_FUTURE.result, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
+ self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
+ self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
+
+ def test_result_with_success(self):
+ def notification():
+ time.sleep(0.1)
+ with f1._condition:
+ f1._state = FINISHED
+ f1._result = 42
+ f1._condition.notify_all()
+
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertEquals(f1.result(timeout=1), 42)
+
+ def test_result_with_cancel(self):
+ def notification():
+ time.sleep(0.1)
+ with f1._condition:
+ f1._state = CANCELLED
+ f1._condition.notify_all()
- fs = [f1, f2, f3, f4]
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertRaises(futures.CancelledError, f1.result, timeout=1)
+
+ def test_exception_with_timeout(self):
+ self.assertRaises(futures.TimeoutError,
+ PENDING_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.TimeoutError,
+ RUNNING_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
+ self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
+ IOError))
+ self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
+
+class FutureListTests(unittest.TestCase):
+ def test_wait_RETURN_IMMEDIATELY(self):
+ f = futures.FutureList(futures=None, event_sink=None)
+ f.wait(return_when=futures.RETURN_IMMEDIATELY)
+
+ def test_wait_timeout(self):
+ f = futures.FutureList([PENDING_FUTURE],
+ futures._base.ThreadEventSink())
+
+ for t in [futures.FIRST_COMPLETED,
+ futures.FIRST_EXCEPTION,
+ futures.ALL_COMPLETED]:
+ f.wait(timeout=0.1, return_when=t)
+ self.assertFalse(PENDING_FUTURE.done())
+
+ def test_wait_all_done(self):
+ f = futures.FutureList([CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ SUCCESSFUL_FUTURE,
+ EXCEPTION_FUTURE],
+ futures._base.ThreadEventSink())
+
+ f.wait(return_when=futures.ALL_COMPLETED)
+
+ def test_filters(self):
+ fs = [PENDING_FUTURE,
+ RUNNING_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE]
f = futures.FutureList(fs, None)
- self.assertEqual(f.running_futures(), [f1])
- self.assertEqual(f.cancelled_futures(), [f4])
- self.assertEqual(f.done_futures(), [f2, f3, f4])
- self.assertEqual(f.successful_futures(), [f2])
- self.assertEqual(f.exception_futures(), [f3])
+ self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE])
+ self.assertEqual(list(f.cancelled_futures()),
+ [CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE])
+ self.assertEqual(list(f.done_futures()),
+ [CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE])
+ self.assertEqual(list(f.successful_futures()),
+ [SUCCESSFUL_FUTURE])
+ self.assertEqual(list(f.exception_futures()),
+ [EXCEPTION_FUTURE])
def test_has_running_futures(self):
- f1 = FutureStub(cancelled=False, done=False)
- f2 = FutureStub(cancelled=False, done=True)
-
- self.assertTrue(
- futures.FutureList([f1], None).has_running_futures())
self.assertFalse(
- futures.FutureList([f2], None).has_running_futures())
+ futures.FutureList([PENDING_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ SUCCESSFUL_FUTURE,
+ EXCEPTION_FUTURE],
+ None).has_running_futures())
+ self.assertTrue(
+ futures.FutureList([RUNNING_FUTURE],
+ None).has_running_futures())
def test_has_cancelled_futures(self):
- f1 = FutureStub(cancelled=True, done=True)
- f2 = FutureStub(cancelled=False, done=True)
+ self.assertFalse(
+ futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE,
+ SUCCESSFUL_FUTURE,
+ EXCEPTION_FUTURE],
+ None).has_cancelled_futures())
+ self.assertTrue(
+ futures.FutureList([CANCELLED_FUTURE],
+ None).has_cancelled_futures())
self.assertTrue(
- futures.FutureList([f1], None).has_cancelled_futures())
- self.assertFalse(
- futures.FutureList([f2], None).has_cancelled_futures())
+ futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
+ None).has_cancelled_futures())
def test_has_done_futures(self):
- f1 = FutureStub(cancelled=True, done=True)
- f2 = FutureStub(cancelled=False, done=True)
- f3 = FutureStub(cancelled=False, done=False)
+ self.assertFalse(
+ futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE],
+ None).has_done_futures())
+ self.assertTrue(
+ futures.FutureList([CANCELLED_FUTURE],
+ None).has_done_futures())
self.assertTrue(
- futures.FutureList([f1], None).has_done_futures())
+ futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
+ None).has_done_futures())
+
self.assertTrue(
- futures.FutureList([f2], None).has_done_futures())
- self.assertFalse(
- futures.FutureList([f3], None).has_done_futures())
+ futures.FutureList([EXCEPTION_FUTURE],
+ None).has_done_futures())
+
+ self.assertTrue(
+ futures.FutureList([SUCCESSFUL_FUTURE],
+ None).has_done_futures())
def test_has_successful_futures(self):
- f1 = FutureStub(cancelled=False, done=True)
- f2 = FutureStub(cancelled=False, done=True, exception=IOError())
- f3 = FutureStub(cancelled=False, done=False)
- f4 = FutureStub(cancelled=True, done=True)
+ self.assertFalse(
+ futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE],
+ None).has_successful_futures())
self.assertTrue(
- futures.FutureList([f1], None).has_successful_futures())
- self.assertFalse(
- futures.FutureList([f2], None).has_successful_futures())
- self.assertFalse(
- futures.FutureList([f3], None).has_successful_futures())
- self.assertFalse(
- futures.FutureList([f4], None).has_successful_futures())
+ futures.FutureList([SUCCESSFUL_FUTURE],
+ None).has_successful_futures())
def test_has_exception_futures(self):
- f1 = FutureStub(cancelled=False, done=True)
- f2 = FutureStub(cancelled=False, done=True, exception=IOError())
- f3 = FutureStub(cancelled=False, done=False)
- f4 = FutureStub(cancelled=True, done=True)
-
self.assertFalse(
- futures.FutureList([f1], None).has_exception_futures())
+ futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ SUCCESSFUL_FUTURE],
+ None).has_exception_futures())
+
self.assertTrue(
- futures.FutureList([f2], None).has_exception_futures())
- self.assertFalse(
- futures.FutureList([f3], None).has_exception_futures())
- self.assertFalse(
- futures.FutureList([f4], None).has_exception_futures())
+ futures.FutureList([EXCEPTION_FUTURE],
+ None).has_exception_futures())
def test_get_item(self):
- f1 = FutureStub(cancelled=False, done=False)
- f2 = FutureStub(cancelled=False, done=True)
- f3 = FutureStub(cancelled=False, done=True)
-
- fs = [f1, f2, f3]
+ fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
f = futures.FutureList(fs, None)
- self.assertEqual(f[0], f1)
- self.assertEqual(f[1], f2)
- self.assertEqual(f[2], f3)
+ self.assertEqual(f[0], PENDING_FUTURE)
+ self.assertEqual(f[1], RUNNING_FUTURE)
+ self.assertEqual(f[2], CANCELLED_FUTURE)
self.assertRaises(IndexError, f.__getitem__, 3)
def test_len(self):
- f1 = FutureStub(cancelled=False, done=False)
- f2 = FutureStub(cancelled=False, done=True)
- f3 = FutureStub(cancelled=False, done=True)
-
- f = futures.FutureList([f1, f2, f3], None)
+ f = futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE,
+ CANCELLED_FUTURE],
+ None)
self.assertEqual(len(f), 3)
def test_iter(self):
- f1 = FutureStub(cancelled=False, done=False)
- f2 = FutureStub(cancelled=False, done=True)
- f3 = FutureStub(cancelled=False, done=True)
-
- fs = [f1, f2, f3]
+ fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
f = futures.FutureList(fs, None)
self.assertEqual(list(iter(f)), fs)
def test_contains(self):
- f1 = FutureStub(cancelled=False, done=False)
- f2 = FutureStub(cancelled=False, done=True)
- f3 = FutureStub(cancelled=False, done=True)
-
- f = futures.FutureList([f1, f2], None)
- self.assertTrue(f1 in f)
- self.assertTrue(f2 in f)
- self.assertFalse(f3 in f)
+ f = futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE],
+ None)
+ self.assertTrue(PENDING_FUTURE in f)
+ self.assertTrue(RUNNING_FUTURE in f)
+ self.assertFalse(CANCELLED_FUTURE in f)
def test_repr(self):
- running = FutureStub(cancelled=False, done=False)
- result = FutureStub(cancelled=False, done=True)
- exception = FutureStub(cancelled=False, done=True, exception=IOError())
- cancelled = FutureStub(cancelled=True, done=True)
+ pending = create_future(state=PENDING)
+ cancelled = create_future(state=CANCELLED)
+ cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED)
+ running = create_future(state=RUNNING)
+ finished = create_future(state=FINISHED)
f = futures.FutureList(
- [running] * 4 + [result] * 3 + [exception] * 2 + [cancelled],
+ [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 +
+ [CANCELLED_AND_NOTIFIED_FUTURE] +
+ [RUNNING_FUTURE] * 2 +
+ [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3,
None)
self.assertEqual(repr(f),
- '<FutureList #futures=10 '
- '[#success=3 #exception=2 #cancelled=1]>')
+ '<FutureList #futures=15 '
+ '[#pending=4 #cancelled=3 #running=2 #finished=6]>')
+
def test_main():
test.support.run_unittest(CancelTests,
# ProcessPoolWaitTests,
ThreadPoolWaitTests,
+ FutureTests,
FutureListTests,
ShutdownTest)