From b32b0bfba4f6c8cd4f0b71cf7cfbad979c6c2695 Mon Sep 17 00:00:00 2001 From: "brian.quinlan" Date: Sun, 10 May 2009 11:37:27 +0000 Subject: Same names change and doc additions. --- crawl.py | 4 +- futures/_base.py | 156 ++++++++++++++++++++++++++++++++++++++++++++++------ futures/process.py | 27 +++++---- futures/thread.py | 15 +++-- primes.py | 19 +++---- test_futures.py | 159 +++++++++++++++++++++++++++++++++-------------------- 6 files changed, 269 insertions(+), 111 deletions(-) diff --git a/crawl.py b/crawl.py index 80d56e0..bc01946 100644 --- a/crawl.py +++ b/crawl.py @@ -28,7 +28,7 @@ def download_urls_sequential(urls, timeout=60): def download_urls_with_executor(urls, executor, timeout=60): try: url_to_content = {} - fs = executor.run( + fs = executor.run_to_futures( (functools.partial(load_url, url, timeout) for url in urls), timeout=timeout) for url, future in zip(urls, fs.successful_futures()): @@ -37,8 +37,6 @@ def download_urls_with_executor(urls, executor, timeout=60): finally: executor.shutdown() -import functools -import time def main(): for name, fn in [('sequential', functools.partial(download_urls_sequential, URLS)), diff --git a/futures/_base.py b/futures/_base.py index 1debfbb..01dafd3 100644 --- a/futures/_base.py +++ b/futures/_base.py @@ -52,14 +52,20 @@ def set_future_result(future, event_sink, result): event_sink.add_result() future._condition.notify_all() -class CancelledError(Exception): +class Error(Exception): pass -class TimeoutError(Exception): +class CancelledError(Error): + pass + +class TimeoutError(Error): pass class Future(object): + """Represents the result of an asynchronous computation.""" + def __init__(self): + """Initializes the Future. Should not be called by clients.""" self._condition = threading.Condition() self._state = PENDING self._result = None @@ -79,6 +85,11 @@ class Future(object): return '' % _STATE_TO_DESCRIPTION_MAP[self._state] def cancel(self): + """Cancel the future if possible. 
+ + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ with self._condition: if self._state in [RUNNING, FINISHED]: return False @@ -89,10 +100,12 @@ class Future(object): return True def cancelled(self): + """Return True if the future was cancelled.""" with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] def done(self): + """Return True if the future was cancelled or finished executing.""" with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] @@ -103,6 +116,21 @@ class Future(object): return self._result def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ with self._condition: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() @@ -119,6 +147,23 @@ class Future(object): raise TimeoutError() def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. 
+ """ + with self._condition: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() @@ -192,10 +237,34 @@ class ThreadEventSink(object): class FutureList(object): def __init__(self, futures, event_sink): + """Initializes the FutureList. Should not be called by clients.""" self._futures = futures self._event_sink = event_sink def wait(self, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the list to complete. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises an exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting (this is not likely + to be a useful option but it is there to + be symmetrical with the + executor.run_to_futures() method). + + Raises: + TimeoutError: If the wait condition wasn't satisfied before the + given timeout. + """ if return_when == RETURN_IMMEDIATELY: return @@ -234,6 +303,16 @@ class FutureList(object): self._event_sink.remove(completed_tracker) def cancel(self, timeout=None): + """Cancel the futures in the list. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Raises: + TimeoutError: If all the futures were not finished before the + given timeout. + """ for f in self: f.cancel() self.wait(timeout=timeout, return_when=ALL_COMPLETED) @@ -303,28 +382,56 @@ class FutureList(object): states[FINISHED])) class Executor(object): - def run(self, calls, timeout=None, return_when=ALL_COMPLETED): + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + """Return a list of futures representing the given calls. 
+ + Args: + calls: A sequence of callables that take no arguments. These will + be bound to Futures and returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises an exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting (this is not likely + to be a useful option but it is there to + be symmetrical with the + executor.run_to_futures() method). + + Returns: + A FutureList containing futures for the given calls. + """ raise NotImplementedError() - def runXXX(self, calls, timeout=None): - """Execute the given calls and + def run_to_results(self, calls, timeout=None): + """Returns an iterator of the results of the given calls. - Arguments: - calls: A sequence of functions that will be called without arguments - and whose results with be returned. - timeout: The maximum number of seconds to wait for the complete results. - None indicates that there is no timeout. + Args: + calls: A sequence of callables that take no arguments. These will + be called and their results returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. - Yields: - The results of the given calls in the order that they are given. + Returns: + An iterator over the results of the given calls. Equivalent to: + (call() for call in calls) but the calls may be evaluated + out-of-order. - Exceptions: - TimeoutError: if it takes more than timeout + Raises: + TimeoutError: If all the given calls were not completed before the + given timeout. + Exception: If any call() raises. 
""" if timeout is not None: end_time = timeout + time.time() - fs = self.run(calls, return_when=RETURN_IMMEDIATELY) + fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) try: for future in fs: @@ -339,10 +446,27 @@ class Executor(object): pass def map(self, fn, iter, timeout=None): + """Returns a iterator equivalent to map(fn, iter). + + Args: + fn: A callable taking a single argument. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator equivalent to: map(fn, iter) but the calls may be + evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(x) raises for any x in iter. + """ calls = [functools.partial(fn, a) for a in iter] - return self.runXXX(calls, timeout) + return self.run_to_results(calls, timeout) def shutdown(self): + """Clean-up. No other methods can be called afterwards.""" raise NotImplementedError() def __enter__(self): diff --git a/futures/process.py b/futures/process.py index 06a8c25..c4dad74 100644 --- a/futures/process.py +++ b/futures/process.py @@ -3,9 +3,8 @@ from futures._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, ALL_COMPLETED, - LOGGER, set_future_exception, set_future_result, - Executor, Future, FutureList,ThreadEventSink) + Executor, Future, FutureList, ThreadEventSink) import queue import multiprocessing @@ -54,18 +53,22 @@ class ProcessPoolExecutor(Executor): max_processes = 16 self._max_processes = max_processes + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from starving but to make future.cancel() + # responsive. 
self._call_queue = multiprocessing.Queue(self._max_processes + 1) self._result_queue = multiprocessing.Queue() self._work_ids = queue.Queue() self._queue_management_thread = None self._processes = set() - self._shutdown = False + + # Shutdown is a two-step process. + self._shutdown_thread = False self._shutdown_process_event = multiprocessing.Event() - self._lock = threading.Lock() + self._shutdown_lock = threading.Lock() self._queue_count = 0 self._pending_work_items = {} - def _add_call_item_to_queue(self): while True: try: @@ -96,7 +99,7 @@ class ProcessPoolExecutor(Executor): result_item = self._result_queue.get(block=True, timeout=0.1) except queue.Empty: - if self._shutdown and not self._pending_work_items: + if self._shutdown_thread and not self._pending_work_items: self._shutdown_process_event.set() return else: @@ -129,10 +132,10 @@ class ProcessPoolExecutor(Executor): p.start() self._processes.add(p) - def run(self, calls, timeout=None, return_when=ALL_COMPLETED): - with self._lock: - if self._shutdown: - raise RuntimeError() + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: + if self._shutdown_thread: + raise RuntimeError('cannot run new futures after shutdown') futures = [] event_sink = ThreadEventSink() @@ -151,5 +154,5 @@ class ProcessPoolExecutor(Executor): return fl def shutdown(self): - with self._lock: - self._shutdown = True + with self._shutdown_lock: + self._shutdown_thread = True diff --git a/futures/thread.py b/futures/thread.py index d1c1409..c1f15ae 100644 --- a/futures/thread.py +++ b/futures/thread.py @@ -5,7 +5,7 @@ from futures._base import (PENDING, RUNNING, CANCELLED, ALL_COMPLETED, LOGGER, set_future_exception, set_future_result, - Executor, Future, FutureList,ThreadEventSink) + Executor, Future, FutureList, ThreadEventSink) import queue import threading @@ -43,14 +43,13 @@ class ThreadPoolExecutor(Executor): self._work_queue = queue.Queue() self._threads = set() self._shutdown = 
False - self._lock = threading.Lock() + self._shutdown_lock = threading.Lock() def _worker(self): try: while True: try: - work_item = self._work_queue.get(block=True, - timeout=0.1) + work_item = self._work_queue.get(block=True, timeout=0.1) except queue.Empty: if self._shutdown: return @@ -67,10 +66,10 @@ class ThreadPoolExecutor(Executor): t.start() self._threads.add(t) - def run(self, calls, timeout=None, return_when=ALL_COMPLETED): - with self._lock: + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: if self._shutdown: - raise RuntimeError() + raise RuntimeError('cannot run new futures after shutdown') futures = [] event_sink = ThreadEventSink() @@ -86,5 +85,5 @@ class ThreadPoolExecutor(Executor): return fl def shutdown(self): - with self._lock: + with self._shutdown_lock: self._shutdown = True diff --git a/primes.py b/primes.py index aff6697..7e83ea0 100644 --- a/primes.py +++ b/primes.py @@ -10,30 +10,25 @@ PRIMES = [ 115797848077099] def is_prime(n): - n = abs(n) - i = 2 - while i <= math.sqrt(n): + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False - i += 1 return True def sequential(): return list(map(is_prime, PRIMES)) def with_process_pool_executor(): - executor = futures.ProcessPoolExecutor(10) - try: + with futures.ProcessPoolExecutor(10) as executor: return list(executor.map(is_prime, PRIMES)) - finally: - executor.shutdown() def with_thread_pool_executor(): - executor = futures.ThreadPoolExecutor(10) - try: + with futures.ThreadPoolExecutor(10) as executor: return list(executor.map(is_prime, PRIMES)) - finally: - executor.shutdown() def main(): for name, fn in [('sequential', sequential), diff --git a/test_futures.py b/test_futures.py index 0093b0e..64e541b 100644 --- a/test_futures.py +++ b/test_futures.py @@ -25,20 +25,34 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) 
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) - class Call(object): + CALL_LOCKS = {} def __init__(self, manual_finish=False): - self._called_event = threading.Event() + called_event = multiprocessing.Event() + can_finish = multiprocessing.Event() + + self._called_event_id = id(called_event) + self._can_finish_event_id = id(can_finish) + + self.CALL_LOCKS[self._called_event_id] = called_event + self.CALL_LOCKS[self._can_finish_event_id] = can_finish - self._can_finished = threading.Event() if not manual_finish: - self._can_finished.set() + self._can_finish.set() + + @property + def _can_finish(self): + return self.CALL_LOCKS[self._can_finish_event_id] + + @property + def _called_event(self): + return self.CALL_LOCKS[self._called_event_id] def wait_on_called(self): self._called_event.wait() def set_can(self): - self._can_finished.set() + self._can_finish.set() def called(self): return self._called_event.is_set() @@ -47,36 +61,36 @@ class Call(object): if self._called_event.is_set(): print('called twice') self._called_event.set() - self._can_finished.wait() + self._can_finish.wait() return 42 + def __del__(self): + del self.CALL_LOCKS[self._called_event_id] + del self.CALL_LOCKS[self._can_finish_event_id] + class ExceptionCall(Call): def __call__(self): assert not self._called_event.is_set(), 'already called' self._called_event.set() - self._can_finished.wait() + self._can_finish.wait() raise ZeroDivisionError() class ShutdownTest(unittest.TestCase): def test_run_after_shutdown(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call() self.executor.shutdown() self.assertRaises(RuntimeError, - self.executor.run, + self.executor.run_to_futures, [call1]) - def test_threads_terminate(self): - self.executor = futures.ThreadPoolExecutor(max_threads=5) - + def _start_some_futures(self): call1 = Call(manual_finish=True) call2 = Call(manual_finish=True) call3 = 
Call(manual_finish=True) - self.executor.run([call1, call2, call3], - return_when=futures.RETURN_IMMEDIATELY) + self.executor.run_to_futures([call1, call2, call3], + return_when=futures.RETURN_IMMEDIATELY) call1.wait_on_called() call2.wait_on_called() @@ -86,11 +100,33 @@ class ShutdownTest(unittest.TestCase): call2.set_can() call3.set_can() +class ThreadPoolShutdownTest(ShutdownTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=5) + + def tearDown(self): + self.executor.shutdown() + + def test_threads_terminate(self): + self._start_some_futures() self.assertEqual(len(self.executor._threads), 3) self.executor.shutdown() for t in self.executor._threads: t.join() - + +class ProcessPoolShutdownTest(ShutdownTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=5) + + def tearDown(self): + self.executor.shutdown() + + def test_processes_terminate(self): + self._start_some_futures() + self.assertEqual(len(self.executor._processes), 5) + self.executor.shutdown() + for p in self.executor._processes: + p.join() class WaitsTest(unittest.TestCase): def test(self): @@ -105,7 +141,7 @@ class WaitsTest(unittest.TestCase): def wait_for_FIRST_COMPLETED(): fs.wait(return_when=futures.FIRST_COMPLETED) self.assertTrue(f1.done()) - self.assertFalse(f2.done()) + self.assertFalse(f2.done()) # XXX self.assertFalse(f3.done()) self.assertFalse(f4.done()) first_completed.release() @@ -114,7 +150,7 @@ class WaitsTest(unittest.TestCase): fs.wait(return_when=futures.FIRST_EXCEPTION) self.assertTrue(f1.done()) self.assertTrue(f2.done()) - self.assertFalse(f3.done()) + self.assertFalse(f3.done()) # XXX self.assertFalse(f4.done()) first_exception.release() @@ -127,46 +163,54 @@ class WaitsTest(unittest.TestCase): call3 = Call(manual_finish=True) call4 = Call() - fs = self.executor.run([call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + 
return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs threads = [] - for call in [wait_for_ALL_COMPLETED, - wait_for_FIRST_COMPLETED, - wait_for_FIRST_EXCEPTION]: - t = threading.Thread(target=call) + for wait_test in [wait_for_ALL_COMPLETED, + wait_for_FIRST_COMPLETED, + wait_for_FIRST_EXCEPTION]: + t = threading.Thread(target=wait_test) t.start() threads.append(t) time.sleep(1) # give threads enough time to execute wait + call1.set_can() first_completed.acquire() call2.set_can() first_exception.acquire() call3.set_can() - call4.set_can() all_completed.acquire() self.executor.shutdown() class ThreadPoolWaitTests(WaitsTest): - executor = futures.ThreadPoolExecutor(max_threads=1) + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() class ProcessPoolWaitTests(WaitsTest): - executor = futures.ProcessPoolExecutor(max_processes=1) + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() class CancelTests(unittest.TestCase): def test_cancel_states(self): - executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) call2 = Call() call3 = Call() call4 = Call() - fs = executor.run([call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs call1.wait_on_called() @@ -193,7 +237,6 @@ class CancelTests(unittest.TestCase): self.assertEqual(call2.called(), False) self.assertEqual(call4.called(), False) - executor.shutdown() def test_wait_for_individual_cancel(self): def end_call(): @@ -201,12 +244,12 @@ class CancelTests(unittest.TestCase): f2.cancel() call1.set_can() - executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) call2 = Call() - fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY) + fs = 
self.executor.run_to_futures( + [call1, call2], + return_when=futures.RETURN_IMMEDIATELY) f1, f2 = fs call1.wait_on_called() @@ -215,18 +258,16 @@ class CancelTests(unittest.TestCase): self.assertRaises(futures.CancelledError, f2.result) self.assertRaises(futures.CancelledError, f2.exception) t.join() - executor.shutdown() def test_wait_with_already_cancelled_futures(self): - executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) call2 = Call() call3 = Call() call4 = Call() - fs = executor.run([call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs call1.wait_on_called() @@ -236,18 +277,16 @@ class CancelTests(unittest.TestCase): time.sleep(0.1) fs.wait(return_when=futures.ALL_COMPLETED) - executor.shutdown() def test_cancel_all(self): - executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) call2 = Call() call3 = Call() call4 = Call() - fs = executor.run([call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) f1, f2, f3, f4 = fs call1.wait_on_called() @@ -259,22 +298,20 @@ class CancelTests(unittest.TestCase): self.assertTrue(f2.cancelled()) self.assertTrue(f3.cancelled()) self.assertTrue(f4.cancelled()) - executor.shutdown() - def test_cancel_repr(self): - executor = futures.ThreadPoolExecutor(max_threads=1) +class ThreadPoolCancelTests(CancelTests): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) - call1 = Call(manual_finish=True) - call2 = Call() + def tearDown(self): + self.executor.shutdown() - fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY) - f1, f2 = fs +class ProcessPoolCancelTests(WaitsTest): + def setUp(self): + self.executor = 
futures.ProcessPoolExecutor(max_processes=1) - call1.wait_on_called() - call1.set_can() - f2.cancel() - self.assertEqual(repr(f2), '') - executor.shutdown() + def tearDown(self): + self.executor.shutdown() class FutureTests(unittest.TestCase): def test_repr(self): @@ -552,12 +589,14 @@ class FutureListTests(unittest.TestCase): '[#pending=4 #cancelled=3 #running=2 #finished=6]>') def test_main(): - test.support.run_unittest(CancelTests, -# ProcessPoolWaitTests, + test.support.run_unittest(ProcessPoolCancelTests, + ThreadPoolCancelTests, + ProcessPoolWaitTests, ThreadPoolWaitTests, FutureTests, FutureListTests, - ShutdownTest) + ProcessPoolShutdownTest, + ThreadPoolShutdownTest) if __name__ == "__main__": test_main() \ No newline at end of file -- cgit v1.2.1