From e63029bf8fa0b37f42add3d6914509dd79c81cb6 Mon Sep 17 00:00:00 2001 From: "brian.quinlan" Date: Wed, 20 May 2009 18:32:26 +0000 Subject: Better tests and stuff --- crawl.py | 3 +- futures/_base.py | 45 +++--- futures/process.py | 11 +- futures/thread.py | 4 +- test_futures.py | 420 +++++++++++++++++++++++++++++++++++++++-------------- 5 files changed, 341 insertions(+), 142 deletions(-) diff --git a/crawl.py b/crawl.py index bc01946..10e35c3 100644 --- a/crawl.py +++ b/crawl.py @@ -31,7 +31,8 @@ def download_urls_with_executor(urls, executor, timeout=60): fs = executor.run_to_futures( (functools.partial(load_url, url, timeout) for url in urls), timeout=timeout) - for url, future in zip(urls, fs.successful_futures()): + for future in fs.successful_futures(): + url = urls[future.index] url_to_content[url] = future.result() return url_to_content finally: diff --git a/futures/_base.py b/futures/_base.py index 01dafd3..19cabe8 100644 --- a/futures/_base.py +++ b/futures/_base.py @@ -64,12 +64,13 @@ class TimeoutError(Error): class Future(object): """Represents the result of an asynchronous computation.""" - def __init__(self): - """Initializes the Future. Should not be called by clients.""" + def __init__(self, index): + """Initializes the future. Should not be called by clients.""" self._condition = threading.Condition() self._state = PENDING self._result = None self._exception = None + self._index = index def __repr__(self): with self._condition: @@ -84,6 +85,11 @@ class Future(object): self._result.__class__.__name__) return '' % _STATE_TO_DESCRIPTION_MAP[self._state] + @property + def index(self): + """The index of the future in its FutureList.""" + return self._index + def cancel(self): """Cancel the future if possible. @@ -104,6 +110,10 @@ class Future(object): with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + def running(self): + with self._condition: + return self._state == RUNNING + def done(self): """Return True of the future was cancelled or finished executing.""" with self._condition: @@ -334,9 +344,6 @@ class FutureList(object): def has_exception_futures(self): return any(self.exception_futures()) - def has_running_futures(self): - return any(self.running_futures()) - def cancelled_futures(self): return (f for f in self if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) @@ -356,17 +363,17 @@ class FutureList(object): def running_futures(self): return (f for f in self if f._state == RUNNING) - def __getitem__(self, i): - return self._futures[i] - def __len__(self): return len(self._futures) + def __getitem__(self, i): + return self._futures[i] + def __iter__(self): return iter(self._futures) - def __contains__(self, f): - return f in self._futures + def __contains__(self, future): + return future in self._futures def __repr__(self): states = {state: 0 for state in FUTURE_STATES} @@ -399,10 +406,7 @@ class Executor(object): exception. If no future raises and exception then it is equivalent to ALL_COMPLETED. ALL_COMPLETED - Return when all futures finish or are cancelled. - RETURN_IMMEDIATELY - Return without waiting (this is not likely - to be a useful option but it is there to - be symmetrical with the - executor.run_to_futures() method. + RETURN_IMMEDIATELY - Return without waiting. Returns: A FuturesList containing futures for the given calls. @@ -445,24 +449,25 @@ class Executor(object): except TimeoutError: pass - def map(self, fn, iter, timeout=None): + def map(self, func, *iterables, timeout=None): """Returns a iterator equivalent to map(fn, iter). Args: - fn: A callable taking a single argument. + func: A callable that will take take as many arguments as there + are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. Returns: - An iterator equivalent to: map(fn, iter) but the calls may be - evaluated out-of-order. + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. - Exception: If fn(x) raises for any x in iter. + Exception: If fn(*args) raises for any values. """ - calls = [functools.partial(fn, a) for a in iter] + calls = [functools.partial(func, *args) for args in zip(*iterables)] return self.run_to_results(calls, timeout) def shutdown(self): diff --git a/futures/process.py b/futures/process.py index c4dad74..ad6dc6d 100644 --- a/futures/process.py +++ b/futures/process.py @@ -47,11 +47,8 @@ def _process_worker(call_queue, result_queue, shutdown): class ProcessPoolExecutor(Executor): def __init__(self, max_processes=None): if max_processes is None: - try: - max_processes = multiprocessing.cpu_count() - except NotImplementedError: - max_processes = 16 - + max_processes = multiprocessing.cpu_count() + self._max_processes = max_processes # Make the call queue slightly larger than the number of processes to # prevent the worker processes from starving but to make future.cancel() @@ -140,8 +137,8 @@ class ProcessPoolExecutor(Executor): futures = [] event_sink = ThreadEventSink() self._queue_count - for call in calls: - f = Future() + for index, call in enumerate(calls): + f = Future(index) self._pending_work_items[self._queue_count] = _WorkItem( call, f, event_sink) self._work_ids.put(self._queue_count) diff --git a/futures/thread.py b/futures/thread.py index c1f15ae..9e3275f 100644 --- a/futures/thread.py +++ b/futures/thread.py @@ -73,8 +73,8 @@ class ThreadPoolExecutor(Executor): futures = [] event_sink = ThreadEventSink() - for call in calls: - f = Future() + for index, call in enumerate(calls): + f = Future(index) w = _WorkItem(call, f, event_sink) self._work_queue.put(w) futures.append(f) diff --git a/test_futures.py b/test_futures.py index 64e541b..0572f81 100644 --- a/test_futures.py +++ b/test_futures.py @@ -12,7 +12,7 @@ from futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) def create_future(state=PENDING, exception=None, result=None): - f = Future() + f = Future(0) f._state = state f._exception = exception f._result = result @@ -27,10 +27,11 @@ SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) class Call(object): CALL_LOCKS = {} - def __init__(self, manual_finish=False): + def __init__(self, manual_finish=False, result=42): called_event = multiprocessing.Event() can_finish = multiprocessing.Event() + self._result = result self._called_event_id = id(called_event) self._can_finish_event_id = id(can_finish) @@ -62,9 +63,9 @@ class Call(object): self._called_event.set() self._can_finish.wait() - return 42 + return self._result - def __del__(self): + def close(self): del self.CALL_LOCKS[self._called_event_id] del self.CALL_LOCKS[self._can_finish_event_id] @@ -76,31 +77,39 @@ class ExceptionCall(Call): self._can_finish.wait() raise ZeroDivisionError() -class ShutdownTest(unittest.TestCase): +class ExecutorShutdownTest(unittest.TestCase): def test_run_after_shutdown(self): call1 = Call() - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.run_to_futures, - [call1]) + try: + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.run_to_futures, + [call1]) + finally: + call1.close() def _start_some_futures(self): call1 = Call(manual_finish=True) call2 = Call(manual_finish=True) call3 = Call(manual_finish=True) - self.executor.run_to_futures([call1, call2, call3], - return_when=futures.RETURN_IMMEDIATELY) - - call1.wait_on_called() - call2.wait_on_called() - call3.wait_on_called() - - call1.set_can() - call2.set_can() - call3.set_can() - -class ThreadPoolShutdownTest(ShutdownTest): + try: + self.executor.run_to_futures([call1, call2, call3], + return_when=futures.RETURN_IMMEDIATELY) + + call1.wait_on_called() + call2.wait_on_called() + call3.wait_on_called() + + call1.set_can() + call2.set_can() + call3.set_can() + finally: + call1.close() + call2.close() + call3.close() + +class ThreadPoolShutdownTest(ExecutorShutdownTest): def setUp(self): self.executor = futures.ThreadPoolExecutor(max_threads=5) @@ -114,7 +123,16 @@ class ThreadPoolShutdownTest(ShutdownTest): for t in self.executor._threads: t.join() -class ProcessPoolShutdownTest(ShutdownTest): + def test_context_manager_shutdown(self): + with futures.ThreadPoolExecutor(max_threads=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for t in executor._threads: + t.join() + +class ProcessPoolShutdownTest(ExecutorShutdownTest): def setUp(self): self.executor = futures.ProcessPoolExecutor(max_processes=5) @@ -128,8 +146,17 @@ class ProcessPoolShutdownTest(ShutdownTest): for p in self.executor._processes: p.join() + def test_context_manager_shutdown(self): + with futures.ProcessPoolExecutor(max_processes=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for p in self.executor._processes: + p.join() + class WaitsTest(unittest.TestCase): - def test(self): + def test_concurrent_waits(self): def wait_for_ALL_COMPLETED(): fs.wait(return_when=futures.ALL_COMPLETED) self.assertTrue(f1.done()) @@ -163,29 +190,35 @@ class WaitsTest(unittest.TestCase): call3 = Call(manual_finish=True) call4 = Call() - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - threads = [] - for wait_test in [wait_for_ALL_COMPLETED, - wait_for_FIRST_COMPLETED, - wait_for_FIRST_EXCEPTION]: - t = threading.Thread(target=wait_test) - t.start() - threads.append(t) - - time.sleep(1) # give threads enough time to execute wait - - call1.set_can() - first_completed.acquire() - call2.set_can() - first_exception.acquire() - call3.set_can() - all_completed.acquire() - - self.executor.shutdown() + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + threads = [] + for wait_test in [wait_for_ALL_COMPLETED, + wait_for_FIRST_COMPLETED, + wait_for_FIRST_EXCEPTION]: + t = threading.Thread(target=wait_test) + t.start() + threads.append(t) + + time.sleep(1) # give threads enough time to execute wait + + call1.set_can() + first_completed.acquire() + call2.set_can() + first_exception.acquire() + call3.set_can() + all_completed.acquire() + + self.executor.shutdown() + finally: + call1.close() + call2.close() + call3.close() + call4.close() class ThreadPoolWaitTests(WaitsTest): def setUp(self): @@ -208,35 +241,41 @@ class CancelTests(unittest.TestCase): call3 = Call() call4 = Call() - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertEqual(f1.cancel(), False) - self.assertEqual(f2.cancel(), True) - self.assertEqual(f4.cancel(), True) - self.assertEqual(f1.cancelled(), False) - self.assertEqual(f2.cancelled(), True) - self.assertEqual(f3.cancelled(), False) - self.assertEqual(f4.cancelled(), True) - self.assertEqual(f1.done(), False) - self.assertEqual(f2.done(), True) - self.assertEqual(f3.done(), False) - self.assertEqual(f4.done(), True) - - call1.set_can() - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertEqual(f1.result(), 42) - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - self.assertEqual(f3.result(), 42) - self.assertRaises(futures.CancelledError, f4.result) - self.assertRaises(futures.CancelledError, f4.exception) - - self.assertEqual(call2.called(), False) - self.assertEqual(call4.called(), False) + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertEqual(f1.cancel(), False) + self.assertEqual(f2.cancel(), True) + self.assertEqual(f4.cancel(), True) + self.assertEqual(f1.cancelled(), False) + self.assertEqual(f2.cancelled(), True) + self.assertEqual(f3.cancelled(), False) + self.assertEqual(f4.cancelled(), True) + self.assertEqual(f1.done(), False) + self.assertEqual(f2.done(), True) + self.assertEqual(f3.done(), False) + self.assertEqual(f4.done(), True) + + call1.set_can() + fs.wait(return_when=futures.ALL_COMPLETED) + self.assertEqual(f1.result(), 42) + self.assertRaises(futures.CancelledError, f2.result) + self.assertRaises(futures.CancelledError, f2.exception) + self.assertEqual(f3.result(), 42) + self.assertRaises(futures.CancelledError, f4.result) + self.assertRaises(futures.CancelledError, f4.exception) + + self.assertEqual(call2.called(), False) + self.assertEqual(call4.called(), False) + finally: + call1.close() + call2.close() + call3.close() + call4.close() def test_wait_for_individual_cancel(self): def end_call(): @@ -247,17 +286,21 @@ class CancelTests(unittest.TestCase): call1 = Call(manual_finish=True) call2 = Call() - fs = self.executor.run_to_futures( - [call1, call2], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2 = fs - - call1.wait_on_called() - t = threading.Thread(target=end_call) - t.start() - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - t.join() + try: + fs = self.executor.run_to_futures( + [call1, call2], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2 = fs + + call1.wait_on_called() + t = threading.Thread(target=end_call) + t.start() + self.assertRaises(futures.CancelledError, f2.result) + self.assertRaises(futures.CancelledError, f2.exception) + t.join() + finally: + call1.close() + call2.close() def test_wait_with_already_cancelled_futures(self): call1 = Call(manual_finish=True) @@ -265,18 +308,24 @@ class CancelTests(unittest.TestCase): call3 = Call() call4 = Call() - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertTrue(f2.cancel()) - self.assertTrue(f3.cancel()) - call1.set_can() - time.sleep(0.1) - - fs.wait(return_when=futures.ALL_COMPLETED) + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertTrue(f2.cancel()) + self.assertTrue(f3.cancel()) + call1.set_can() + time.sleep(0.1) + + fs.wait(return_when=futures.ALL_COMPLETED) + finally: + call1.close() + call2.close() + call3.close() + call4.close() def test_cancel_all(self): call1 = Call(manual_finish=True) @@ -284,20 +333,26 @@ class CancelTests(unittest.TestCase): call3 = Call() call4 = Call() - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0) - call1.set_can() - fs.cancel() - - self.assertFalse(f1.cancelled()) - self.assertTrue(f2.cancelled()) - self.assertTrue(f3.cancelled()) - self.assertTrue(f4.cancelled()) + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0) + call1.set_can() + fs.cancel() + + self.assertFalse(f1.cancelled()) + self.assertTrue(f2.cancelled()) + self.assertTrue(f3.cancelled()) + self.assertTrue(f4.cancelled()) + finally: + call1.close() + call2.close() + call3.close() + call4.close() class ThreadPoolCancelTests(CancelTests): def setUp(self): @@ -313,7 +368,122 @@ class ProcessPoolCancelTests(WaitsTest): def tearDown(self): self.executor.shutdown() +class ExecutorTest(unittest.TestCase): + # Executor.shutdown() and context manager usage is tested by + # ExecutorShutdownTest. + def test_run_to_futures(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(manual_finish=True) + call4 = Call() + call5 = Call() + + try: + f1, f2, f3, f4, f5 = self.executor.run_to_futures( + [call1, call2, call3, call4, call5], + return_when=futures.RETURN_IMMEDIATELY) + + call3.wait_on_called() + + self.assertTrue(f1.done()) + self.assertFalse(f1.running()) + self.assertEqual(f1.index, 0) + + self.assertTrue(f2.done()) + self.assertFalse(f2.running()) + self.assertEqual(f2.index, 1) + + self.assertFalse(f3.done()) + self.assertTrue(f3.running()) + self.assertEqual(f3.index, 2) + + # ProcessPoolExecutor may mark some futures as running before they + # actually are. + self.assertFalse(f4.done()) + self.assertEqual(f4.index, 3) + + self.assertFalse(f5.done()) + self.assertEqual(f5.index, 4) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + call5.close() + + def test_run_to_results(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(result=3) + try: + self.assertEqual( + list(self.executor.run_to_results([call1, call2, call3])), + [1, 2, 3]) + finally: + call1.close() + call2.close() + call3.close() + + def test_run_to_results_exception(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = ExceptionCall() + try: + i = self.executor.run_to_results([call1, call2, call3]) + + self.assertEqual(i.__next__(), 1) + self.assertEqual(i.__next__(), 2) + self.assertRaises(ZeroDivisionError, i.__next__) + finally: + call1.close() + call2.close() + call3.close() + + def test_run_to_results_timeout(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(manual_finish=True) + + try: + i = self.executor.run_to_results([call1, call2, call3], timeout=1) + self.assertEqual(i.__next__(), 1) + self.assertEqual(i.__next__(), 2) + self.assertRaises(futures.TimeoutError, i.__next__) + call3.set_can() + finally: + call1.close() + call2.close() + call3.close() + + def test_map(self): + self.assertEqual( + list(self.executor.map(pow, range(10), range(10))), + list(map(pow, range(10), range(10)))) + + def test_map_exception(self): + i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) + self.assertEqual(i.__next__(), (0, 1)) + self.assertEqual(i.__next__(), (0, 1)) + self.assertRaises(ZeroDivisionError, i.__next__) + +class ThreadPoolExecutorTest(ExecutorTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() + +class ProcessPoolExecutorTest(ExecutorTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() + class FutureTests(unittest.TestCase): + # Future.index() is tested by ExecutorTest + # Future.cancel() is further tested by CancelTests. + def test_repr(self): self.assertEqual(repr(PENDING_FUTURE), '') self.assertEqual(repr(RUNNING_FUTURE), '') @@ -369,6 +539,14 @@ class FutureTests(unittest.TestCase): self.assertTrue(EXCEPTION_FUTURE.done()) self.assertTrue(SUCCESSFUL_FUTURE.done()) + def test_running(self): + self.assertFalse(PENDING_FUTURE.running()) + self.assertTrue(RUNNING_FUTURE.running()) + self.assertFalse(CANCELLED_FUTURE.running()) + self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) + self.assertFalse(EXCEPTION_FUTURE.running()) + self.assertFalse(SUCCESSFUL_FUTURE.running()) + def test_result_with_timeout(self): self.assertRaises(futures.TimeoutError, PENDING_FUTURE.result, timeout=0) @@ -421,7 +599,23 @@ class FutureTests(unittest.TestCase): IOError)) self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) + def test_exception_with_success(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = FINISHED + f1._exception = IOError() + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertTrue(isinstance(f1.exception(timeout=1), IOError)) + class FutureListTests(unittest.TestCase): + # FutureList.wait() is further tested by WaitsTest. + # FutureList.cancel() is tested by CancelTests. def test_wait_RETURN_IMMEDIATELY(self): f = futures.FutureList(futures=None, event_sink=None) f.wait(return_when=futures.RETURN_IMMEDIATELY) @@ -591,6 +785,8 @@ class FutureListTests(unittest.TestCase): def test_main(): test.support.run_unittest(ProcessPoolCancelTests, ThreadPoolCancelTests, + ProcessPoolExecutorTest, + ThreadPoolExecutorTest, ProcessPoolWaitTests, ThreadPoolWaitTests, FutureTests, -- cgit v1.2.1