summaryrefslogtreecommitdiff
path: root/python3/test_futures.py
diff options
context:
space:
mode:
Diffstat (limited to 'python3/test_futures.py')
-rw-r--r--python3/test_futures.py994
1 files changed, 491 insertions, 503 deletions
diff --git a/python3/test_futures.py b/python3/test_futures.py
index b7dee65..98eea27 100644
--- a/python3/test_futures.py
+++ b/python3/test_futures.py
@@ -1,17 +1,24 @@
-import test.support
-
-import unittest
+import io
+import logging
+import multiprocessing
+import sys
import threading
+import test.support
import time
-import multiprocessing
+import unittest
+
+if sys.platform.startswith('win'):
+ import ctypes
+ import ctypes.wintypes
import futures
-import futures._base
from futures._base import (
- PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
+ LOGGER, STDERR_HANDLER, wait)
+import futures.process
def create_future(state=PENDING, exception=None, result=None):
- f = Future(0)
+ f = Future()
f._state = state
f._exception = exception
f._result = result
@@ -24,68 +31,104 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
+def mul(x, y):
+ return x * y
+
class Call(object):
+ """A call that can be submitted to a future.Executor for testing.
+
+ The call signals when it is called and waits for an event before finishing.
+ """
CALL_LOCKS = {}
+ def _create_event(self):
+ if sys.platform.startswith('win'):
+ class SECURITY_ATTRIBUTES(ctypes.Structure):
+ _fields_ = [("nLength", ctypes.wintypes.DWORD),
+ ("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
+ ("bInheritHandle", ctypes.wintypes.BOOL)]
+
+ s = SECURITY_ATTRIBUTES()
+ s.nLength = ctypes.sizeof(s)
+ s.lpSecurityDescriptor = None
+ s.bInheritHandle = True
+
+ handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
+ True,
+ False,
+ None)
+ assert handle is not None
+ return handle
+ else:
+ event = multiprocessing.Event()
+ self.CALL_LOCKS[id(event)] = event
+ return id(event)
+
+ def _wait_on_event(self, handle):
+ if sys.platform.startswith('win'):
+ r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000)
+ assert r == 0
+ else:
+ self.CALL_LOCKS[handle].wait()
+
+ def _signal_event(self, handle):
+ if sys.platform.startswith('win'):
+ r = ctypes.windll.kernel32.SetEvent(handle)
+ assert r != 0
+ else:
+ self.CALL_LOCKS[handle].set()
+
def __init__(self, manual_finish=False, result=42):
- called_event = multiprocessing.Event()
- can_finish = multiprocessing.Event()
+ self._called_event = self._create_event()
+ self._can_finish = self._create_event()
self._result = result
- self._called_event_id = id(called_event)
- self._can_finish_event_id = id(can_finish)
-
- self.CALL_LOCKS[self._called_event_id] = called_event
- self.CALL_LOCKS[self._can_finish_event_id] = can_finish
if not manual_finish:
- self._can_finish.set()
-
- @property
- def _can_finish(self):
- return self.CALL_LOCKS[self._can_finish_event_id]
-
- @property
- def _called_event(self):
- return self.CALL_LOCKS[self._called_event_id]
+ self._signal_event(self._can_finish)
def wait_on_called(self):
- self._called_event.wait()
+ self._wait_on_event(self._called_event)
def set_can(self):
- self._can_finish.set()
-
- def called(self):
- return self._called_event.is_set()
+ self._signal_event(self._can_finish)
def __call__(self):
- if self._called_event.is_set(): print('called twice')
+ self._signal_event(self._called_event)
+ self._wait_on_event(self._can_finish)
- self._called_event.set()
- self._can_finish.wait()
return self._result
def close(self):
- del self.CALL_LOCKS[self._called_event_id]
- del self.CALL_LOCKS[self._can_finish_event_id]
+ self.set_can()
+ if sys.platform.startswith('win'):
+ ctypes.windll.kernel32.CloseHandle(self._called_event)
+ ctypes.windll.kernel32.CloseHandle(self._can_finish)
+ else:
+ del self.CALL_LOCKS[self._called_event]
+ del self.CALL_LOCKS[self._can_finish]
class ExceptionCall(Call):
def __call__(self):
- assert not self._called_event.is_set(), 'already called'
-
- self._called_event.set()
- self._can_finish.wait()
+ self._signal_event(self._called_event)
+ self._wait_on_event(self._can_finish)
raise ZeroDivisionError()
+class MapCall(Call):
+ def __init__(self, result=42):
+ super().__init__(manual_finish=True, result=result)
+
+ def __call__(self, manual_finish):
+ if manual_finish:
+ super().__call__()
+ return self._result
+
class ExecutorShutdownTest(unittest.TestCase):
def test_run_after_shutdown(self):
- call1 = Call()
- try:
- self.executor.shutdown()
- self.assertRaises(RuntimeError,
- self.executor.run_to_futures,
- [call1])
- finally:
- call1.close()
+ self.executor.shutdown()
+ self.assertRaises(RuntimeError,
+ self.executor.submit,
+ pow, 2, 5)
+
def _start_some_futures(self):
call1 = Call(manual_finish=True)
@@ -93,13 +136,14 @@ class ExecutorShutdownTest(unittest.TestCase):
call3 = Call(manual_finish=True)
try:
- self.executor.run_to_futures([call1, call2, call3],
- return_when=futures.RETURN_IMMEDIATELY)
-
+ self.executor.submit(call1)
+ self.executor.submit(call2)
+ self.executor.submit(call3)
+
call1.wait_on_called()
call2.wait_on_called()
call3.wait_on_called()
-
+
call1.set_can()
call2.set_can()
call3.set_can()
@@ -110,10 +154,10 @@ class ExecutorShutdownTest(unittest.TestCase):
class ThreadPoolShutdownTest(ExecutorShutdownTest):
def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_threads=5)
+ self.executor = futures.ThreadPoolExecutor(max_workers=5)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
def test_threads_terminate(self):
self._start_some_futures()
@@ -123,7 +167,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest):
t.join()
def test_context_manager_shutdown(self):
- with futures.ThreadPoolExecutor(max_threads=5) as e:
+ with futures.ThreadPoolExecutor(max_workers=5) as e:
executor = e
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
@@ -132,7 +176,7 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest):
t.join()
def test_del_shutdown(self):
- executor = futures.ThreadPoolExecutor(max_threads=5)
+ executor = futures.ThreadPoolExecutor(max_workers=5)
executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
@@ -142,32 +186,31 @@ class ThreadPoolShutdownTest(ExecutorShutdownTest):
class ProcessPoolShutdownTest(ExecutorShutdownTest):
def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_processes=5)
+ self.executor = futures.ProcessPoolExecutor(max_workers=5)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
def test_processes_terminate(self):
self._start_some_futures()
self.assertEqual(len(self.executor._processes), 5)
+ processes = self.executor._processes
self.executor.shutdown()
- self.executor._queue_management_thread.join()
- for p in self.executor._processes:
+ for p in processes:
p.join()
def test_context_manager_shutdown(self):
- with futures.ProcessPoolExecutor(max_processes=5) as e:
+ with futures.ProcessPoolExecutor(max_workers=5) as e:
executor = e
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
- executor._queue_management_thread.join()
for p in self.executor._processes:
p.join()
def test_del_shutdown(self):
- executor = futures.ProcessPoolExecutor(max_processes=5)
+ executor = futures.ProcessPoolExecutor(max_workers=5)
list(executor.map(abs, range(-5, 5)))
queue_management_thread = executor._queue_management_thread
processes = executor._processes
@@ -177,313 +220,317 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
for p in processes:
p.join()
-class WaitsTest(unittest.TestCase):
- def test_concurrent_waits(self):
- def wait_for_ALL_COMPLETED():
- fs.wait(return_when=futures.ALL_COMPLETED)
- self.assertTrue(f1.done())
- self.assertTrue(f2.done())
- self.assertTrue(f3.done())
- self.assertTrue(f4.done())
- all_completed.release()
-
- def wait_for_FIRST_COMPLETED():
- fs.wait(return_when=futures.FIRST_COMPLETED)
- self.assertTrue(f1.done())
- self.assertFalse(f2.done()) # XXX
- self.assertFalse(f3.done())
- self.assertFalse(f4.done())
- first_completed.release()
-
- def wait_for_FIRST_EXCEPTION():
- fs.wait(return_when=futures.FIRST_EXCEPTION)
- self.assertTrue(f1.done())
- self.assertTrue(f2.done())
- self.assertFalse(f3.done()) # XXX
- self.assertFalse(f4.done())
- first_exception.release()
-
- all_completed = threading.Semaphore(0)
- first_completed = threading.Semaphore(0)
- first_exception = threading.Semaphore(0)
+class WaitTests(unittest.TestCase):
+ def test_first_completed(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+ call1.set_can()
call1 = Call(manual_finish=True)
- call2 = ExceptionCall(manual_finish=True)
- call3 = Call(manual_finish=True)
- call4 = Call()
-
+ call2 = Call(manual_finish=True)
try:
- fs = self.executor.run_to_futures(
- [call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
- f1, f2, f3, f4 = fs
-
- threads = []
- for wait_test in [wait_for_ALL_COMPLETED,
- wait_for_FIRST_COMPLETED,
- wait_for_FIRST_EXCEPTION]:
- t = threading.Thread(target=wait_test)
- t.start()
- threads.append(t)
-
- time.sleep(1) # give threads enough time to execute wait
-
- call1.set_can()
- first_completed.acquire()
- call2.set_can()
- first_exception.acquire()
- call3.set_can()
- all_completed.acquire()
-
- self.executor.shutdown()
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+
+ t = threading.Thread(target=wait_test)
+ t.start()
+ done, not_done = futures.wait(
+ [CANCELLED_FUTURE, future1, future2],
+ return_when=futures.FIRST_COMPLETED)
+
+ self.assertEquals(set([future1]), done)
+ self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done)
finally:
call1.close()
call2.close()
- call3.close()
- call4.close()
-class ThreadPoolWaitTests(WaitsTest):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_threads=1)
+ def test_first_completed_one_already_completed(self):
+ call1 = Call(manual_finish=True)
+ try:
+ future1 = self.executor.submit(call1)
- def tearDown(self):
- self.executor.shutdown()
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE, future1],
+ return_when=futures.FIRST_COMPLETED)
-class ProcessPoolWaitTests(WaitsTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_processes=1)
+ self.assertEquals(set([SUCCESSFUL_FUTURE]), finished)
+ self.assertEquals(set([future1]), pending)
+ finally:
+ call1.close()
- def tearDown(self):
- self.executor.shutdown()
+ def test_first_exception(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+ call1.set_can()
+ call2.set_can()
-class CancelTests(unittest.TestCase):
- def test_cancel_states(self):
call1 = Call(manual_finish=True)
- call2 = Call()
- call3 = Call()
- call4 = Call()
-
+ call2 = ExceptionCall(manual_finish=True)
+ call3 = Call(manual_finish=True)
try:
- fs = self.executor.run_to_futures(
- [call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
- f1, f2, f3, f4 = fs
-
- call1.wait_on_called()
- self.assertEqual(f1.cancel(), False)
- self.assertEqual(f2.cancel(), True)
- self.assertEqual(f4.cancel(), True)
- self.assertEqual(f1.cancelled(), False)
- self.assertEqual(f2.cancelled(), True)
- self.assertEqual(f3.cancelled(), False)
- self.assertEqual(f4.cancelled(), True)
- self.assertEqual(f1.done(), False)
- self.assertEqual(f2.done(), True)
- self.assertEqual(f3.done(), False)
- self.assertEqual(f4.done(), True)
-
- call1.set_can()
- fs.wait(return_when=futures.ALL_COMPLETED)
- self.assertEqual(f1.result(), 42)
- self.assertRaises(futures.CancelledError, f2.result)
- self.assertRaises(futures.CancelledError, f2.exception)
- self.assertEqual(f3.result(), 42)
- self.assertRaises(futures.CancelledError, f4.result)
- self.assertRaises(futures.CancelledError, f4.exception)
-
- self.assertEqual(call2.called(), False)
- self.assertEqual(call4.called(), False)
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+ future3 = self.executor.submit(call3)
+
+ t = threading.Thread(target=wait_test)
+ t.start()
+ finished, pending = futures.wait(
+ [future1, future2, future3],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEquals(set([future1, future2]), finished)
+ self.assertEquals(set([future3]), pending)
finally:
call1.close()
call2.close()
call3.close()
- call4.close()
- def test_wait_for_individual_cancel_while_waiting(self):
- def end_call():
- # Wait until the main thread is waiting on the results of the
- # future.
- time.sleep(1)
- f2.cancel()
+ def test_first_exception_some_already_complete(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
call1.set_can()
- call1 = Call(manual_finish=True)
- call2 = Call()
-
+ call1 = ExceptionCall(manual_finish=True)
+ call2 = Call(manual_finish=True)
try:
- fs = self.executor.run_to_futures(
- [call1, call2],
- return_when=futures.RETURN_IMMEDIATELY)
- f1, f2 = fs
-
- call1.wait_on_called()
- t = threading.Thread(target=end_call)
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+
+ t = threading.Thread(target=wait_test)
t.start()
- self.assertRaises(futures.CancelledError, f2.result)
- self.assertRaises(futures.CancelledError, f2.exception)
- t.join()
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEquals(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1]), finished)
+ self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
+
+
finally:
call1.close()
call2.close()
- def test_wait_with_already_cancelled_futures(self):
+ def test_first_exception_one_already_failed(self):
call1 = Call(manual_finish=True)
- call2 = Call()
- call3 = Call()
- call4 = Call()
+ try:
+ future1 = self.executor.submit(call1)
+
+ finished, pending = futures.wait(
+ [EXCEPTION_FUTURE, future1],
+ return_when=futures.FIRST_EXCEPTION)
+ self.assertEquals(set([EXCEPTION_FUTURE]), finished)
+ self.assertEquals(set([future1]), pending)
+ finally:
+ call1.close()
+
+ def test_all_completed(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+ call1.set_can()
+ call2.set_can()
+
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
try:
- fs = self.executor.run_to_futures(
- [call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
- f1, f2, f3, f4 = fs
-
- call1.wait_on_called()
- self.assertTrue(f2.cancel())
- self.assertTrue(f3.cancel())
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+
+ t = threading.Thread(target=wait_test)
+ t.start()
+ finished, pending = futures.wait(
+ [future1, future2],
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEquals(set([future1, future2]), finished)
+ self.assertEquals(set(), pending)
+
+
+ finally:
+ call1.close()
+ call2.close()
+
+ def test_all_completed_some_already_completed(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+
+ future4.cancel()
call1.set_can()
-
- fs.wait(return_when=futures.ALL_COMPLETED)
+ call2.set_can()
+ call3.set_can()
+
+ self.assertLessEqual(
+ futures.process.EXTRA_QUEUED_CALLS,
+ 1,
+ 'this test assumes that future4 will be cancelled before it is '
+ 'queued to run - which might not be the case if '
+ 'ProcessPoolExecutor is too aggresive in scheduling futures')
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
+ call3 = Call(manual_finish=True)
+ call4 = Call(manual_finish=True)
+ try:
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+ future3 = self.executor.submit(call3)
+ future4 = self.executor.submit(call4)
+
+ t = threading.Thread(target=wait_test)
+ t.start()
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2, future3, future4],
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEquals(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2, future3, future4]),
+ finished)
+ self.assertEquals(set(), pending)
finally:
call1.close()
call2.close()
call3.close()
call4.close()
- def test_cancel_all(self):
- call1 = Call(manual_finish=True)
- call2 = Call()
- call3 = Call()
- call4 = Call()
+ def test_timeout(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+ call1.set_can()
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
try:
- fs = self.executor.run_to_futures(
- [call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
- f1, f2, f3, f4 = fs
-
- call1.wait_on_called()
- self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0)
- call1.set_can()
- fs.cancel()
-
- self.assertFalse(f1.cancelled())
- self.assertTrue(f2.cancelled())
- self.assertTrue(f3.cancelled())
- self.assertTrue(f4.cancelled())
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
+
+ t = threading.Thread(target=wait_test)
+ t.start()
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2],
+ timeout=1,
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1]), finished)
+ self.assertEquals(set([future2]), pending)
+
+
finally:
call1.close()
call2.close()
- call3.close()
- call4.close()
-class ThreadPoolCancelTests(CancelTests):
+
+class ThreadPoolWaitTests(WaitTests):
def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_threads=1)
+ self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
-class ProcessPoolCancelTests(WaitsTest):
+class ProcessPoolWaitTests(WaitTests):
def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_processes=1)
+ self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
- self.executor.shutdown()
-
-class ExecutorTest(unittest.TestCase):
- # Executor.shutdown() and context manager usage is tested by
- # ExecutorShutdownTest.
- def test_run_to_futures(self):
- call1 = Call(result=1)
- call2 = Call(result=2)
- call3 = Call(manual_finish=True)
- call4 = Call()
- call5 = Call()
+ self.executor.shutdown(wait=True)
+
+class AsCompletedTests(unittest.TestCase):
+ # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
+ def test_no_timeout(self):
+ def wait_test():
+ while not future1._waiters:
+ pass
+ call1.set_can()
+ call2.set_can()
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
try:
- f1, f2, f3, f4, f5 = self.executor.run_to_futures(
- [call1, call2, call3, call4, call5],
- return_when=futures.RETURN_IMMEDIATELY)
-
- call3.wait_on_called()
+ future1 = self.executor.submit(call1)
+ future2 = self.executor.submit(call2)
- # ProcessPoolExecutor uses a thread to propogate results into the
- # future. Calling result() ensures that the thread has done its work
- # before doing the next set of checks.
- f1.result()
- f2.result()
-
- self.assertTrue(f1.done())
- self.assertFalse(f1.running())
- self.assertEqual(f1.index, 0)
-
- self.assertTrue(f2.done())
- self.assertFalse(f2.running())
- self.assertEqual(f2.index, 1)
-
- self.assertFalse(f3.done())
- self.assertTrue(f3.running())
- self.assertEqual(f3.index, 2)
-
- # ProcessPoolExecutor may mark some futures as running before they
- # actually are so don't check these ones.
- self.assertFalse(f4.done())
- self.assertEqual(f4.index, 3)
-
- self.assertFalse(f5.done())
- self.assertEqual(f5.index, 4)
+ t = threading.Thread(target=wait_test)
+ t.start()
+ completed = set(futures.as_completed(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]))
+ self.assertEquals(set(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]),
+ completed)
finally:
- call3.set_can() # Let the call finish executing.
call1.close()
call2.close()
- call3.close()
- call4.close()
- call5.close()
- def test_run_to_results(self):
- call1 = Call(result=1)
- call2 = Call(result=2)
- call3 = Call(result=3)
+ def test_zero_timeout(self):
+ call1 = Call(manual_finish=True)
try:
- self.assertEqual(
- list(self.executor.run_to_results([call1, call2, call3])),
- [1, 2, 3])
+ future1 = self.executor.submit(call1)
+ completed_futures = set()
+ try:
+ for future in futures.as_completed(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1],
+ timeout=0):
+ completed_futures.add(future)
+ except futures.TimeoutError:
+ pass
+
+ self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE]),
+ completed_futures)
finally:
call1.close()
- call2.close()
- call3.close()
- def test_run_to_results_exception(self):
- call1 = Call(result=1)
- call2 = Call(result=2)
- call3 = ExceptionCall()
- try:
- i = self.executor.run_to_results([call1, call2, call3])
-
- self.assertEqual(i.__next__(), 1)
- self.assertEqual(i.__next__(), 2)
- self.assertRaises(ZeroDivisionError, i.__next__)
- finally:
- call1.close()
- call2.close()
- call3.close()
+class ThreadPoolAsCompletedTests(AsCompletedTests):
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_workers=1)
- def test_run_to_results_timeout(self):
- call1 = Call(result=1)
- call2 = Call(result=2)
- call3 = Call(manual_finish=True)
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
- try:
- i = self.executor.run_to_results([call1, call2, call3], timeout=1)
- self.assertEqual(i.__next__(), 1)
- self.assertEqual(i.__next__(), 2)
- self.assertRaises(futures.TimeoutError, i.__next__)
- call3.set_can()
- finally:
- call1.close()
- call2.close()
- call3.close()
+class ProcessPoolAsCompletedTests(AsCompletedTests):
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_workers=1)
+
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
+
+class ExecutorTest(unittest.TestCase):
+ # Executor.shutdown() and context manager usage is tested by
+ # ExecutorShutdownTest.
+ def test_submit(self):
+ future = self.executor.submit(pow, 2, 8)
+ self.assertEquals(256, future.result())
+
+ def test_submit_keyword(self):
+ future = self.executor.submit(mul, 2, y=8)
+ self.assertEquals(16, future.result())
def test_map(self):
self.assertEqual(
@@ -496,36 +543,150 @@ class ExecutorTest(unittest.TestCase):
self.assertEqual(i.__next__(), (0, 1))
self.assertRaises(ZeroDivisionError, i.__next__)
+ def test_map_timeout(self):
+ results = []
+ timeout_call = MapCall()
+ try:
+ try:
+ for i in self.executor.map(timeout_call,
+ [False, False, True],
+ timeout=1):
+ results.append(i)
+ except futures.TimeoutError:
+ pass
+ else:
+ self.fail('expected TimeoutError')
+ finally:
+ timeout_call.close()
+
+ self.assertEquals([42, 42], results)
+
class ThreadPoolExecutorTest(ExecutorTest):
def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_threads=1)
+ self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
class ProcessPoolExecutorTest(ExecutorTest):
def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_processes=1)
+ self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
- self.executor.shutdown()
+ self.executor.shutdown(wait=True)
class FutureTests(unittest.TestCase):
- # Future.index() is tested by ExecutorTest
- # Future.cancel() is further tested by CancelTests.
+ def test_done_callback_with_result(self):
+ callback_result = None
+ def fn(callback_future):
+ nonlocal callback_result
+ callback_result = callback_future.result()
+
+ f = Future()
+ f.add_done_callback(fn)
+ f.set_result(5)
+ self.assertEquals(5, callback_result)
+
+ def test_done_callback_with_exception(self):
+ callback_exception = None
+ def fn(callback_future):
+ nonlocal callback_exception
+ callback_exception = callback_future.exception()
+
+ f = Future()
+ f.add_done_callback(fn)
+ f.set_exception(Exception('test'))
+ self.assertEquals(('test',), callback_exception.args)
+
+ def test_done_callback_with_cancel(self):
+ was_cancelled = None
+ def fn(callback_future):
+ nonlocal was_cancelled
+ was_cancelled = callback_future.cancelled()
+
+ f = Future()
+ f.add_done_callback(fn)
+ self.assertTrue(f.cancel())
+ self.assertTrue(was_cancelled)
+
+ def test_done_callback_raises(self):
+ LOGGER.removeHandler(STDERR_HANDLER)
+ logging_stream = io.StringIO()
+ handler = logging.StreamHandler(logging_stream)
+ LOGGER.addHandler(handler)
+ try:
+ raising_was_called = False
+ fn_was_called = False
+
+ def raising_fn(callback_future):
+ nonlocal raising_was_called
+ raising_was_called = True
+ raise Exception('doh!')
+
+ def fn(callback_future):
+ nonlocal fn_was_called
+ fn_was_called = True
+
+ f = Future()
+ f.add_done_callback(raising_fn)
+ f.add_done_callback(fn)
+ f.set_result(5)
+ self.assertTrue(raising_was_called)
+ self.assertTrue(fn_was_called)
+ self.assertIn('Exception: doh!', logging_stream.getvalue())
+ finally:
+ LOGGER.removeHandler(handler)
+ LOGGER.addHandler(STDERR_HANDLER)
+
+ def test_done_callback_already_successful(self):
+ callback_result = None
+ def fn(callback_future):
+ nonlocal callback_result
+ callback_result = callback_future.result()
+
+ f = Future()
+ f.set_result(5)
+ f.add_done_callback(fn)
+ self.assertEquals(5, callback_result)
+
+ def test_done_callback_already_failed(self):
+ callback_exception = None
+ def fn(callback_future):
+ nonlocal callback_exception
+ callback_exception = callback_future.exception()
+
+ f = Future()
+ f.set_exception(Exception('test'))
+ f.add_done_callback(fn)
+ self.assertEquals(('test',), callback_exception.args)
+
+ def test_done_callback_already_cancelled(self):
+ was_cancelled = None
+ def fn(callback_future):
+ nonlocal was_cancelled
+ was_cancelled = callback_future.cancelled()
+
+ f = Future()
+ self.assertTrue(f.cancel())
+ f.add_done_callback(fn)
+ self.assertTrue(was_cancelled)
def test_repr(self):
- self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>')
- self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>')
- self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>')
- self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE),
- '<Future state=cancelled>')
- self.assertEqual(repr(EXCEPTION_FUTURE),
- '<Future state=finished raised IOError>')
- self.assertEqual(repr(SUCCESSFUL_FUTURE),
- '<Future state=finished returned int>')
-
- create_future
+ self.assertRegexpMatches(repr(PENDING_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=pending>')
+ self.assertRegexpMatches(repr(RUNNING_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=running>')
+ self.assertRegexpMatches(repr(CANCELLED_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=cancelled>')
+ self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=cancelled>')
+ self.assertRegexpMatches(
+ repr(EXCEPTION_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
+ self.assertRegexpMatches(
+ repr(SUCCESSFUL_FUTURE),
+ '<Future at 0x[0-9a-f]+ state=finished returned int>')
+
def test_cancel(self):
f1 = create_future(state=PENDING)
@@ -590,13 +751,11 @@ class FutureTests(unittest.TestCase):
self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
def test_result_with_success(self):
+ # TODO(brian@sweetapp.com): This test is timing dependant.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
- with f1._condition:
- f1._state = FINISHED
- f1._result = 42
- f1._condition.notify_all()
+ f1.set_result(42)
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
@@ -605,12 +764,11 @@ class FutureTests(unittest.TestCase):
self.assertEquals(f1.result(timeout=5), 42)
def test_result_with_cancel(self):
+ # TODO(brian@sweetapp.com): This test is timing dependant.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
- with f1._condition:
- f1._state = CANCELLED
- f1._condition.notify_all()
+ f1.cancel()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
@@ -646,186 +804,16 @@ class FutureTests(unittest.TestCase):
self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
-class FutureListTests(unittest.TestCase):
- # FutureList.wait() is further tested by WaitsTest.
- # FutureList.cancel() is tested by CancelTests.
- def test_wait_RETURN_IMMEDIATELY(self):
- f = futures.FutureList(futures=None, event_sink=None)
- f.wait(return_when=futures.RETURN_IMMEDIATELY)
-
- def test_wait_timeout(self):
- f = futures.FutureList([PENDING_FUTURE],
- futures._base.ThreadEventSink())
-
- for t in [futures.FIRST_COMPLETED,
- futures.FIRST_EXCEPTION,
- futures.ALL_COMPLETED]:
- f.wait(timeout=0.1, return_when=t)
- self.assertFalse(PENDING_FUTURE.done())
-
- def test_wait_all_done(self):
- f = futures.FutureList([CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- SUCCESSFUL_FUTURE,
- EXCEPTION_FUTURE],
- futures._base.ThreadEventSink())
-
- f.wait(return_when=futures.ALL_COMPLETED)
-
- def test_filters(self):
- fs = [PENDING_FUTURE,
- RUNNING_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE]
- f = futures.FutureList(fs, None)
-
- self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE])
- self.assertEqual(list(f.cancelled_futures()),
- [CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE])
- self.assertEqual(list(f.done_futures()),
- [CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE])
- self.assertEqual(list(f.successful_futures()),
- [SUCCESSFUL_FUTURE])
- self.assertEqual(list(f.exception_futures()),
- [EXCEPTION_FUTURE])
-
- def test_has_running_futures(self):
- self.assertFalse(
- futures.FutureList([PENDING_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- SUCCESSFUL_FUTURE,
- EXCEPTION_FUTURE],
- None).has_running_futures())
- self.assertTrue(
- futures.FutureList([RUNNING_FUTURE],
- None).has_running_futures())
-
- def test_has_cancelled_futures(self):
- self.assertFalse(
- futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE,
- SUCCESSFUL_FUTURE,
- EXCEPTION_FUTURE],
- None).has_cancelled_futures())
- self.assertTrue(
- futures.FutureList([CANCELLED_FUTURE],
- None).has_cancelled_futures())
-
- self.assertTrue(
- futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
- None).has_cancelled_futures())
-
- def test_has_done_futures(self):
- self.assertFalse(
- futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE],
- None).has_done_futures())
- self.assertTrue(
- futures.FutureList([CANCELLED_FUTURE],
- None).has_done_futures())
-
- self.assertTrue(
- futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
- None).has_done_futures())
-
- self.assertTrue(
- futures.FutureList([EXCEPTION_FUTURE],
- None).has_done_futures())
-
- self.assertTrue(
- futures.FutureList([SUCCESSFUL_FUTURE],
- None).has_done_futures())
-
- def test_has_successful_futures(self):
- self.assertFalse(
- futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE],
- None).has_successful_futures())
-
- self.assertTrue(
- futures.FutureList([SUCCESSFUL_FUTURE],
- None).has_successful_futures())
-
- def test_has_exception_futures(self):
- self.assertFalse(
- futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- SUCCESSFUL_FUTURE],
- None).has_exception_futures())
-
- self.assertTrue(
- futures.FutureList([EXCEPTION_FUTURE],
- None).has_exception_futures())
-
- def test_get_item(self):
- fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
- f = futures.FutureList(fs, None)
- self.assertEqual(f[0], PENDING_FUTURE)
- self.assertEqual(f[1], RUNNING_FUTURE)
- self.assertEqual(f[2], CANCELLED_FUTURE)
- self.assertRaises(IndexError, f.__getitem__, 3)
-
- def test_len(self):
- f = futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE,
- CANCELLED_FUTURE],
- None)
- self.assertEqual(len(f), 3)
-
- def test_iter(self):
- fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
- f = futures.FutureList(fs, None)
- self.assertEqual(list(iter(f)), fs)
-
- def test_contains(self):
- f = futures.FutureList([PENDING_FUTURE,
- RUNNING_FUTURE],
- None)
- self.assertTrue(PENDING_FUTURE in f)
- self.assertTrue(RUNNING_FUTURE in f)
- self.assertFalse(CANCELLED_FUTURE in f)
-
- def test_repr(self):
- pending = create_future(state=PENDING)
- cancelled = create_future(state=CANCELLED)
- cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED)
- running = create_future(state=RUNNING)
- finished = create_future(state=FINISHED)
-
- f = futures.FutureList(
- [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 +
- [CANCELLED_AND_NOTIFIED_FUTURE] +
- [RUNNING_FUTURE] * 2 +
- [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3,
- None)
-
- self.assertEqual(repr(f),
- '<FutureList #futures=15 '
- '[#pending=4 #cancelled=3 #running=2 #finished=6]>')
-
def test_main():
- test.support.run_unittest(ProcessPoolCancelTests,
- ThreadPoolCancelTests,
- ProcessPoolExecutorTest,
+ test.support.run_unittest(ProcessPoolExecutorTest,
ThreadPoolExecutorTest,
ProcessPoolWaitTests,
ThreadPoolWaitTests,
+ ProcessPoolAsCompletedTests,
+ ThreadPoolAsCompletedTests,
FutureTests,
- FutureListTests,
ProcessPoolShutdownTest,
ThreadPoolShutdownTest)
if __name__ == "__main__":
- test_main() \ No newline at end of file
+ test_main()