diff options
author | brian.quinlan <devnull@localhost> | 2009-05-08 19:44:02 +0000 |
---|---|---|
committer | brian.quinlan <devnull@localhost> | 2009-05-08 19:44:02 +0000 |
commit | 0a24721b25c89aed65030b940b19ac5fc2c27aaa (patch) | |
tree | db2f4d1b5f8167c60684d1cbaa3dfd6da1952275 | |
parent | 18bb9b9bdb0889127e2ffeb6a6893bf0e5f38bf1 (diff) | |
download | futures-0a24721b25c89aed65030b940b19ac5fc2c27aaa.tar.gz |
Tests work again and wait works correctly if some futures are already complete.
-rw-r--r-- | futures/__init__.py | 5 | ||||
-rw-r--r-- | futures/_base.py | 258 | ||||
-rw-r--r-- | futures/process.py | 28 | ||||
-rw-r--r-- | futures/thread.py | 46 | ||||
-rw-r--r-- | test_futures.py | 433 |
5 files changed, 524 insertions, 246 deletions
diff --git a/futures/__init__.py b/futures/__init__.py index 33a3e38..5f599ad 100644 --- a/futures/__init__.py +++ b/futures/__init__.py @@ -1,3 +1,6 @@ -from futures._base import CancelledException, TimeoutException, Future, FutureList, FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED, RETURN_IMMEDIATELY +from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, + ALL_COMPLETED, RETURN_IMMEDIATELY, + CancelledError, TimeoutError, + Future, FutureList) from futures.thread import ThreadPoolExecutor from futures.process import ProcessPoolExecutor diff --git a/futures/_base.py b/futures/_base.py index d7d3337..1debfbb 100644 --- a/futures/_base.py +++ b/futures/_base.py @@ -1,26 +1,61 @@ +import functools +import logging import threading +import time FIRST_COMPLETED = 0 FIRST_EXCEPTION = 1 ALL_COMPLETED = 2 RETURN_IMMEDIATELY = 3 +# Possible future states PENDING = 0 RUNNING = 1 -CANCELLED = 2 -FINISHED = 3 +CANCELLED = 2 # The future was cancelled... +CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called. +FINISHED = 4 + +FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] _STATE_TO_DESCRIPTION_MAP = { PENDING: "pending", RUNNING: "running", CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", FINISHED: "finished" } -class CancelledException(Exception): +LOGGER = logging.getLogger("futures") +_handler = logging.StreamHandler() +LOGGER.addHandler(_handler) +del _handler + +def set_future_exception(future, event_sink, exception): + with future._condition: + future._exception = exception + with event_sink._condition: + future._state = FINISHED + event_sink.add_exception() + future._condition.notify_all() + +def set_future_result(future, event_sink, result): + with future._condition: + future._result = result + with event_sink._condition: + future._state = FINISHED + event_sink.add_result() + future._condition.notify_all() + +class CancelledError(Exception): pass -class TimeoutException(Exception): +class TimeoutError(Exception): pass class Future(object): @@ -48,16 +83,18 @@ class Future(object): if self._state in [RUNNING, FINISHED]: return False - self._state = CANCELLED + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]: + self._state = CANCELLED + self._condition.notify_all() return True def cancelled(self): with self._condition: - return self._state == CANCELLED + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] def done(self): with self._condition: - return self._state in [CANCELLED, FINISHED] + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] def __get_result(self): if self._exception: @@ -67,45 +104,35 @@ class Future(object): def result(self, timeout=None): with self._condition: - if self._state == CANCELLED: - raise CancelledException() + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() elif self._state == FINISHED: return self.__get_result() self._condition.wait(timeout) - if self._state == CANCELLED: - raise CancelledException() + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() elif self._state == FINISHED: return self.__get_result() else: - raise TimeoutException() + raise TimeoutError() def exception(self, timeout=None): with self._condition: - if self._state == CANCELLED: - raise CancelledException() + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() elif self._state == FINISHED: return self._exception self._condition.wait(timeout) - if self._state == CANCELLED: - raise CancelledException() + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() elif self._state == FINISHED: return self._exception else: - raise TimeoutException() - -class _NullWaitTracker(object): - def add_result(self): - pass - - def add_exception(self): - pass - - def add_cancelled(self): - pass + raise TimeoutError() class _FirstCompletedWaitTracker(object): def __init__(self): @@ -122,9 +149,9 @@ class _FirstCompletedWaitTracker(object): class _AllCompletedWaitTracker(object): def __init__(self, pending_calls, stop_on_exception): - self.event = threading.Event() self.pending_calls = pending_calls self.stop_on_exception = stop_on_exception + self.event = threading.Event() def add_result(self): self.pending_calls -= 1 @@ -132,9 +159,10 @@ class _AllCompletedWaitTracker(object): self.event.set() def add_exception(self): - self.add_result() if self.stop_on_exception: self.event.set() + else: + self.add_result() def add_cancelled(self): self.add_result() @@ -147,85 +175,108 @@ class ThreadEventSink(object): def add(self, e): self._waiters.append(e) + def remove(self, e): + self._waiters.remove(e) + def add_result(self): - with self._condition: - for waiter in self._waiters: - waiter.add_result() + for waiter in self._waiters: + waiter.add_result() def add_exception(self): - with self._condition: - for waiter in self._waiters: - waiter.add_exception() + for waiter in self._waiters: + waiter.add_exception() def add_cancelled(self): - with self._condition: - for waiter in self._waiters: - waiter.add_cancelled() + for waiter in self._waiters: + waiter.add_cancelled() class FutureList(object): def __init__(self, futures, event_sink): self._futures = futures self._event_sink = event_sink - def wait(self, timeout=None, run_until=ALL_COMPLETED): + def wait(self, timeout=None, return_when=ALL_COMPLETED): + if return_when == RETURN_IMMEDIATELY: + return + with self._event_sink._condition: - if all(f.done() for f in self): + # Make a quick exit if every future is already done. This check is + # necessary because, if every future is in the + # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will + # never receive any + if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + for f in self): return - if run_until == FIRST_COMPLETED: - m = _FirstCompletedWaitTracker() - elif run_until == FIRST_EXCEPTION: - m = _AllCompletedWaitTracker(len(self), stop_on_exception=True) - elif run_until == ALL_COMPLETED: - m = _AllCompletedWaitTracker(len(self), stop_on_exception=False) - elif run_until == RETURN_IMMEDIATELY: - m = _NullWaitTracker() + if return_when == FIRST_COMPLETED: + completed_tracker = _FirstCompletedWaitTracker() else: - raise ValueError() - - self._event_sink.add(m) - - if run_until != RETURN_IMMEDIATELY: - m.event.wait(timeout) + # Calculate how many events are expected before every future + # is complete. This can be done without holding the futures' + # locks because a future cannot transition itself into either + # of the states being looked for. + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] + for f in self) + + if return_when == FIRST_EXCEPTION: + completed_tracker = _AllCompletedWaitTracker( + pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + completed_tracker = _AllCompletedWaitTracker( + pending_count, stop_on_exception=False) + + self._event_sink.add(completed_tracker) + + try: + completed_tracker.event.wait(timeout) + finally: + self._event_sink.remove(completed_tracker) def cancel(self, timeout=None): for f in self: f.cancel() - self.wait(timeout=timeout, run_until=ALL_COMPLETED) + self.wait(timeout=timeout, return_when=ALL_COMPLETED) if any(not f.done() for f in self): - raise TimeoutException() + raise TimeoutError() def has_running_futures(self): - return bool(self.running_futures()) + return any(self.running_futures()) def has_cancelled_futures(self): - return bool(self.cancelled_futures()) + return any(self.cancelled_futures()) def has_done_futures(self): - return bool(self.done_futures()) + return any(self.done_futures()) def has_successful_futures(self): - return bool(self.successful_futures()) + return any(self.successful_futures()) def has_exception_futures(self): - return bool(self.exception_futures()) - - def running_futures(self): - return [f for f in self if not f.done() and not f.cancelled()] - + return any(self.exception_futures()) + + def has_running_futures(self): + return any(self.running_futures()) + def cancelled_futures(self): - return [f for f in self if f.cancelled()] + return (f for f in self + if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) def done_futures(self): - return [f for f in self if f.done()] + return (f for f in self + if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]) def successful_futures(self): - return [f for f in self - if f.done() and not f.cancelled() and f.exception() is None] + return (f for f in self + if f._state == FINISHED and f._exception is None) def exception_futures(self): - return [f for f in self if f.done() and f.exception() is not None] + return (f for f in self + if f._state == FINISHED and f._exception is not None) + def running_futures(self): + return (f for f in self if f._state == RUNNING) + def __getitem__(self, i): return self._futures[i] @@ -239,23 +290,64 @@ class FutureList(object): return f in self._futures def __repr__(self): + states = {state: 0 for state in FUTURE_STATES} + for f in self: + states[f._state] += 1 + return ('<FutureList #futures=%d ' - '[#success=%d #exception=%d #cancelled=%d]>' % ( + '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % ( len(self), - len(self.successful_futures()), - len(self.exception_futures()), - len(self.cancelled_futures()))) + states[PENDING], + states[CANCELLED] + states[CANCELLED_AND_NOTIFIED], + states[RUNNING], + states[FINISHED])) -import functools class Executor(object): - def map(self, fn, iter): + def run(self, calls, timeout=None, return_when=ALL_COMPLETED): + raise NotImplementedError() + + def runXXX(self, calls, timeout=None): + """Execute the given calls and + + Arguments: + calls: A sequence of functions that will be called without arguments + and whose results with be returned. + timeout: The maximum number of seconds to wait for the complete results. + None indicates that there is no timeout. + + Yields: + The results of the given calls in the order that they are given. + + Exceptions: + TimeoutError: if it takes more than timeout + """ + if timeout is not None: + end_time = timeout + time.time() + + fs = self.run(calls, return_when=RETURN_IMMEDIATELY) + + try: + for future in fs: + if timeout is None: + yield future.result() + else: + yield future.result(end_time - time.time()) + finally: + try: + fs.cancel(timeout=0) + except TimeoutError: + pass + + def map(self, fn, iter, timeout=None): calls = [functools.partial(fn, a) for a in iter] - return self.runXXX(calls) + return self.runXXX(calls, timeout) - def runXXX(self, calls): - fs = self.run(calls, timeout=None, run_until=FIRST_EXCEPTION) + def shutdown(self): + raise NotImplementedError() - if fs.has_exception_futures(): - raise fs.exception_futures()[0].exception() - else: - return [f.result() for f in fs] + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + return False diff --git a/futures/process.py b/futures/process.py index 499eeb6..06a8c25 100644 --- a/futures/process.py +++ b/futures/process.py @@ -1,6 +1,12 @@ #!/usr/bin/env python -from futures._base import RUNNING, FINISHED, Executor, ALL_COMPLETED, ThreadEventSink, Future, FutureList +from futures._base import (PENDING, RUNNING, CANCELLED, + CANCELLED_AND_NOTIFIED, FINISHED, + ALL_COMPLETED, + LOGGER, + set_future_exception, set_future_result, + Executor, Future, FutureList,ThreadEventSink) + import queue import multiprocessing import threading @@ -98,17 +104,13 @@ class ProcessPoolExecutor(Executor): del self._pending_work_items[result_item.work_id] if result_item.exception: - with work_item.future._condition: - work_item.future._exception = result_item.exception - work_item.future._state = FINISHED - work_item.future._condition.notify_all() - work_item.completion_tracker.add_exception() + set_future_exception(work_item.future, + work_item.completion_tracker, + result_item.exception) else: - with work_item.future._condition: - work_item.future._result = result_item.result - work_item.future._state = FINISHED - work_item.future._condition.notify_all() - work_item.completion_tracker.add_result() + set_future_result(work_item.future, + work_item.completion_tracker, + result_item.result) def _adjust_process_count(self): if self._queue_management_thread is None: @@ -127,7 +129,7 @@ class ProcessPoolExecutor(Executor): p.start() self._processes.add(p) - def run(self, calls, timeout=None, run_until=ALL_COMPLETED): + def run(self, calls, timeout=None, return_when=ALL_COMPLETED): with self._lock: if self._shutdown: raise RuntimeError() @@ -145,7 +147,7 @@ class ProcessPoolExecutor(Executor): self._adjust_process_count() fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, run_until=run_until) + fl.wait(timeout=timeout, return_when=return_when) return fl def shutdown(self): diff --git a/futures/thread.py b/futures/thread.py index 3f1b807..d1c1409 100644 --- a/futures/thread.py +++ b/futures/thread.py @@ -1,6 +1,11 @@ #!/usr/bin/env python -from futures._base import RUNNING, FINISHED, Executor, ALL_COMPLETED, ThreadEventSink, Future, FutureList +from futures._base import (PENDING, RUNNING, CANCELLED, + CANCELLED_AND_NOTIFIED, FINISHED, + ALL_COMPLETED, + LOGGER, + set_future_exception, set_future_result, + Executor, Future, FutureList,ThreadEventSink) import queue import threading @@ -11,29 +16,26 @@ class _WorkItem(object): self.completion_tracker = completion_tracker def run(self): - if self.future.cancelled(): - with self.future._condition: - self.future._condition.notify_all() - self.completion_tracker.add_cancelled() - return - with self.future._condition: - self.future._state = RUNNING + if self.future._state == PENDING: + self.future._state = RUNNING + elif self.future._state == CANCELLED: + with self.completion_tracker._condition: + self.future._state = CANCELLED_AND_NOTIFIED + self.completion_tracker.add_cancelled() + return + else: + LOGGER.critical('Future %s in unexpected state: %d', + id(self.future), + self.future._state) + return try: - r = self.call() + result = self.call() except BaseException as e: - with self.future._condition: - self.future._exception = e - self.future._state = FINISHED - self.future._condition.notify_all() - self.completion_tracker.add_exception() + set_future_exception(self.future, self.completion_tracker, e) else: - with self.future._condition: - self.future._result = r - self.future._state = FINISHED - self.future._condition.notify_all() - self.completion_tracker.add_result() + set_future_result(self.future, self.completion_tracker, result) class ThreadPoolExecutor(Executor): def __init__(self, max_threads): @@ -55,7 +57,7 @@ class ThreadPoolExecutor(Executor): else: work_item.run() except BaseException as e: - print('Out e:', e) + LOGGER.critical('Exception in worker', exc_info=True) def _adjust_thread_count(self): for _ in range(len(self._threads), @@ -65,7 +67,7 @@ class ThreadPoolExecutor(Executor): t.start() self._threads.add(t) - def run(self, calls, timeout=None, run_until=ALL_COMPLETED): + def run(self, calls, timeout=None, return_when=ALL_COMPLETED): with self._lock: if self._shutdown: raise RuntimeError() @@ -80,7 +82,7 @@ class ThreadPoolExecutor(Executor): self._adjust_thread_count() fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, run_until=run_until) + fl.wait(timeout=timeout, return_when=return_when) return fl def shutdown(self): diff --git a/test_futures.py b/test_futures.py index 91ac955..0093b0e 100644 --- a/test_futures.py +++ b/test_futures.py @@ -7,6 +7,24 @@ import time import multiprocessing import futures +import futures._base +from futures._base import ( + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + +def create_future(state=PENDING, exception=None, result=None): + f = Future() + f._state = state + f._exception = exception + f._result = result + return f + +PENDING_FUTURE = create_future(state=PENDING) +RUNNING_FUTURE = create_future(state=RUNNING) +CANCELLED_FUTURE = create_future(state=CANCELLED) +CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) +EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) +SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) + class Call(object): def __init__(self, manual_finish=False): @@ -40,21 +58,6 @@ class ExceptionCall(Call): self._can_finished.wait() raise ZeroDivisionError() -class FutureStub(object): - def __init__(self, cancelled, done, exception=None): - self._cancelled = cancelled - self._done = done - self._exception = exception - - def cancelled(self): - return self._cancelled - - def done(self): - return self._done - - def exception(self): - return self._exception - class ShutdownTest(unittest.TestCase): def test_run_after_shutdown(self): self.executor = futures.ThreadPoolExecutor(max_threads=1) @@ -73,7 +76,7 @@ class ShutdownTest(unittest.TestCase): call3 = Call(manual_finish=True) self.executor.run([call1, call2, call3], - run_until=futures.RETURN_IMMEDIATELY) + return_when=futures.RETURN_IMMEDIATELY) call1.wait_on_called() call2.wait_on_called() @@ -91,26 +94,33 @@ class ShutdownTest(unittest.TestCase): class WaitsTest(unittest.TestCase): def test(self): - def aaa(): - fs.wait(run_until=futures.ALL_COMPLETED) + def wait_for_ALL_COMPLETED(): + fs.wait(return_when=futures.ALL_COMPLETED) self.assertTrue(f1.done()) self.assertTrue(f2.done()) self.assertTrue(f3.done()) self.assertTrue(f4.done()) + all_completed.release() - def bbb(): - fs.wait(run_until=futures.FIRST_COMPLETED) + def wait_for_FIRST_COMPLETED(): + fs.wait(return_when=futures.FIRST_COMPLETED) self.assertTrue(f1.done()) self.assertFalse(f2.done()) self.assertFalse(f3.done()) self.assertFalse(f4.done()) + first_completed.release() - def ccc(): - fs.wait(run_until=futures.FIRST_EXCEPTION) + def wait_for_FIRST_EXCEPTION(): + fs.wait(return_when=futures.FIRST_EXCEPTION) self.assertTrue(f1.done()) self.assertTrue(f2.done()) self.assertFalse(f3.done()) self.assertFalse(f4.done()) + first_exception.release() + + all_completed = threading.Semaphore(0) + first_completed = threading.Semaphore(0) + first_exception = threading.Semaphore(0) call1 = Call(manual_finish=True) call2 = ExceptionCall(manual_finish=True) @@ -118,26 +128,26 @@ class WaitsTest(unittest.TestCase): call4 = Call() fs = self.executor.run([call1, call2, call3, call4], - run_until=futures.RETURN_IMMEDIATELY) + return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs threads = [] - for call in [aaa, bbb, ccc] * 3: + for call in [wait_for_ALL_COMPLETED, + wait_for_FIRST_COMPLETED, + wait_for_FIRST_EXCEPTION]: t = threading.Thread(target=call) t.start() threads.append(t) - time.sleep(1) + time.sleep(1) # give threads enough time to execute wait call1.set_can() - time.sleep(1) + first_completed.acquire() call2.set_can() - time.sleep(1) + first_exception.acquire() call3.set_can() - time.sleep(1) call4.set_can() + all_completed.acquire() - for t in threads: - t.join() self.executor.shutdown() class ThreadPoolWaitTests(WaitsTest): @@ -156,7 +166,7 @@ class CancelTests(unittest.TestCase): call4 = Call() fs = executor.run([call1, call2, call3, call4], - run_until=futures.RETURN_IMMEDIATELY) + return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs call1.wait_on_called() @@ -173,13 +183,13 @@ class CancelTests(unittest.TestCase): self.assertEqual(f4.done(), True) call1.set_can() - fs.wait(run_until=futures.ALL_COMPLETED) + fs.wait(return_when=futures.ALL_COMPLETED) self.assertEqual(f1.result(), 42) - self.assertRaises(futures.CancelledException, f2.result) - self.assertRaises(futures.CancelledException, f2.exception) + self.assertRaises(futures.CancelledError, f2.result) + self.assertRaises(futures.CancelledError, f2.exception) self.assertEqual(f3.result(), 42) - self.assertRaises(futures.CancelledException, f4.result) - self.assertRaises(futures.CancelledException, f4.exception) + self.assertRaises(futures.CancelledError, f4.result) + self.assertRaises(futures.CancelledError, f4.exception) self.assertEqual(call2.called(), False) self.assertEqual(call4.called(), False) @@ -196,17 +206,38 @@ class CancelTests(unittest.TestCase): call1 = Call(manual_finish=True) call2 = Call() - fs = executor.run([call1, call2], run_until=futures.RETURN_IMMEDIATELY) + fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY) f1, f2 = fs call1.wait_on_called() t = threading.Thread(target=end_call) t.start() - self.assertRaises(futures.CancelledException, f2.result) - self.assertRaises(futures.CancelledException, f2.exception) + self.assertRaises(futures.CancelledError, f2.result) + self.assertRaises(futures.CancelledError, f2.exception) t.join() executor.shutdown() + def test_wait_with_already_cancelled_futures(self): + executor = futures.ThreadPoolExecutor(max_threads=1) + + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + fs = executor.run([call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertTrue(f2.cancel()) + self.assertTrue(f3.cancel()) + call1.set_can() + time.sleep(0.1) + + fs.wait(return_when=futures.ALL_COMPLETED) + executor.shutdown() + def test_cancel_all(self): executor = futures.ThreadPoolExecutor(max_threads=1) @@ -216,11 +247,11 @@ class CancelTests(unittest.TestCase): call4 = Call() fs = executor.run([call1, call2, call3, call4], - run_until=futures.RETURN_IMMEDIATELY) + return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs call1.wait_on_called() - self.assertRaises(futures.TimeoutException, fs.cancel, timeout=0) + self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0) call1.set_can() fs.cancel() @@ -236,7 +267,7 @@ class CancelTests(unittest.TestCase): call1 = Call(manual_finish=True) call2 = Call() - fs = executor.run([call1, call2], run_until=futures.RETURN_IMMEDIATELY) + fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY) f1, f2 = fs call1.wait_on_called() @@ -245,138 +276,286 @@ class CancelTests(unittest.TestCase): self.assertEqual(repr(f2), '<Future state=cancelled>') executor.shutdown() -class FutureListTests(unittest.TestCase): - def test_cancel_states(self): - f1 = FutureStub(cancelled=False, done=False) - f2 = FutureStub(cancelled=False, done=True) - f3 = FutureStub(cancelled=False, done=True, exception=IOError()) - f4 = FutureStub(cancelled=True, done=True) +class FutureTests(unittest.TestCase): + def test_repr(self): + self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>') + self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>') + self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>') + self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE), + '<Future state=cancelled>') + self.assertEqual(repr(EXCEPTION_FUTURE), + '<Future state=finished raised IOError>') + self.assertEqual(repr(SUCCESSFUL_FUTURE), + '<Future state=finished returned int>') + + create_future + + def test_cancel(self): + f1 = create_future(state=PENDING) + f2 = create_future(state=RUNNING) + f3 = create_future(state=CANCELLED) + f4 = create_future(state=CANCELLED_AND_NOTIFIED) + f5 = create_future(state=FINISHED, exception=IOError()) + f6 = create_future(state=FINISHED, result=5) + + self.assertTrue(f1.cancel()) + self.assertEquals(f1._state, CANCELLED) + + self.assertFalse(f2.cancel()) + self.assertEquals(f2._state, RUNNING) + + self.assertTrue(f3.cancel()) + self.assertEquals(f3._state, CANCELLED) + + self.assertTrue(f4.cancel()) + self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) + + self.assertFalse(f5.cancel()) + self.assertEquals(f5._state, FINISHED) + + self.assertFalse(f6.cancel()) + self.assertEquals(f6._state, FINISHED) + + def test_cancelled(self): + self.assertFalse(PENDING_FUTURE.cancelled()) + self.assertFalse(RUNNING_FUTURE.cancelled()) + self.assertTrue(CANCELLED_FUTURE.cancelled()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) + self.assertFalse(EXCEPTION_FUTURE.cancelled()) + self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) + + def test_done(self): + self.assertFalse(PENDING_FUTURE.done()) + self.assertFalse(RUNNING_FUTURE.done()) + self.assertTrue(CANCELLED_FUTURE.done()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) + self.assertTrue(EXCEPTION_FUTURE.done()) + self.assertTrue(SUCCESSFUL_FUTURE.done()) + + def test_result_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.result, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) + self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) + self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) + + def test_result_with_success(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = FINISHED + f1._result = 42 + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertEquals(f1.result(timeout=1), 42) + + def test_result_with_cancel(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = CANCELLED + f1._condition.notify_all() - fs = [f1, f2, f3, f4] + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertRaises(futures.CancelledError, f1.result, timeout=1) + + def test_exception_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.exception, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) + self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), + IOError)) + self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) + +class FutureListTests(unittest.TestCase): + def test_wait_RETURN_IMMEDIATELY(self): + f = futures.FutureList(futures=None, event_sink=None) + f.wait(return_when=futures.RETURN_IMMEDIATELY) + + def test_wait_timeout(self): + f = futures.FutureList([PENDING_FUTURE], + futures._base.ThreadEventSink()) + + for t in [futures.FIRST_COMPLETED, + futures.FIRST_EXCEPTION, + futures.ALL_COMPLETED]: + f.wait(timeout=0.1, return_when=t) + self.assertFalse(PENDING_FUTURE.done()) + + def test_wait_all_done(self): + f = futures.FutureList([CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + futures._base.ThreadEventSink()) + + f.wait(return_when=futures.ALL_COMPLETED) + + def test_filters(self): + fs = [PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE] f = futures.FutureList(fs, None) - self.assertEqual(f.running_futures(), [f1]) - self.assertEqual(f.cancelled_futures(), [f4]) - self.assertEqual(f.done_futures(), [f2, f3, f4]) - self.assertEqual(f.successful_futures(), [f2]) - self.assertEqual(f.exception_futures(), [f3]) + self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE]) + self.assertEqual(list(f.cancelled_futures()), + [CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE]) + self.assertEqual(list(f.done_futures()), + [CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]) + self.assertEqual(list(f.successful_futures()), + [SUCCESSFUL_FUTURE]) + self.assertEqual(list(f.exception_futures()), + [EXCEPTION_FUTURE]) def test_has_running_futures(self): - f1 = FutureStub(cancelled=False, done=False) - f2 = FutureStub(cancelled=False, done=True) - - self.assertTrue( - futures.FutureList([f1], None).has_running_futures()) self.assertFalse( - futures.FutureList([f2], None).has_running_futures()) + futures.FutureList([PENDING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + None).has_running_futures()) + self.assertTrue( + futures.FutureList([RUNNING_FUTURE], + None).has_running_futures()) def test_has_cancelled_futures(self): - f1 = FutureStub(cancelled=True, done=True) - f2 = FutureStub(cancelled=False, done=True) + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + None).has_cancelled_futures()) + self.assertTrue( + futures.FutureList([CANCELLED_FUTURE], + None).has_cancelled_futures()) self.assertTrue( - futures.FutureList([f1], None).has_cancelled_futures()) - self.assertFalse( - futures.FutureList([f2], None).has_cancelled_futures()) + futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], + None).has_cancelled_futures()) def test_has_done_futures(self): - f1 = FutureStub(cancelled=True, done=True) - f2 = FutureStub(cancelled=False, done=True) - f3 = FutureStub(cancelled=False, done=False) + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE], + None).has_done_futures()) + self.assertTrue( + futures.FutureList([CANCELLED_FUTURE], + None).has_done_futures()) self.assertTrue( - futures.FutureList([f1], None).has_done_futures()) + futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], + None).has_done_futures()) + self.assertTrue( - futures.FutureList([f2], None).has_done_futures()) - self.assertFalse( - futures.FutureList([f3], None).has_done_futures()) + futures.FutureList([EXCEPTION_FUTURE], + None).has_done_futures()) + + self.assertTrue( + futures.FutureList([SUCCESSFUL_FUTURE], + None).has_done_futures()) def test_has_successful_futures(self): - f1 = FutureStub(cancelled=False, done=True) - f2 = FutureStub(cancelled=False, done=True, exception=IOError()) - f3 = FutureStub(cancelled=False, done=False) - f4 = FutureStub(cancelled=True, done=True) + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE], + None).has_successful_futures()) self.assertTrue( - futures.FutureList([f1], None).has_successful_futures()) - self.assertFalse( - futures.FutureList([f2], None).has_successful_futures()) - self.assertFalse( - futures.FutureList([f3], None).has_successful_futures()) - self.assertFalse( - futures.FutureList([f4], None).has_successful_futures()) + futures.FutureList([SUCCESSFUL_FUTURE], + None).has_successful_futures()) def test_has_exception_futures(self): - f1 = FutureStub(cancelled=False, done=True) - f2 = FutureStub(cancelled=False, done=True, exception=IOError()) - f3 = FutureStub(cancelled=False, done=False) - f4 = FutureStub(cancelled=True, done=True) - self.assertFalse( - futures.FutureList([f1], None).has_exception_futures()) + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE], + None).has_exception_futures()) + self.assertTrue( - futures.FutureList([f2], None).has_exception_futures()) - self.assertFalse( - futures.FutureList([f3], None).has_exception_futures()) - self.assertFalse( - futures.FutureList([f4], None).has_exception_futures()) + futures.FutureList([EXCEPTION_FUTURE], + None).has_exception_futures()) def test_get_item(self): - f1 = FutureStub(cancelled=False, done=False) - f2 = FutureStub(cancelled=False, done=True) - f3 = FutureStub(cancelled=False, done=True) - - fs = [f1, f2, f3] + fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] f = futures.FutureList(fs, None) - self.assertEqual(f[0], f1) - self.assertEqual(f[1], f2) - self.assertEqual(f[2], f3) + self.assertEqual(f[0], PENDING_FUTURE) + self.assertEqual(f[1], RUNNING_FUTURE) + self.assertEqual(f[2], CANCELLED_FUTURE) self.assertRaises(IndexError, f.__getitem__, 3) def test_len(self): - f1 = FutureStub(cancelled=False, done=False) - f2 = FutureStub(cancelled=False, done=True) - f3 = FutureStub(cancelled=False, done=True) - - f = futures.FutureList([f1, f2, f3], None) + f = futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE], + None) self.assertEqual(len(f), 3) def test_iter(self): - f1 = FutureStub(cancelled=False, done=False) - f2 = FutureStub(cancelled=False, done=True) - f3 = FutureStub(cancelled=False, done=True) - - fs = [f1, f2, f3] + fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] f = futures.FutureList(fs, None) self.assertEqual(list(iter(f)), fs) def test_contains(self): - f1 = FutureStub(cancelled=False, done=False) - f2 = FutureStub(cancelled=False, done=True) - f3 = FutureStub(cancelled=False, done=True) - - f = futures.FutureList([f1, f2], None) - self.assertTrue(f1 in f) - self.assertTrue(f2 in f) - self.assertFalse(f3 in f) + f = futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE], + None) + self.assertTrue(PENDING_FUTURE in f) + self.assertTrue(RUNNING_FUTURE in f) + self.assertFalse(CANCELLED_FUTURE in f) def test_repr(self): - running = FutureStub(cancelled=False, done=False) - result = FutureStub(cancelled=False, done=True) - exception = FutureStub(cancelled=False, done=True, exception=IOError()) - cancelled = FutureStub(cancelled=True, done=True) + pending = create_future(state=PENDING) + cancelled = create_future(state=CANCELLED) + cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED) + running = create_future(state=RUNNING) + finished = create_future(state=FINISHED) f = futures.FutureList( - [running] * 4 + [result] * 3 + [exception] * 2 + [cancelled], + [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 + + [CANCELLED_AND_NOTIFIED_FUTURE] + + [RUNNING_FUTURE] * 2 + + [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3, None) self.assertEqual(repr(f), - '<FutureList #futures=10 ' - '[#success=3 #exception=2 #cancelled=1]>') + '<FutureList #futures=15 ' + '[#pending=4 #cancelled=3 #running=2 #finished=6]>') + def test_main(): test.support.run_unittest(CancelTests, # ProcessPoolWaitTests, ThreadPoolWaitTests, + FutureTests, FutureListTests, ShutdownTest) |