From e1c6c9758b70b3e9e1630b5e8545a9a1e3de7368 Mon Sep 17 00:00:00 2001 From: brian quinlan Date: Sat, 23 May 2009 09:18:19 +0000 Subject: Seperate into python2 and python3 directories --- crawl.py | 57 ---- futures/__init__.py | 6 - futures/_base.py | 482 -------------------------- futures/process.py | 155 --------- futures/thread.py | 89 ----- primes.py | 44 --- python2/crawl.py | 57 ++++ python2/futures/__init__.py | 6 + python2/futures/_base.py | 482 ++++++++++++++++++++++++++ python2/futures/process.py | 155 +++++++++ python2/futures/thread.py | 89 +++++ python2/primes.py | 44 +++ python2/test_futures.py | 798 ++++++++++++++++++++++++++++++++++++++++++++ python3/crawl.py | 57 ++++ python3/futures/__init__.py | 6 + python3/futures/_base.py | 482 ++++++++++++++++++++++++++ python3/futures/process.py | 155 +++++++++ python3/futures/thread.py | 89 +++++ python3/primes.py | 44 +++ python3/test_futures.py | 798 ++++++++++++++++++++++++++++++++++++++++++++ test_futures.py | 798 -------------------------------------------- 21 files changed, 3262 insertions(+), 1631 deletions(-) delete mode 100644 crawl.py delete mode 100644 futures/__init__.py delete mode 100644 futures/_base.py delete mode 100644 futures/process.py delete mode 100644 futures/thread.py delete mode 100644 primes.py create mode 100644 python2/crawl.py create mode 100644 python2/futures/__init__.py create mode 100644 python2/futures/_base.py create mode 100644 python2/futures/process.py create mode 100644 python2/futures/thread.py create mode 100644 python2/primes.py create mode 100644 python2/test_futures.py create mode 100644 python3/crawl.py create mode 100644 python3/futures/__init__.py create mode 100644 python3/futures/_base.py create mode 100644 python3/futures/process.py create mode 100644 python3/futures/thread.py create mode 100644 python3/primes.py create mode 100644 python3/test_futures.py delete mode 100644 test_futures.py diff --git a/crawl.py b/crawl.py deleted file mode 100644 index 10e35c3..0000000 --- a/crawl.py +++ /dev/null @@ -1,57 +0,0 @@ -import datetime -import functools -import futures.thread -import time -import timeit -import urllib.request - -URLS = ['http://www.google.com/', - 'http://www.apple.com/', - 'http://www.ibm.com', - 'http://www.thisurlprobablydoesnotexist.com', - 'http://www.slashdot.org/', - 'http://www.python.org/', - 'http://www.sweetapp.com/'] * 5 - -def load_url(url, timeout): - return urllib.request.urlopen(url, timeout=timeout).read() - -def download_urls_sequential(urls, timeout=60): - url_to_content = {} - for url in urls: - try: - url_to_content[url] = load_url(url, timeout=timeout) - except: - pass - return url_to_content - -def download_urls_with_executor(urls, executor, timeout=60): - try: - url_to_content = {} - fs = executor.run_to_futures( - (functools.partial(load_url, url, timeout) for url in urls), - timeout=timeout) - for future in fs.successful_futures(): - url = urls[future.index] - url_to_content[url] = future.result() - return url_to_content - finally: - executor.shutdown() - -def main(): - for name, fn in [('sequential', - functools.partial(download_urls_sequential, URLS)), - ('processes', - functools.partial(download_urls_with_executor, - URLS, - futures.ProcessPoolExecutor(10))), - ('threads', - functools.partial(download_urls_with_executor, - URLS, - futures.ThreadPoolExecutor(10)))]: - print('%s: ' % name.ljust(12), end='') - start = time.time() - fn() - print('%.2f seconds' % (time.time() - start)) - -main() diff --git a/futures/__init__.py b/futures/__init__.py deleted file mode 100644 index 5f599ad..0000000 --- a/futures/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, - ALL_COMPLETED, RETURN_IMMEDIATELY, - CancelledError, TimeoutError, - Future, FutureList) -from futures.thread import ThreadPoolExecutor -from futures.process import ProcessPoolExecutor diff --git a/futures/_base.py b/futures/_base.py deleted file mode 100644 index 19cabe8..0000000 --- a/futures/_base.py +++ /dev/null @@ -1,482 +0,0 @@ -import functools -import logging -import threading -import time - -FIRST_COMPLETED = 0 -FIRST_EXCEPTION = 1 -ALL_COMPLETED = 2 -RETURN_IMMEDIATELY = 3 - -# Possible future states -PENDING = 0 -RUNNING = 1 -CANCELLED = 2 # The future was cancelled... -CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called. -FINISHED = 4 - -FUTURE_STATES = [ - PENDING, - RUNNING, - CANCELLED, - CANCELLED_AND_NOTIFIED, - FINISHED -] - -_STATE_TO_DESCRIPTION_MAP = { - PENDING: "pending", - RUNNING: "running", - CANCELLED: "cancelled", - CANCELLED_AND_NOTIFIED: "cancelled", - FINISHED: "finished" -} - -LOGGER = logging.getLogger("futures") -_handler = logging.StreamHandler() -LOGGER.addHandler(_handler) -del _handler - -def set_future_exception(future, event_sink, exception): - with future._condition: - future._exception = exception - with event_sink._condition: - future._state = FINISHED - event_sink.add_exception() - future._condition.notify_all() - -def set_future_result(future, event_sink, result): - with future._condition: - future._result = result - with event_sink._condition: - future._state = FINISHED - event_sink.add_result() - future._condition.notify_all() - -class Error(Exception): - pass - -class CancelledError(Error): - pass - -class TimeoutError(Error): - pass - -class Future(object): - """Represents the result of an asynchronous computation.""" - - def __init__(self, index): - """Initializes the future. Should not be called by clients.""" - self._condition = threading.Condition() - self._state = PENDING - self._result = None - self._exception = None - self._index = index - - def __repr__(self): - with self._condition: - if self._state == FINISHED: - if self._exception: - return '' % ( - _STATE_TO_DESCRIPTION_MAP[self._state], - self._exception.__class__.__name__) - else: - return '' % ( - _STATE_TO_DESCRIPTION_MAP[self._state], - self._result.__class__.__name__) - return '' % _STATE_TO_DESCRIPTION_MAP[self._state] - - @property - def index(self): - """The index of the future in its FutureList.""" - return self._index - - def cancel(self): - """Cancel the future if possible. - - Returns True if the future was cancelled, False otherwise. A future - cannot be cancelled if it is running or has already completed. - """ - with self._condition: - if self._state in [RUNNING, FINISHED]: - return False - - if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]: - self._state = CANCELLED - self._condition.notify_all() - return True - - def cancelled(self): - """Return True if the future has cancelled.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] - - def running(self): - with self._condition: - return self._state == RUNNING - - def done(self): - """Return True of the future was cancelled or finished executing.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - - def __get_result(self): - if self._exception: - raise self._exception - else: - return self._result - - def result(self, timeout=None): - """Return the result of the call that the future represents. - - Args: - timeout: The number of seconds to wait for the result if the future - isn't done. If None, then there is no limit on the wait time. - - Returns: - The result of the call that the future represents. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - Exception: If the call raised then that exception will be raised. - """ - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - else: - raise TimeoutError() - - def exception(self, timeout=None): - """Return the exception raised by the call that the future represents. - - Args: - timeout: The number of seconds to wait for the exception if the - future isn't done. If None, then there is no limit on the wait - time. - - Returns: - The exception raised by the call that the future represents or None - if the call completed without raising. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - """ - - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - else: - raise TimeoutError() - -class _FirstCompletedWaitTracker(object): - def __init__(self): - self.event = threading.Event() - - def add_result(self): - self.event.set() - - def add_exception(self): - self.event.set() - - def add_cancelled(self): - self.event.set() - -class _AllCompletedWaitTracker(object): - def __init__(self, pending_calls, stop_on_exception): - self.pending_calls = pending_calls - self.stop_on_exception = stop_on_exception - self.event = threading.Event() - - def add_result(self): - self.pending_calls -= 1 - if not self.pending_calls: - self.event.set() - - def add_exception(self): - if self.stop_on_exception: - self.event.set() - else: - self.add_result() - - def add_cancelled(self): - self.add_result() - -class ThreadEventSink(object): - def __init__(self): - self._condition = threading.Lock() - self._waiters = [] - - def add(self, e): - self._waiters.append(e) - - def remove(self, e): - self._waiters.remove(e) - - def add_result(self): - for waiter in self._waiters: - waiter.add_result() - - def add_exception(self): - for waiter in self._waiters: - waiter.add_exception() - - def add_cancelled(self): - for waiter in self._waiters: - waiter.add_cancelled() - -class FutureList(object): - def __init__(self, futures, event_sink): - """Initializes the FutureList. Should not be called by clients.""" - self._futures = futures - self._event_sink = event_sink - - def wait(self, timeout=None, return_when=ALL_COMPLETED): - """Wait for the futures in the list to complete. - - Args: - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when the method should return. The options - are: - - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises and exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - RETURN_IMMEDIATELY - Return without waiting (this is not likely - to be a useful option but it is there to - be symmetrical with the - executor.run_to_futures() method. - - Raises: - TimeoutError: If the wait condition wasn't satisfied before the - given timeout. - """ - if return_when == RETURN_IMMEDIATELY: - return - - with self._event_sink._condition: - # Make a quick exit if every future is already done. This check is - # necessary because, if every future is in the - # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will - # never receive any - if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - for f in self): - return - - if return_when == FIRST_COMPLETED: - completed_tracker = _FirstCompletedWaitTracker() - else: - # Calculate how many events are expected before every future - # is complete. This can be done without holding the futures' - # locks because a future cannot transition itself into either - # of the states being looked for. - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] - for f in self) - - if return_when == FIRST_EXCEPTION: - completed_tracker = _AllCompletedWaitTracker( - pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - completed_tracker = _AllCompletedWaitTracker( - pending_count, stop_on_exception=False) - - self._event_sink.add(completed_tracker) - - try: - completed_tracker.event.wait(timeout) - finally: - self._event_sink.remove(completed_tracker) - - def cancel(self, timeout=None): - """Cancel the futures in the list. - - Args: - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Raises: - TimeoutError: If all the futures were not finished before the - given timeout. - """ - for f in self: - f.cancel() - self.wait(timeout=timeout, return_when=ALL_COMPLETED) - if any(not f.done() for f in self): - raise TimeoutError() - - def has_running_futures(self): - return any(self.running_futures()) - - def has_cancelled_futures(self): - return any(self.cancelled_futures()) - - def has_done_futures(self): - return any(self.done_futures()) - - def has_successful_futures(self): - return any(self.successful_futures()) - - def has_exception_futures(self): - return any(self.exception_futures()) - - def cancelled_futures(self): - return (f for f in self - if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) - - def done_futures(self): - return (f for f in self - if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]) - - def successful_futures(self): - return (f for f in self - if f._state == FINISHED and f._exception is None) - - def exception_futures(self): - return (f for f in self - if f._state == FINISHED and f._exception is not None) - - def running_futures(self): - return (f for f in self if f._state == RUNNING) - - def __len__(self): - return len(self._futures) - - def __getitem__(self, i): - return self._futures[i] - - def __iter__(self): - return iter(self._futures) - - def __contains__(self, future): - return future in self._futures - - def __repr__(self): - states = {state: 0 for state in FUTURE_STATES} - for f in self: - states[f._state] += 1 - - return ('' % ( - len(self), - states[PENDING], - states[CANCELLED] + states[CANCELLED_AND_NOTIFIED], - states[RUNNING], - states[FINISHED])) - -class Executor(object): - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - """Return a list of futures representing the given calls. - - Args: - calls: A sequence of callables that take no arguments. These will - be bound to Futures and returned. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when the method should return. The options - are: - - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises and exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - RETURN_IMMEDIATELY - Return without waiting. - - Returns: - A FuturesList containing futures for the given calls. - """ - raise NotImplementedError() - - def run_to_results(self, calls, timeout=None): - """Returns a iterator of the results of the given calls. - - Args: - calls: A sequence of callables that take no arguments. These will - be called and their results returned. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator over the results of the given calls. Equivalent to: - (call() for call in calls) but the calls may be evaluated - out-of-order. - - Raises: - TimeoutError: If all the given calls were not completed before the - given timeout. - Exception: If any call() raises. - """ - if timeout is not None: - end_time = timeout + time.time() - - fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) - - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - finally: - try: - fs.cancel(timeout=0) - except TimeoutError: - pass - - def map(self, func, *iterables, timeout=None): - """Returns a iterator equivalent to map(fn, iter). - - Args: - func: A callable that will take take as many arguments as there - are passed iterables. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator equivalent to: map(func, *iterables) but the calls may - be evaluated out-of-order. - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - Exception: If fn(*args) raises for any values. - """ - calls = [functools.partial(func, *args) for args in zip(*iterables)] - return self.run_to_results(calls, timeout) - - def shutdown(self): - """Clean-up. No other methods can be called afterwards.""" - raise NotImplementedError() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown() - return False diff --git a/futures/process.py b/futures/process.py deleted file mode 100644 index ad6dc6d..0000000 --- a/futures/process.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python - -from futures._base import (PENDING, RUNNING, CANCELLED, - CANCELLED_AND_NOTIFIED, FINISHED, - ALL_COMPLETED, - set_future_exception, set_future_result, - Executor, Future, FutureList, ThreadEventSink) - -import queue -import multiprocessing -import threading - -class _WorkItem(object): - def __init__(self, call, future, completion_tracker): - self.call = call - self.future = future - self.completion_tracker = completion_tracker - -class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None): - self.work_id = work_id - self.exception = exception - self.result = result - -class _CallItem(object): - def __init__(self, work_id, call): - self.work_id = work_id - self.call = call - -def _process_worker(call_queue, result_queue, shutdown): - while True: - try: - call_item = call_queue.get(block=True, timeout=0.1) - except queue.Empty: - if shutdown.is_set(): - return - else: - try: - r = call_item.call() - except BaseException as e: - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) - -class ProcessPoolExecutor(Executor): - def __init__(self, max_processes=None): - if max_processes is None: - max_processes = multiprocessing.cpu_count() - - self._max_processes = max_processes - # Make the call queue slightly larger than the number of processes to - # prevent the worker processes from starving but to make future.cancel() - # responsive. - self._call_queue = multiprocessing.Queue(self._max_processes + 1) - self._result_queue = multiprocessing.Queue() - self._work_ids = queue.Queue() - self._queue_management_thread = None - self._processes = set() - - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_process_event = multiprocessing.Event() - self._shutdown_lock = threading.Lock() - self._queue_count = 0 - self._pending_work_items = {} - - def _add_call_item_to_queue(self): - while True: - try: - work_id = self._work_ids.get(block=False) - except queue.Empty: - return - else: - work_item = self._pending_work_items[work_id] - - if work_item.future.cancelled(): - with work_item.future._condition: - work_item.future._condition.notify_all() - work_item.completion_tracker.add_cancelled() - continue - else: - with work_item.future._condition: - work_item.future._state = RUNNING - - self._call_queue.put(_CallItem(work_id, work_item.call), - block=True) - if self._call_queue.full(): - return - - def _result(self): - while True: - self._add_call_item_to_queue() - try: - result_item = self._result_queue.get(block=True, - timeout=0.1) - except queue.Empty: - if self._shutdown_thread and not self._pending_work_items: - self._shutdown_process_event.set() - return - else: - work_item = self._pending_work_items[result_item.work_id] - del self._pending_work_items[result_item.work_id] - - if result_item.exception: - set_future_exception(work_item.future, - work_item.completion_tracker, - result_item.exception) - else: - set_future_result(work_item.future, - work_item.completion_tracker, - result_item.result) - - def _adjust_process_count(self): - if self._queue_management_thread is None: - self._queue_management_thread = threading.Thread( - target=self._result) - self._queue_management_thread.daemon = True - self._queue_management_thread.start() - - for _ in range(len(self._processes), self._max_processes): - p = multiprocessing.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._shutdown_process_event)) - p.daemon = True - p.start() - self._processes.add(p) - - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - with self._shutdown_lock: - if self._shutdown_thread: - raise RuntimeError('cannot run new futures after shutdown') - - futures = [] - event_sink = ThreadEventSink() - self._queue_count - for index, call in enumerate(calls): - f = Future(index) - self._pending_work_items[self._queue_count] = _WorkItem( - call, f, event_sink) - self._work_ids.put(self._queue_count) - futures.append(f) - self._queue_count += 1 - - self._adjust_process_count() - fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, return_when=return_when) - return fl - - def shutdown(self): - with self._shutdown_lock: - self._shutdown_thread = True diff --git a/futures/thread.py b/futures/thread.py deleted file mode 100644 index 9e3275f..0000000 --- a/futures/thread.py +++ /dev/null @@ -1,89 +0,0 @@ -#!/usr/bin/env python - -from futures._base import (PENDING, RUNNING, CANCELLED, - CANCELLED_AND_NOTIFIED, FINISHED, - ALL_COMPLETED, - LOGGER, - set_future_exception, set_future_result, - Executor, Future, FutureList, ThreadEventSink) -import queue -import threading - -class _WorkItem(object): - def __init__(self, call, future, completion_tracker): - self.call = call - self.future = future - self.completion_tracker = completion_tracker - - def run(self): - with self.future._condition: - if self.future._state == PENDING: - self.future._state = RUNNING - elif self.future._state == CANCELLED: - with self.completion_tracker._condition: - self.future._state = CANCELLED_AND_NOTIFIED - self.completion_tracker.add_cancelled() - return - else: - LOGGER.critical('Future %s in unexpected state: %d', - id(self.future), - self.future._state) - return - - try: - result = self.call() - except BaseException as e: - set_future_exception(self.future, self.completion_tracker, e) - else: - set_future_result(self.future, self.completion_tracker, result) - -class ThreadPoolExecutor(Executor): - def __init__(self, max_threads): - self._max_threads = max_threads - self._work_queue = queue.Queue() - self._threads = set() - self._shutdown = False - self._shutdown_lock = threading.Lock() - - def _worker(self): - try: - while True: - try: - work_item = self._work_queue.get(block=True, timeout=0.1) - except queue.Empty: - if self._shutdown: - return - else: - work_item.run() - except BaseException as e: - LOGGER.critical('Exception in worker', exc_info=True) - - def _adjust_thread_count(self): - for _ in range(len(self._threads), - min(self._max_threads, self._work_queue.qsize())): - t = threading.Thread(target=self._worker) - t.daemon = True - t.start() - self._threads.add(t) - - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('cannot run new futures after shutdown') - - futures = [] - event_sink = ThreadEventSink() - for index, call in enumerate(calls): - f = Future(index) - w = _WorkItem(call, f, event_sink) - self._work_queue.put(w) - futures.append(f) - - self._adjust_thread_count() - fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, return_when=return_when) - return fl - - def shutdown(self): - with self._shutdown_lock: - self._shutdown = True diff --git a/primes.py b/primes.py deleted file mode 100644 index 7e83ea0..0000000 --- a/primes.py +++ /dev/null @@ -1,44 +0,0 @@ -import futures -import math -import time - -PRIMES = [ - 112272535095293, - 112582705942171, - 112272535095293, - 115280095190773, - 115797848077099] - -def is_prime(n): - if n % 2 == 0: - return False - - sqrt_n = int(math.floor(math.sqrt(n))) - for i in range(3, sqrt_n + 1, 2): - if n % i == 0: - return False - return True - -def sequential(): - return list(map(is_prime, PRIMES)) - -def with_process_pool_executor(): - with futures.ProcessPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def with_thread_pool_executor(): - with futures.ThreadPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def main(): - for name, fn in [('sequential', sequential), - ('processes', with_process_pool_executor), - ('threads', with_thread_pool_executor)]: - print('%s: ' % name.ljust(12), end='') - start = time.time() - if fn() != [True] * len(PRIMES): - print('failed') - else: - print('%.2f seconds' % (time.time() - start)) - -main() \ No newline at end of file diff --git a/python2/crawl.py b/python2/crawl.py new file mode 100644 index 0000000..10e35c3 --- /dev/null +++ b/python2/crawl.py @@ -0,0 +1,57 @@ +import datetime +import functools +import futures.thread +import time +import timeit +import urllib.request + +URLS = ['http://www.google.com/', + 'http://www.apple.com/', + 'http://www.ibm.com', + 'http://www.thisurlprobablydoesnotexist.com', + 'http://www.slashdot.org/', + 'http://www.python.org/', + 'http://www.sweetapp.com/'] * 5 + +def load_url(url, timeout): + return urllib.request.urlopen(url, timeout=timeout).read() + +def download_urls_sequential(urls, timeout=60): + url_to_content = {} + for url in urls: + try: + url_to_content[url] = load_url(url, timeout=timeout) + except: + pass + return url_to_content + +def download_urls_with_executor(urls, executor, timeout=60): + try: + url_to_content = {} + fs = executor.run_to_futures( + (functools.partial(load_url, url, timeout) for url in urls), + timeout=timeout) + for future in fs.successful_futures(): + url = urls[future.index] + url_to_content[url] = future.result() + return url_to_content + finally: + executor.shutdown() + +def main(): + for name, fn in [('sequential', + functools.partial(download_urls_sequential, URLS)), + ('processes', + functools.partial(download_urls_with_executor, + URLS, + futures.ProcessPoolExecutor(10))), + ('threads', + functools.partial(download_urls_with_executor, + URLS, + futures.ThreadPoolExecutor(10)))]: + print('%s: ' % name.ljust(12), end='') + start = time.time() + fn() + print('%.2f seconds' % (time.time() - start)) + +main() diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py new file mode 100644 index 0000000..5f599ad --- /dev/null +++ b/python2/futures/__init__.py @@ -0,0 +1,6 @@ +from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, + ALL_COMPLETED, RETURN_IMMEDIATELY, + CancelledError, TimeoutError, + Future, FutureList) +from futures.thread import ThreadPoolExecutor +from futures.process import ProcessPoolExecutor diff --git a/python2/futures/_base.py b/python2/futures/_base.py new file mode 100644 index 0000000..19cabe8 --- /dev/null +++ b/python2/futures/_base.py @@ -0,0 +1,482 @@ +import functools +import logging +import threading +import time + +FIRST_COMPLETED = 0 +FIRST_EXCEPTION = 1 +ALL_COMPLETED = 2 +RETURN_IMMEDIATELY = 3 + +# Possible future states +PENDING = 0 +RUNNING = 1 +CANCELLED = 2 # The future was cancelled... +CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called. +FINISHED = 4 + +FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + +LOGGER = logging.getLogger("futures") +_handler = logging.StreamHandler() +LOGGER.addHandler(_handler) +del _handler + +def set_future_exception(future, event_sink, exception): + with future._condition: + future._exception = exception + with event_sink._condition: + future._state = FINISHED + event_sink.add_exception() + future._condition.notify_all() + +def set_future_result(future, event_sink, result): + with future._condition: + future._result = result + with event_sink._condition: + future._state = FINISHED + event_sink.add_result() + future._condition.notify_all() + +class Error(Exception): + pass + +class CancelledError(Error): + pass + +class TimeoutError(Error): + pass + +class Future(object): + """Represents the result of an asynchronous computation.""" + + def __init__(self, index): + """Initializes the future. Should not be called by clients.""" + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + self._index = index + + def __repr__(self): + with self._condition: + if self._state == FINISHED: + if self._exception: + return '' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '' % _STATE_TO_DESCRIPTION_MAP[self._state] + + @property + def index(self): + """The index of the future in its FutureList.""" + return self._index + + def cancel(self): + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]: + self._state = CANCELLED + self._condition.notify_all() + return True + + def cancelled(self): + """Return True if the future has cancelled.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + + def running(self): + with self._condition: + return self._state == RUNNING + + def done(self): + """Return True of the future was cancelled or finished executing.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + + def __get_result(self): + if self._exception: + raise self._exception + else: + return self._result + + def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + + def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + else: + raise TimeoutError() + +class _FirstCompletedWaitTracker(object): + def __init__(self): + self.event = threading.Event() + + def add_result(self): + self.event.set() + + def add_exception(self): + self.event.set() + + def add_cancelled(self): + self.event.set() + +class _AllCompletedWaitTracker(object): + def __init__(self, pending_calls, stop_on_exception): + self.pending_calls = pending_calls + self.stop_on_exception = stop_on_exception + self.event = threading.Event() + + def add_result(self): + self.pending_calls -= 1 + if not self.pending_calls: + self.event.set() + + def add_exception(self): + if self.stop_on_exception: + self.event.set() + else: + self.add_result() + + def add_cancelled(self): + self.add_result() + +class ThreadEventSink(object): + def __init__(self): + self._condition = threading.Lock() + self._waiters = [] + + def add(self, e): + self._waiters.append(e) + + def remove(self, e): + self._waiters.remove(e) + + def add_result(self): + for waiter in self._waiters: + waiter.add_result() + + def add_exception(self): + for waiter in self._waiters: + waiter.add_exception() + + def add_cancelled(self): + for waiter in self._waiters: + waiter.add_cancelled() + +class FutureList(object): + def __init__(self, futures, event_sink): + """Initializes the FutureList. Should not be called by clients.""" + self._futures = futures + self._event_sink = event_sink + + def wait(self, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the list to complete. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises and exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting (this is not likely + to be a useful option but it is there to + be symmetrical with the + executor.run_to_futures() method. + + Raises: + TimeoutError: If the wait condition wasn't satisfied before the + given timeout. + """ + if return_when == RETURN_IMMEDIATELY: + return + + with self._event_sink._condition: + # Make a quick exit if every future is already done. This check is + # necessary because, if every future is in the + # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will + # never receive any + if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + for f in self): + return + + if return_when == FIRST_COMPLETED: + completed_tracker = _FirstCompletedWaitTracker() + else: + # Calculate how many events are expected before every future + # is complete. This can be done without holding the futures' + # locks because a future cannot transition itself into either + # of the states being looked for. + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] + for f in self) + + if return_when == FIRST_EXCEPTION: + completed_tracker = _AllCompletedWaitTracker( + pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + completed_tracker = _AllCompletedWaitTracker( + pending_count, stop_on_exception=False) + + self._event_sink.add(completed_tracker) + + try: + completed_tracker.event.wait(timeout) + finally: + self._event_sink.remove(completed_tracker) + + def cancel(self, timeout=None): + """Cancel the futures in the list. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Raises: + TimeoutError: If all the futures were not finished before the + given timeout. + """ + for f in self: + f.cancel() + self.wait(timeout=timeout, return_when=ALL_COMPLETED) + if any(not f.done() for f in self): + raise TimeoutError() + + def has_running_futures(self): + return any(self.running_futures()) + + def has_cancelled_futures(self): + return any(self.cancelled_futures()) + + def has_done_futures(self): + return any(self.done_futures()) + + def has_successful_futures(self): + return any(self.successful_futures()) + + def has_exception_futures(self): + return any(self.exception_futures()) + + def cancelled_futures(self): + return (f for f in self + if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) + + def done_futures(self): + return (f for f in self + if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]) + + def successful_futures(self): + return (f for f in self + if f._state == FINISHED and f._exception is None) + + def exception_futures(self): + return (f for f in self + if f._state == FINISHED and f._exception is not None) + + def running_futures(self): + return (f for f in self if f._state == RUNNING) + + def __len__(self): + return len(self._futures) + + def __getitem__(self, i): + return self._futures[i] + + def __iter__(self): + return iter(self._futures) + + def __contains__(self, future): + return future in self._futures + + def __repr__(self): + states = {state: 0 for state in FUTURE_STATES} + for f in self: + states[f._state] += 1 + + return ('' % ( + len(self), + states[PENDING], + states[CANCELLED] + states[CANCELLED_AND_NOTIFIED], + states[RUNNING], + states[FINISHED])) + +class Executor(object): + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + """Return a list of futures representing the given calls. + + Args: + calls: A sequence of callables that take no arguments. These will + be bound to Futures and returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises and exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting. + + Returns: + A FuturesList containing futures for the given calls. + """ + raise NotImplementedError() + + def run_to_results(self, calls, timeout=None): + """Returns a iterator of the results of the given calls. + + Args: + calls: A sequence of callables that take no arguments. These will + be called and their results returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator over the results of the given calls. Equivalent to: + (call() for call in calls) but the calls may be evaluated + out-of-order. + + Raises: + TimeoutError: If all the given calls were not completed before the + given timeout. + Exception: If any call() raises. + """ + if timeout is not None: + end_time = timeout + time.time() + + fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) + + try: + for future in fs: + if timeout is None: + yield future.result() + else: + yield future.result(end_time - time.time()) + finally: + try: + fs.cancel(timeout=0) + except TimeoutError: + pass + + def map(self, func, *iterables, timeout=None): + """Returns a iterator equivalent to map(fn, iter). + + Args: + func: A callable that will take take as many arguments as there + are passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + calls = [functools.partial(func, *args) for args in zip(*iterables)] + return self.run_to_results(calls, timeout) + + def shutdown(self): + """Clean-up. No other methods can be called afterwards.""" + raise NotImplementedError() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + return False diff --git a/python2/futures/process.py b/python2/futures/process.py new file mode 100644 index 0000000..ad6dc6d --- /dev/null +++ b/python2/futures/process.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python + +from futures._base import (PENDING, RUNNING, CANCELLED, + CANCELLED_AND_NOTIFIED, FINISHED, + ALL_COMPLETED, + set_future_exception, set_future_result, + Executor, Future, FutureList, ThreadEventSink) + +import queue +import multiprocessing +import threading + +class _WorkItem(object): + def __init__(self, call, future, completion_tracker): + self.call = call + self.future = future + self.completion_tracker = completion_tracker + +class _ResultItem(object): + def __init__(self, work_id, exception=None, result=None): + self.work_id = work_id + self.exception = exception + self.result = result + +class _CallItem(object): + def __init__(self, work_id, call): + self.work_id = work_id + self.call = call + +def _process_worker(call_queue, result_queue, shutdown): + while True: + try: + call_item = call_queue.get(block=True, timeout=0.1) + except queue.Empty: + if shutdown.is_set(): + return + else: + try: + r = call_item.call() + except BaseException as e: + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) + else: + result_queue.put(_ResultItem(call_item.work_id, + result=r)) + +class ProcessPoolExecutor(Executor): + def __init__(self, max_processes=None): + if max_processes is None: + max_processes = multiprocessing.cpu_count() + + self._max_processes = max_processes + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from starving but to make future.cancel() + # responsive. + self._call_queue = multiprocessing.Queue(self._max_processes + 1) + self._result_queue = multiprocessing.Queue() + self._work_ids = queue.Queue() + self._queue_management_thread = None + self._processes = set() + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_process_event = multiprocessing.Event() + self._shutdown_lock = threading.Lock() + self._queue_count = 0 + self._pending_work_items = {} + + def _add_call_item_to_queue(self): + while True: + try: + work_id = self._work_ids.get(block=False) + except queue.Empty: + return + else: + work_item = self._pending_work_items[work_id] + + if work_item.future.cancelled(): + with work_item.future._condition: + work_item.future._condition.notify_all() + work_item.completion_tracker.add_cancelled() + continue + else: + with work_item.future._condition: + work_item.future._state = RUNNING + + self._call_queue.put(_CallItem(work_id, work_item.call), + block=True) + if self._call_queue.full(): + return + + def _result(self): + while True: + self._add_call_item_to_queue() + try: + result_item = self._result_queue.get(block=True, + timeout=0.1) + except queue.Empty: + if self._shutdown_thread and not self._pending_work_items: + self._shutdown_process_event.set() + return + else: + work_item = self._pending_work_items[result_item.work_id] + del self._pending_work_items[result_item.work_id] + + if result_item.exception: + set_future_exception(work_item.future, + work_item.completion_tracker, + result_item.exception) + else: + set_future_result(work_item.future, + work_item.completion_tracker, + result_item.result) + + def _adjust_process_count(self): + if self._queue_management_thread is None: + self._queue_management_thread = threading.Thread( + target=self._result) + self._queue_management_thread.daemon = True + self._queue_management_thread.start() + + for _ in range(len(self._processes), self._max_processes): + p = multiprocessing.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._shutdown_process_event)) + p.daemon = True + p.start() + self._processes.add(p) + + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: + if self._shutdown_thread: + raise RuntimeError('cannot run new futures after shutdown') + + futures = [] + event_sink = ThreadEventSink() + self._queue_count + for index, call in enumerate(calls): + f = Future(index) + self._pending_work_items[self._queue_count] = _WorkItem( + call, f, event_sink) + self._work_ids.put(self._queue_count) + futures.append(f) + self._queue_count += 1 + + self._adjust_process_count() + fl = FutureList(futures, event_sink) + fl.wait(timeout=timeout, return_when=return_when) + return fl + + def shutdown(self): + with self._shutdown_lock: + self._shutdown_thread = True diff --git a/python2/futures/thread.py b/python2/futures/thread.py new file mode 100644 index 0000000..9e3275f --- /dev/null +++ b/python2/futures/thread.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python + +from futures._base import (PENDING, RUNNING, CANCELLED, + CANCELLED_AND_NOTIFIED, FINISHED, + ALL_COMPLETED, + LOGGER, + set_future_exception, set_future_result, + Executor, Future, FutureList, ThreadEventSink) +import queue +import threading + +class _WorkItem(object): + def __init__(self, call, future, completion_tracker): + self.call = call + self.future = future + self.completion_tracker = completion_tracker + + def run(self): + with self.future._condition: + if self.future._state == PENDING: + self.future._state = RUNNING + elif self.future._state == CANCELLED: + with self.completion_tracker._condition: + self.future._state = CANCELLED_AND_NOTIFIED + self.completion_tracker.add_cancelled() + return + else: + LOGGER.critical('Future %s in unexpected state: %d', + id(self.future), + self.future._state) + return + + try: + result = self.call() + except BaseException as e: + set_future_exception(self.future, self.completion_tracker, e) + else: + set_future_result(self.future, self.completion_tracker, result) + +class ThreadPoolExecutor(Executor): + def __init__(self, max_threads): + self._max_threads = max_threads + self._work_queue = queue.Queue() + self._threads = set() + self._shutdown = False + self._shutdown_lock = threading.Lock() + + def _worker(self): + try: + while True: + try: + work_item = self._work_queue.get(block=True, timeout=0.1) + except queue.Empty: + if self._shutdown: + return + else: + work_item.run() + except BaseException as e: + LOGGER.critical('Exception in worker', exc_info=True) + + def _adjust_thread_count(self): + for _ in range(len(self._threads), + min(self._max_threads, self._work_queue.qsize())): + t = threading.Thread(target=self._worker) + t.daemon = True + t.start() + self._threads.add(t) + + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot run new futures after shutdown') + + futures = [] + event_sink = ThreadEventSink() + for index, call in enumerate(calls): + f = Future(index) + w = _WorkItem(call, f, event_sink) + self._work_queue.put(w) + futures.append(f) + + self._adjust_thread_count() + fl = FutureList(futures, event_sink) + fl.wait(timeout=timeout, return_when=return_when) + return fl + + def shutdown(self): + with self._shutdown_lock: + self._shutdown = True diff --git a/python2/primes.py b/python2/primes.py new file mode 100644 index 0000000..7e83ea0 --- /dev/null +++ b/python2/primes.py @@ -0,0 +1,44 @@ +import futures +import math +import time + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099] + +def is_prime(n): + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + +def sequential(): + return list(map(is_prime, PRIMES)) + +def with_process_pool_executor(): + with futures.ProcessPoolExecutor(10) as executor: + return list(executor.map(is_prime, PRIMES)) + +def with_thread_pool_executor(): + with futures.ThreadPoolExecutor(10) as executor: + return list(executor.map(is_prime, PRIMES)) + +def main(): + for name, fn in [('sequential', sequential), + ('processes', with_process_pool_executor), + ('threads', with_thread_pool_executor)]: + print('%s: ' % name.ljust(12), end='') + start = time.time() + if fn() != [True] * len(PRIMES): + print('failed') + else: + print('%.2f seconds' % (time.time() - start)) + +main() \ No newline at end of file diff --git a/python2/test_futures.py b/python2/test_futures.py new file mode 100644 index 0000000..0572f81 --- /dev/null +++ b/python2/test_futures.py @@ -0,0 +1,798 @@ +import test.support +from test.support import verbose + +import unittest +import threading +import time +import multiprocessing + +import futures +import futures._base +from futures._base import ( + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + +def create_future(state=PENDING, exception=None, result=None): + f = Future(0) + f._state = state + f._exception = exception + f._result = result + return f + +PENDING_FUTURE = create_future(state=PENDING) +RUNNING_FUTURE = create_future(state=RUNNING) +CANCELLED_FUTURE = create_future(state=CANCELLED) +CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) +EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) +SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) + +class Call(object): + CALL_LOCKS = {} + def __init__(self, manual_finish=False, result=42): + called_event = multiprocessing.Event() + can_finish = multiprocessing.Event() + + self._result = result + self._called_event_id = id(called_event) + self._can_finish_event_id = id(can_finish) + + self.CALL_LOCKS[self._called_event_id] = called_event + self.CALL_LOCKS[self._can_finish_event_id] = can_finish + + if not manual_finish: + self._can_finish.set() + + @property + def _can_finish(self): + return self.CALL_LOCKS[self._can_finish_event_id] + + @property + def _called_event(self): + return self.CALL_LOCKS[self._called_event_id] + + def wait_on_called(self): + self._called_event.wait() + + def set_can(self): + self._can_finish.set() + + def called(self): + return self._called_event.is_set() + + def __call__(self): + if self._called_event.is_set(): print('called twice') + + self._called_event.set() + self._can_finish.wait() + return self._result + + def close(self): + del self.CALL_LOCKS[self._called_event_id] + del self.CALL_LOCKS[self._can_finish_event_id] + +class ExceptionCall(Call): + def __call__(self): + assert not self._called_event.is_set(), 'already called' + + self._called_event.set() + self._can_finish.wait() + raise ZeroDivisionError() + +class ExecutorShutdownTest(unittest.TestCase): + def test_run_after_shutdown(self): + call1 = Call() + try: + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.run_to_futures, + [call1]) + finally: + call1.close() + + def _start_some_futures(self): + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + call3 = Call(manual_finish=True) + + try: + self.executor.run_to_futures([call1, call2, call3], + return_when=futures.RETURN_IMMEDIATELY) + + call1.wait_on_called() + call2.wait_on_called() + call3.wait_on_called() + + call1.set_can() + call2.set_can() + call3.set_can() + finally: + call1.close() + call2.close() + call3.close() + +class ThreadPoolShutdownTest(ExecutorShutdownTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=5) + + def tearDown(self): + self.executor.shutdown() + + def test_threads_terminate(self): + self._start_some_futures() + self.assertEqual(len(self.executor._threads), 3) + self.executor.shutdown() + for t in self.executor._threads: + t.join() + + def test_context_manager_shutdown(self): + with futures.ThreadPoolExecutor(max_threads=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for t in executor._threads: + t.join() + +class ProcessPoolShutdownTest(ExecutorShutdownTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=5) + + def tearDown(self): + self.executor.shutdown() + + def test_processes_terminate(self): + self._start_some_futures() + self.assertEqual(len(self.executor._processes), 5) + self.executor.shutdown() + for p in self.executor._processes: + p.join() + + def test_context_manager_shutdown(self): + with futures.ProcessPoolExecutor(max_processes=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for p in self.executor._processes: + p.join() + +class WaitsTest(unittest.TestCase): + def test_concurrent_waits(self): + def wait_for_ALL_COMPLETED(): + fs.wait(return_when=futures.ALL_COMPLETED) + self.assertTrue(f1.done()) + self.assertTrue(f2.done()) + self.assertTrue(f3.done()) + self.assertTrue(f4.done()) + all_completed.release() + + def wait_for_FIRST_COMPLETED(): + fs.wait(return_when=futures.FIRST_COMPLETED) + self.assertTrue(f1.done()) + self.assertFalse(f2.done()) # XXX + self.assertFalse(f3.done()) + self.assertFalse(f4.done()) + first_completed.release() + + def wait_for_FIRST_EXCEPTION(): + fs.wait(return_when=futures.FIRST_EXCEPTION) + self.assertTrue(f1.done()) + self.assertTrue(f2.done()) + self.assertFalse(f3.done()) # XXX + self.assertFalse(f4.done()) + first_exception.release() + + all_completed = threading.Semaphore(0) + first_completed = threading.Semaphore(0) + first_exception = threading.Semaphore(0) + + call1 = Call(manual_finish=True) + call2 = ExceptionCall(manual_finish=True) + call3 = Call(manual_finish=True) + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + threads = [] + for wait_test in [wait_for_ALL_COMPLETED, + wait_for_FIRST_COMPLETED, + wait_for_FIRST_EXCEPTION]: + t = threading.Thread(target=wait_test) + t.start() + threads.append(t) + + time.sleep(1) # give threads enough time to execute wait + + call1.set_can() + first_completed.acquire() + call2.set_can() + first_exception.acquire() + call3.set_can() + all_completed.acquire() + + self.executor.shutdown() + finally: + call1.close() + call2.close() + call3.close() + call4.close() + +class ThreadPoolWaitTests(WaitsTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() + +class ProcessPoolWaitTests(WaitsTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() + +class CancelTests(unittest.TestCase): + def test_cancel_states(self): + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertEqual(f1.cancel(), False) + self.assertEqual(f2.cancel(), True) + self.assertEqual(f4.cancel(), True) + self.assertEqual(f1.cancelled(), False) + self.assertEqual(f2.cancelled(), True) + self.assertEqual(f3.cancelled(), False) + self.assertEqual(f4.cancelled(), True) + self.assertEqual(f1.done(), False) + self.assertEqual(f2.done(), True) + self.assertEqual(f3.done(), False) + self.assertEqual(f4.done(), True) + + call1.set_can() + fs.wait(return_when=futures.ALL_COMPLETED) + self.assertEqual(f1.result(), 42) + self.assertRaises(futures.CancelledError, f2.result) + self.assertRaises(futures.CancelledError, f2.exception) + self.assertEqual(f3.result(), 42) + self.assertRaises(futures.CancelledError, f4.result) + self.assertRaises(futures.CancelledError, f4.exception) + + self.assertEqual(call2.called(), False) + self.assertEqual(call4.called(), False) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + + def test_wait_for_individual_cancel(self): + def end_call(): + time.sleep(1) + f2.cancel() + call1.set_can() + + call1 = Call(manual_finish=True) + call2 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2 = fs + + call1.wait_on_called() + t = threading.Thread(target=end_call) + t.start() + self.assertRaises(futures.CancelledError, f2.result) + self.assertRaises(futures.CancelledError, f2.exception) + t.join() + finally: + call1.close() + call2.close() + + def test_wait_with_already_cancelled_futures(self): + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertTrue(f2.cancel()) + self.assertTrue(f3.cancel()) + call1.set_can() + time.sleep(0.1) + + fs.wait(return_when=futures.ALL_COMPLETED) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + + def test_cancel_all(self): + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0) + call1.set_can() + fs.cancel() + + self.assertFalse(f1.cancelled()) + self.assertTrue(f2.cancelled()) + self.assertTrue(f3.cancelled()) + self.assertTrue(f4.cancelled()) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + +class ThreadPoolCancelTests(CancelTests): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() + +class ProcessPoolCancelTests(WaitsTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() + +class ExecutorTest(unittest.TestCase): + # Executor.shutdown() and context manager usage is tested by + # ExecutorShutdownTest. + def test_run_to_futures(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(manual_finish=True) + call4 = Call() + call5 = Call() + + try: + f1, f2, f3, f4, f5 = self.executor.run_to_futures( + [call1, call2, call3, call4, call5], + return_when=futures.RETURN_IMMEDIATELY) + + call3.wait_on_called() + + self.assertTrue(f1.done()) + self.assertFalse(f1.running()) + self.assertEqual(f1.index, 0) + + self.assertTrue(f2.done()) + self.assertFalse(f2.running()) + self.assertEqual(f2.index, 1) + + self.assertFalse(f3.done()) + self.assertTrue(f3.running()) + self.assertEqual(f3.index, 2) + + # ProcessPoolExecutor may mark some futures as running before they + # actually are. + self.assertFalse(f4.done()) + self.assertEqual(f4.index, 3) + + self.assertFalse(f5.done()) + self.assertEqual(f5.index, 4) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + call5.close() + + def test_run_to_results(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(result=3) + try: + self.assertEqual( + list(self.executor.run_to_results([call1, call2, call3])), + [1, 2, 3]) + finally: + call1.close() + call2.close() + call3.close() + + def test_run_to_results_exception(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = ExceptionCall() + try: + i = self.executor.run_to_results([call1, call2, call3]) + + self.assertEqual(i.__next__(), 1) + self.assertEqual(i.__next__(), 2) + self.assertRaises(ZeroDivisionError, i.__next__) + finally: + call1.close() + call2.close() + call3.close() + + def test_run_to_results_timeout(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(manual_finish=True) + + try: + i = self.executor.run_to_results([call1, call2, call3], timeout=1) + self.assertEqual(i.__next__(), 1) + self.assertEqual(i.__next__(), 2) + self.assertRaises(futures.TimeoutError, i.__next__) + call3.set_can() + finally: + call1.close() + call2.close() + call3.close() + + def test_map(self): + self.assertEqual( + list(self.executor.map(pow, range(10), range(10))), + list(map(pow, range(10), range(10)))) + + def test_map_exception(self): + i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) + self.assertEqual(i.__next__(), (0, 1)) + self.assertEqual(i.__next__(), (0, 1)) + self.assertRaises(ZeroDivisionError, i.__next__) + +class ThreadPoolExecutorTest(ExecutorTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() + +class ProcessPoolExecutorTest(ExecutorTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() + +class FutureTests(unittest.TestCase): + # Future.index() is tested by ExecutorTest + # Future.cancel() is further tested by CancelTests. + + def test_repr(self): + self.assertEqual(repr(PENDING_FUTURE), '') + self.assertEqual(repr(RUNNING_FUTURE), '') + self.assertEqual(repr(CANCELLED_FUTURE), '') + self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE), + '') + self.assertEqual(repr(EXCEPTION_FUTURE), + '') + self.assertEqual(repr(SUCCESSFUL_FUTURE), + '') + + create_future + + def test_cancel(self): + f1 = create_future(state=PENDING) + f2 = create_future(state=RUNNING) + f3 = create_future(state=CANCELLED) + f4 = create_future(state=CANCELLED_AND_NOTIFIED) + f5 = create_future(state=FINISHED, exception=IOError()) + f6 = create_future(state=FINISHED, result=5) + + self.assertTrue(f1.cancel()) + self.assertEquals(f1._state, CANCELLED) + + self.assertFalse(f2.cancel()) + self.assertEquals(f2._state, RUNNING) + + self.assertTrue(f3.cancel()) + self.assertEquals(f3._state, CANCELLED) + + self.assertTrue(f4.cancel()) + self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) + + self.assertFalse(f5.cancel()) + self.assertEquals(f5._state, FINISHED) + + self.assertFalse(f6.cancel()) + self.assertEquals(f6._state, FINISHED) + + def test_cancelled(self): + self.assertFalse(PENDING_FUTURE.cancelled()) + self.assertFalse(RUNNING_FUTURE.cancelled()) + self.assertTrue(CANCELLED_FUTURE.cancelled()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) + self.assertFalse(EXCEPTION_FUTURE.cancelled()) + self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) + + def test_done(self): + self.assertFalse(PENDING_FUTURE.done()) + self.assertFalse(RUNNING_FUTURE.done()) + self.assertTrue(CANCELLED_FUTURE.done()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) + self.assertTrue(EXCEPTION_FUTURE.done()) + self.assertTrue(SUCCESSFUL_FUTURE.done()) + + def test_running(self): + self.assertFalse(PENDING_FUTURE.running()) + self.assertTrue(RUNNING_FUTURE.running()) + self.assertFalse(CANCELLED_FUTURE.running()) + self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) + self.assertFalse(EXCEPTION_FUTURE.running()) + self.assertFalse(SUCCESSFUL_FUTURE.running()) + + def test_result_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.result, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) + self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) + self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) + + def test_result_with_success(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = FINISHED + f1._result = 42 + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertEquals(f1.result(timeout=1), 42) + + def test_result_with_cancel(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = CANCELLED + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertRaises(futures.CancelledError, f1.result, timeout=1) + + def test_exception_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.exception, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) + self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), + IOError)) + self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) + + def test_exception_with_success(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = FINISHED + f1._exception = IOError() + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertTrue(isinstance(f1.exception(timeout=1), IOError)) + +class FutureListTests(unittest.TestCase): + # FutureList.wait() is further tested by WaitsTest. + # FutureList.cancel() is tested by CancelTests. + def test_wait_RETURN_IMMEDIATELY(self): + f = futures.FutureList(futures=None, event_sink=None) + f.wait(return_when=futures.RETURN_IMMEDIATELY) + + def test_wait_timeout(self): + f = futures.FutureList([PENDING_FUTURE], + futures._base.ThreadEventSink()) + + for t in [futures.FIRST_COMPLETED, + futures.FIRST_EXCEPTION, + futures.ALL_COMPLETED]: + f.wait(timeout=0.1, return_when=t) + self.assertFalse(PENDING_FUTURE.done()) + + def test_wait_all_done(self): + f = futures.FutureList([CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + futures._base.ThreadEventSink()) + + f.wait(return_when=futures.ALL_COMPLETED) + + def test_filters(self): + fs = [PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE] + f = futures.FutureList(fs, None) + + self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE]) + self.assertEqual(list(f.cancelled_futures()), + [CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE]) + self.assertEqual(list(f.done_futures()), + [CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]) + self.assertEqual(list(f.successful_futures()), + [SUCCESSFUL_FUTURE]) + self.assertEqual(list(f.exception_futures()), + [EXCEPTION_FUTURE]) + + def test_has_running_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + None).has_running_futures()) + self.assertTrue( + futures.FutureList([RUNNING_FUTURE], + None).has_running_futures()) + + def test_has_cancelled_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + None).has_cancelled_futures()) + self.assertTrue( + futures.FutureList([CANCELLED_FUTURE], + None).has_cancelled_futures()) + + self.assertTrue( + futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], + None).has_cancelled_futures()) + + def test_has_done_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE], + None).has_done_futures()) + self.assertTrue( + futures.FutureList([CANCELLED_FUTURE], + None).has_done_futures()) + + self.assertTrue( + futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], + None).has_done_futures()) + + self.assertTrue( + futures.FutureList([EXCEPTION_FUTURE], + None).has_done_futures()) + + self.assertTrue( + futures.FutureList([SUCCESSFUL_FUTURE], + None).has_done_futures()) + + def test_has_successful_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE], + None).has_successful_futures()) + + self.assertTrue( + futures.FutureList([SUCCESSFUL_FUTURE], + None).has_successful_futures()) + + def test_has_exception_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE], + None).has_exception_futures()) + + self.assertTrue( + futures.FutureList([EXCEPTION_FUTURE], + None).has_exception_futures()) + + def test_get_item(self): + fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] + f = futures.FutureList(fs, None) + self.assertEqual(f[0], PENDING_FUTURE) + self.assertEqual(f[1], RUNNING_FUTURE) + self.assertEqual(f[2], CANCELLED_FUTURE) + self.assertRaises(IndexError, f.__getitem__, 3) + + def test_len(self): + f = futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE], + None) + self.assertEqual(len(f), 3) + + def test_iter(self): + fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] + f = futures.FutureList(fs, None) + self.assertEqual(list(iter(f)), fs) + + def test_contains(self): + f = futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE], + None) + self.assertTrue(PENDING_FUTURE in f) + self.assertTrue(RUNNING_FUTURE in f) + self.assertFalse(CANCELLED_FUTURE in f) + + def test_repr(self): + pending = create_future(state=PENDING) + cancelled = create_future(state=CANCELLED) + cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED) + running = create_future(state=RUNNING) + finished = create_future(state=FINISHED) + + f = futures.FutureList( + [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 + + [CANCELLED_AND_NOTIFIED_FUTURE] + + [RUNNING_FUTURE] * 2 + + [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3, + None) + + self.assertEqual(repr(f), + '') + +def test_main(): + test.support.run_unittest(ProcessPoolCancelTests, + ThreadPoolCancelTests, + ProcessPoolExecutorTest, + ThreadPoolExecutorTest, + ProcessPoolWaitTests, + ThreadPoolWaitTests, + FutureTests, + FutureListTests, + ProcessPoolShutdownTest, + ThreadPoolShutdownTest) + +if __name__ == "__main__": + test_main() \ No newline at end of file diff --git a/python3/crawl.py b/python3/crawl.py new file mode 100644 index 0000000..10e35c3 --- /dev/null +++ b/python3/crawl.py @@ -0,0 +1,57 @@ +import datetime +import functools +import futures.thread +import time +import timeit +import urllib.request + +URLS = ['http://www.google.com/', + 'http://www.apple.com/', + 'http://www.ibm.com', + 'http://www.thisurlprobablydoesnotexist.com', + 'http://www.slashdot.org/', + 'http://www.python.org/', + 'http://www.sweetapp.com/'] * 5 + +def load_url(url, timeout): + return urllib.request.urlopen(url, timeout=timeout).read() + +def download_urls_sequential(urls, timeout=60): + url_to_content = {} + for url in urls: + try: + url_to_content[url] = load_url(url, timeout=timeout) + except: + pass + return url_to_content + +def download_urls_with_executor(urls, executor, timeout=60): + try: + url_to_content = {} + fs = executor.run_to_futures( + (functools.partial(load_url, url, timeout) for url in urls), + timeout=timeout) + for future in fs.successful_futures(): + url = urls[future.index] + url_to_content[url] = future.result() + return url_to_content + finally: + executor.shutdown() + +def main(): + for name, fn in [('sequential', + functools.partial(download_urls_sequential, URLS)), + ('processes', + functools.partial(download_urls_with_executor, + URLS, + futures.ProcessPoolExecutor(10))), + ('threads', + functools.partial(download_urls_with_executor, + URLS, + futures.ThreadPoolExecutor(10)))]: + print('%s: ' % name.ljust(12), end='') + start = time.time() + fn() + print('%.2f seconds' % (time.time() - start)) + +main() diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py new file mode 100644 index 0000000..5f599ad --- /dev/null +++ b/python3/futures/__init__.py @@ -0,0 +1,6 @@ +from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, + ALL_COMPLETED, RETURN_IMMEDIATELY, + CancelledError, TimeoutError, + Future, FutureList) +from futures.thread import ThreadPoolExecutor +from futures.process import ProcessPoolExecutor diff --git a/python3/futures/_base.py b/python3/futures/_base.py new file mode 100644 index 0000000..19cabe8 --- /dev/null +++ b/python3/futures/_base.py @@ -0,0 +1,482 @@ +import functools +import logging +import threading +import time + +FIRST_COMPLETED = 0 +FIRST_EXCEPTION = 1 +ALL_COMPLETED = 2 +RETURN_IMMEDIATELY = 3 + +# Possible future states +PENDING = 0 +RUNNING = 1 +CANCELLED = 2 # The future was cancelled... +CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called. +FINISHED = 4 + +FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + +LOGGER = logging.getLogger("futures") +_handler = logging.StreamHandler() +LOGGER.addHandler(_handler) +del _handler + +def set_future_exception(future, event_sink, exception): + with future._condition: + future._exception = exception + with event_sink._condition: + future._state = FINISHED + event_sink.add_exception() + future._condition.notify_all() + +def set_future_result(future, event_sink, result): + with future._condition: + future._result = result + with event_sink._condition: + future._state = FINISHED + event_sink.add_result() + future._condition.notify_all() + +class Error(Exception): + pass + +class CancelledError(Error): + pass + +class TimeoutError(Error): + pass + +class Future(object): + """Represents the result of an asynchronous computation.""" + + def __init__(self, index): + """Initializes the future. Should not be called by clients.""" + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + self._index = index + + def __repr__(self): + with self._condition: + if self._state == FINISHED: + if self._exception: + return '' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '' % _STATE_TO_DESCRIPTION_MAP[self._state] + + @property + def index(self): + """The index of the future in its FutureList.""" + return self._index + + def cancel(self): + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]: + self._state = CANCELLED + self._condition.notify_all() + return True + + def cancelled(self): + """Return True if the future has cancelled.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + + def running(self): + with self._condition: + return self._state == RUNNING + + def done(self): + """Return True of the future was cancelled or finished executing.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + + def __get_result(self): + if self._exception: + raise self._exception + else: + return self._result + + def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + + def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + else: + raise TimeoutError() + +class _FirstCompletedWaitTracker(object): + def __init__(self): + self.event = threading.Event() + + def add_result(self): + self.event.set() + + def add_exception(self): + self.event.set() + + def add_cancelled(self): + self.event.set() + +class _AllCompletedWaitTracker(object): + def __init__(self, pending_calls, stop_on_exception): + self.pending_calls = pending_calls + self.stop_on_exception = stop_on_exception + self.event = threading.Event() + + def add_result(self): + self.pending_calls -= 1 + if not self.pending_calls: + self.event.set() + + def add_exception(self): + if self.stop_on_exception: + self.event.set() + else: + self.add_result() + + def add_cancelled(self): + self.add_result() + +class ThreadEventSink(object): + def __init__(self): + self._condition = threading.Lock() + self._waiters = [] + + def add(self, e): + self._waiters.append(e) + + def remove(self, e): + self._waiters.remove(e) + + def add_result(self): + for waiter in self._waiters: + waiter.add_result() + + def add_exception(self): + for waiter in self._waiters: + waiter.add_exception() + + def add_cancelled(self): + for waiter in self._waiters: + waiter.add_cancelled() + +class FutureList(object): + def __init__(self, futures, event_sink): + """Initializes the FutureList. Should not be called by clients.""" + self._futures = futures + self._event_sink = event_sink + + def wait(self, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the list to complete. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises and exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting (this is not likely + to be a useful option but it is there to + be symmetrical with the + executor.run_to_futures() method. + + Raises: + TimeoutError: If the wait condition wasn't satisfied before the + given timeout. + """ + if return_when == RETURN_IMMEDIATELY: + return + + with self._event_sink._condition: + # Make a quick exit if every future is already done. This check is + # necessary because, if every future is in the + # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will + # never receive any + if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + for f in self): + return + + if return_when == FIRST_COMPLETED: + completed_tracker = _FirstCompletedWaitTracker() + else: + # Calculate how many events are expected before every future + # is complete. This can be done without holding the futures' + # locks because a future cannot transition itself into either + # of the states being looked for. + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] + for f in self) + + if return_when == FIRST_EXCEPTION: + completed_tracker = _AllCompletedWaitTracker( + pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + completed_tracker = _AllCompletedWaitTracker( + pending_count, stop_on_exception=False) + + self._event_sink.add(completed_tracker) + + try: + completed_tracker.event.wait(timeout) + finally: + self._event_sink.remove(completed_tracker) + + def cancel(self, timeout=None): + """Cancel the futures in the list. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Raises: + TimeoutError: If all the futures were not finished before the + given timeout. + """ + for f in self: + f.cancel() + self.wait(timeout=timeout, return_when=ALL_COMPLETED) + if any(not f.done() for f in self): + raise TimeoutError() + + def has_running_futures(self): + return any(self.running_futures()) + + def has_cancelled_futures(self): + return any(self.cancelled_futures()) + + def has_done_futures(self): + return any(self.done_futures()) + + def has_successful_futures(self): + return any(self.successful_futures()) + + def has_exception_futures(self): + return any(self.exception_futures()) + + def cancelled_futures(self): + return (f for f in self + if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) + + def done_futures(self): + return (f for f in self + if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]) + + def successful_futures(self): + return (f for f in self + if f._state == FINISHED and f._exception is None) + + def exception_futures(self): + return (f for f in self + if f._state == FINISHED and f._exception is not None) + + def running_futures(self): + return (f for f in self if f._state == RUNNING) + + def __len__(self): + return len(self._futures) + + def __getitem__(self, i): + return self._futures[i] + + def __iter__(self): + return iter(self._futures) + + def __contains__(self, future): + return future in self._futures + + def __repr__(self): + states = {state: 0 for state in FUTURE_STATES} + for f in self: + states[f._state] += 1 + + return ('' % ( + len(self), + states[PENDING], + states[CANCELLED] + states[CANCELLED_AND_NOTIFIED], + states[RUNNING], + states[FINISHED])) + +class Executor(object): + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + """Return a list of futures representing the given calls. + + Args: + calls: A sequence of callables that take no arguments. These will + be bound to Futures and returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises and exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting. + + Returns: + A FuturesList containing futures for the given calls. + """ + raise NotImplementedError() + + def run_to_results(self, calls, timeout=None): + """Returns a iterator of the results of the given calls. + + Args: + calls: A sequence of callables that take no arguments. These will + be called and their results returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator over the results of the given calls. Equivalent to: + (call() for call in calls) but the calls may be evaluated + out-of-order. + + Raises: + TimeoutError: If all the given calls were not completed before the + given timeout. + Exception: If any call() raises. + """ + if timeout is not None: + end_time = timeout + time.time() + + fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) + + try: + for future in fs: + if timeout is None: + yield future.result() + else: + yield future.result(end_time - time.time()) + finally: + try: + fs.cancel(timeout=0) + except TimeoutError: + pass + + def map(self, func, *iterables, timeout=None): + """Returns a iterator equivalent to map(fn, iter). + + Args: + func: A callable that will take take as many arguments as there + are passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + calls = [functools.partial(func, *args) for args in zip(*iterables)] + return self.run_to_results(calls, timeout) + + def shutdown(self): + """Clean-up. No other methods can be called afterwards.""" + raise NotImplementedError() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + return False diff --git a/python3/futures/process.py b/python3/futures/process.py new file mode 100644 index 0000000..ad6dc6d --- /dev/null +++ b/python3/futures/process.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python + +from futures._base import (PENDING, RUNNING, CANCELLED, + CANCELLED_AND_NOTIFIED, FINISHED, + ALL_COMPLETED, + set_future_exception, set_future_result, + Executor, Future, FutureList, ThreadEventSink) + +import queue +import multiprocessing +import threading + +class _WorkItem(object): + def __init__(self, call, future, completion_tracker): + self.call = call + self.future = future + self.completion_tracker = completion_tracker + +class _ResultItem(object): + def __init__(self, work_id, exception=None, result=None): + self.work_id = work_id + self.exception = exception + self.result = result + +class _CallItem(object): + def __init__(self, work_id, call): + self.work_id = work_id + self.call = call + +def _process_worker(call_queue, result_queue, shutdown): + while True: + try: + call_item = call_queue.get(block=True, timeout=0.1) + except queue.Empty: + if shutdown.is_set(): + return + else: + try: + r = call_item.call() + except BaseException as e: + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) + else: + result_queue.put(_ResultItem(call_item.work_id, + result=r)) + +class ProcessPoolExecutor(Executor): + def __init__(self, max_processes=None): + if max_processes is None: + max_processes = multiprocessing.cpu_count() + + self._max_processes = max_processes + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from starving but to make future.cancel() + # responsive. + self._call_queue = multiprocessing.Queue(self._max_processes + 1) + self._result_queue = multiprocessing.Queue() + self._work_ids = queue.Queue() + self._queue_management_thread = None + self._processes = set() + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_process_event = multiprocessing.Event() + self._shutdown_lock = threading.Lock() + self._queue_count = 0 + self._pending_work_items = {} + + def _add_call_item_to_queue(self): + while True: + try: + work_id = self._work_ids.get(block=False) + except queue.Empty: + return + else: + work_item = self._pending_work_items[work_id] + + if work_item.future.cancelled(): + with work_item.future._condition: + work_item.future._condition.notify_all() + work_item.completion_tracker.add_cancelled() + continue + else: + with work_item.future._condition: + work_item.future._state = RUNNING + + self._call_queue.put(_CallItem(work_id, work_item.call), + block=True) + if self._call_queue.full(): + return + + def _result(self): + while True: + self._add_call_item_to_queue() + try: + result_item = self._result_queue.get(block=True, + timeout=0.1) + except queue.Empty: + if self._shutdown_thread and not self._pending_work_items: + self._shutdown_process_event.set() + return + else: + work_item = self._pending_work_items[result_item.work_id] + del self._pending_work_items[result_item.work_id] + + if result_item.exception: + set_future_exception(work_item.future, + work_item.completion_tracker, + result_item.exception) + else: + set_future_result(work_item.future, + work_item.completion_tracker, + result_item.result) + + def _adjust_process_count(self): + if self._queue_management_thread is None: + self._queue_management_thread = threading.Thread( + target=self._result) + self._queue_management_thread.daemon = True + self._queue_management_thread.start() + + for _ in range(len(self._processes), self._max_processes): + p = multiprocessing.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._shutdown_process_event)) + p.daemon = True + p.start() + self._processes.add(p) + + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: + if self._shutdown_thread: + raise RuntimeError('cannot run new futures after shutdown') + + futures = [] + event_sink = ThreadEventSink() + self._queue_count + for index, call in enumerate(calls): + f = Future(index) + self._pending_work_items[self._queue_count] = _WorkItem( + call, f, event_sink) + self._work_ids.put(self._queue_count) + futures.append(f) + self._queue_count += 1 + + self._adjust_process_count() + fl = FutureList(futures, event_sink) + fl.wait(timeout=timeout, return_when=return_when) + return fl + + def shutdown(self): + with self._shutdown_lock: + self._shutdown_thread = True diff --git a/python3/futures/thread.py b/python3/futures/thread.py new file mode 100644 index 0000000..9e3275f --- /dev/null +++ b/python3/futures/thread.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python + +from futures._base import (PENDING, RUNNING, CANCELLED, + CANCELLED_AND_NOTIFIED, FINISHED, + ALL_COMPLETED, + LOGGER, + set_future_exception, set_future_result, + Executor, Future, FutureList, ThreadEventSink) +import queue +import threading + +class _WorkItem(object): + def __init__(self, call, future, completion_tracker): + self.call = call + self.future = future + self.completion_tracker = completion_tracker + + def run(self): + with self.future._condition: + if self.future._state == PENDING: + self.future._state = RUNNING + elif self.future._state == CANCELLED: + with self.completion_tracker._condition: + self.future._state = CANCELLED_AND_NOTIFIED + self.completion_tracker.add_cancelled() + return + else: + LOGGER.critical('Future %s in unexpected state: %d', + id(self.future), + self.future._state) + return + + try: + result = self.call() + except BaseException as e: + set_future_exception(self.future, self.completion_tracker, e) + else: + set_future_result(self.future, self.completion_tracker, result) + +class ThreadPoolExecutor(Executor): + def __init__(self, max_threads): + self._max_threads = max_threads + self._work_queue = queue.Queue() + self._threads = set() + self._shutdown = False + self._shutdown_lock = threading.Lock() + + def _worker(self): + try: + while True: + try: + work_item = self._work_queue.get(block=True, timeout=0.1) + except queue.Empty: + if self._shutdown: + return + else: + work_item.run() + except BaseException as e: + LOGGER.critical('Exception in worker', exc_info=True) + + def _adjust_thread_count(self): + for _ in range(len(self._threads), + min(self._max_threads, self._work_queue.qsize())): + t = threading.Thread(target=self._worker) + t.daemon = True + t.start() + self._threads.add(t) + + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot run new futures after shutdown') + + futures = [] + event_sink = ThreadEventSink() + for index, call in enumerate(calls): + f = Future(index) + w = _WorkItem(call, f, event_sink) + self._work_queue.put(w) + futures.append(f) + + self._adjust_thread_count() + fl = FutureList(futures, event_sink) + fl.wait(timeout=timeout, return_when=return_when) + return fl + + def shutdown(self): + with self._shutdown_lock: + self._shutdown = True diff --git a/python3/primes.py b/python3/primes.py new file mode 100644 index 0000000..7e83ea0 --- /dev/null +++ b/python3/primes.py @@ -0,0 +1,44 @@ +import futures +import math +import time + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099] + +def is_prime(n): + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + +def sequential(): + return list(map(is_prime, PRIMES)) + +def with_process_pool_executor(): + with futures.ProcessPoolExecutor(10) as executor: + return list(executor.map(is_prime, PRIMES)) + +def with_thread_pool_executor(): + with futures.ThreadPoolExecutor(10) as executor: + return list(executor.map(is_prime, PRIMES)) + +def main(): + for name, fn in [('sequential', sequential), + ('processes', with_process_pool_executor), + ('threads', with_thread_pool_executor)]: + print('%s: ' % name.ljust(12), end='') + start = time.time() + if fn() != [True] * len(PRIMES): + print('failed') + else: + print('%.2f seconds' % (time.time() - start)) + +main() \ No newline at end of file diff --git a/python3/test_futures.py b/python3/test_futures.py new file mode 100644 index 0000000..0572f81 --- /dev/null +++ b/python3/test_futures.py @@ -0,0 +1,798 @@ +import test.support +from test.support import verbose + +import unittest +import threading +import time +import multiprocessing + +import futures +import futures._base +from futures._base import ( + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + +def create_future(state=PENDING, exception=None, result=None): + f = Future(0) + f._state = state + f._exception = exception + f._result = result + return f + +PENDING_FUTURE = create_future(state=PENDING) +RUNNING_FUTURE = create_future(state=RUNNING) +CANCELLED_FUTURE = create_future(state=CANCELLED) +CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) +EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) +SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) + +class Call(object): + CALL_LOCKS = {} + def __init__(self, manual_finish=False, result=42): + called_event = multiprocessing.Event() + can_finish = multiprocessing.Event() + + self._result = result + self._called_event_id = id(called_event) + self._can_finish_event_id = id(can_finish) + + self.CALL_LOCKS[self._called_event_id] = called_event + self.CALL_LOCKS[self._can_finish_event_id] = can_finish + + if not manual_finish: + self._can_finish.set() + + @property + def _can_finish(self): + return self.CALL_LOCKS[self._can_finish_event_id] + + @property + def _called_event(self): + return self.CALL_LOCKS[self._called_event_id] + + def wait_on_called(self): + self._called_event.wait() + + def set_can(self): + self._can_finish.set() + + def called(self): + return self._called_event.is_set() + + def __call__(self): + if self._called_event.is_set(): print('called twice') + + self._called_event.set() + self._can_finish.wait() + return self._result + + def close(self): + del self.CALL_LOCKS[self._called_event_id] + del self.CALL_LOCKS[self._can_finish_event_id] + +class ExceptionCall(Call): + def __call__(self): + assert not self._called_event.is_set(), 'already called' + + self._called_event.set() + self._can_finish.wait() + raise ZeroDivisionError() + +class ExecutorShutdownTest(unittest.TestCase): + def test_run_after_shutdown(self): + call1 = Call() + try: + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.run_to_futures, + [call1]) + finally: + call1.close() + + def _start_some_futures(self): + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + call3 = Call(manual_finish=True) + + try: + self.executor.run_to_futures([call1, call2, call3], + return_when=futures.RETURN_IMMEDIATELY) + + call1.wait_on_called() + call2.wait_on_called() + call3.wait_on_called() + + call1.set_can() + call2.set_can() + call3.set_can() + finally: + call1.close() + call2.close() + call3.close() + +class ThreadPoolShutdownTest(ExecutorShutdownTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=5) + + def tearDown(self): + self.executor.shutdown() + + def test_threads_terminate(self): + self._start_some_futures() + self.assertEqual(len(self.executor._threads), 3) + self.executor.shutdown() + for t in self.executor._threads: + t.join() + + def test_context_manager_shutdown(self): + with futures.ThreadPoolExecutor(max_threads=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for t in executor._threads: + t.join() + +class ProcessPoolShutdownTest(ExecutorShutdownTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=5) + + def tearDown(self): + self.executor.shutdown() + + def test_processes_terminate(self): + self._start_some_futures() + self.assertEqual(len(self.executor._processes), 5) + self.executor.shutdown() + for p in self.executor._processes: + p.join() + + def test_context_manager_shutdown(self): + with futures.ProcessPoolExecutor(max_processes=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for p in self.executor._processes: + p.join() + +class WaitsTest(unittest.TestCase): + def test_concurrent_waits(self): + def wait_for_ALL_COMPLETED(): + fs.wait(return_when=futures.ALL_COMPLETED) + self.assertTrue(f1.done()) + self.assertTrue(f2.done()) + self.assertTrue(f3.done()) + self.assertTrue(f4.done()) + all_completed.release() + + def wait_for_FIRST_COMPLETED(): + fs.wait(return_when=futures.FIRST_COMPLETED) + self.assertTrue(f1.done()) + self.assertFalse(f2.done()) # XXX + self.assertFalse(f3.done()) + self.assertFalse(f4.done()) + first_completed.release() + + def wait_for_FIRST_EXCEPTION(): + fs.wait(return_when=futures.FIRST_EXCEPTION) + self.assertTrue(f1.done()) + self.assertTrue(f2.done()) + self.assertFalse(f3.done()) # XXX + self.assertFalse(f4.done()) + first_exception.release() + + all_completed = threading.Semaphore(0) + first_completed = threading.Semaphore(0) + first_exception = threading.Semaphore(0) + + call1 = Call(manual_finish=True) + call2 = ExceptionCall(manual_finish=True) + call3 = Call(manual_finish=True) + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + threads = [] + for wait_test in [wait_for_ALL_COMPLETED, + wait_for_FIRST_COMPLETED, + wait_for_FIRST_EXCEPTION]: + t = threading.Thread(target=wait_test) + t.start() + threads.append(t) + + time.sleep(1) # give threads enough time to execute wait + + call1.set_can() + first_completed.acquire() + call2.set_can() + first_exception.acquire() + call3.set_can() + all_completed.acquire() + + self.executor.shutdown() + finally: + call1.close() + call2.close() + call3.close() + call4.close() + +class ThreadPoolWaitTests(WaitsTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() + +class ProcessPoolWaitTests(WaitsTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() + +class CancelTests(unittest.TestCase): + def test_cancel_states(self): + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertEqual(f1.cancel(), False) + self.assertEqual(f2.cancel(), True) + self.assertEqual(f4.cancel(), True) + self.assertEqual(f1.cancelled(), False) + self.assertEqual(f2.cancelled(), True) + self.assertEqual(f3.cancelled(), False) + self.assertEqual(f4.cancelled(), True) + self.assertEqual(f1.done(), False) + self.assertEqual(f2.done(), True) + self.assertEqual(f3.done(), False) + self.assertEqual(f4.done(), True) + + call1.set_can() + fs.wait(return_when=futures.ALL_COMPLETED) + self.assertEqual(f1.result(), 42) + self.assertRaises(futures.CancelledError, f2.result) + self.assertRaises(futures.CancelledError, f2.exception) + self.assertEqual(f3.result(), 42) + self.assertRaises(futures.CancelledError, f4.result) + self.assertRaises(futures.CancelledError, f4.exception) + + self.assertEqual(call2.called(), False) + self.assertEqual(call4.called(), False) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + + def test_wait_for_individual_cancel(self): + def end_call(): + time.sleep(1) + f2.cancel() + call1.set_can() + + call1 = Call(manual_finish=True) + call2 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2 = fs + + call1.wait_on_called() + t = threading.Thread(target=end_call) + t.start() + self.assertRaises(futures.CancelledError, f2.result) + self.assertRaises(futures.CancelledError, f2.exception) + t.join() + finally: + call1.close() + call2.close() + + def test_wait_with_already_cancelled_futures(self): + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertTrue(f2.cancel()) + self.assertTrue(f3.cancel()) + call1.set_can() + time.sleep(0.1) + + fs.wait(return_when=futures.ALL_COMPLETED) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + + def test_cancel_all(self): + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0) + call1.set_can() + fs.cancel() + + self.assertFalse(f1.cancelled()) + self.assertTrue(f2.cancelled()) + self.assertTrue(f3.cancelled()) + self.assertTrue(f4.cancelled()) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + +class ThreadPoolCancelTests(CancelTests): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() + +class ProcessPoolCancelTests(WaitsTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() + +class ExecutorTest(unittest.TestCase): + # Executor.shutdown() and context manager usage is tested by + # ExecutorShutdownTest. + def test_run_to_futures(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(manual_finish=True) + call4 = Call() + call5 = Call() + + try: + f1, f2, f3, f4, f5 = self.executor.run_to_futures( + [call1, call2, call3, call4, call5], + return_when=futures.RETURN_IMMEDIATELY) + + call3.wait_on_called() + + self.assertTrue(f1.done()) + self.assertFalse(f1.running()) + self.assertEqual(f1.index, 0) + + self.assertTrue(f2.done()) + self.assertFalse(f2.running()) + self.assertEqual(f2.index, 1) + + self.assertFalse(f3.done()) + self.assertTrue(f3.running()) + self.assertEqual(f3.index, 2) + + # ProcessPoolExecutor may mark some futures as running before they + # actually are. + self.assertFalse(f4.done()) + self.assertEqual(f4.index, 3) + + self.assertFalse(f5.done()) + self.assertEqual(f5.index, 4) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + call5.close() + + def test_run_to_results(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(result=3) + try: + self.assertEqual( + list(self.executor.run_to_results([call1, call2, call3])), + [1, 2, 3]) + finally: + call1.close() + call2.close() + call3.close() + + def test_run_to_results_exception(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = ExceptionCall() + try: + i = self.executor.run_to_results([call1, call2, call3]) + + self.assertEqual(i.__next__(), 1) + self.assertEqual(i.__next__(), 2) + self.assertRaises(ZeroDivisionError, i.__next__) + finally: + call1.close() + call2.close() + call3.close() + + def test_run_to_results_timeout(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(manual_finish=True) + + try: + i = self.executor.run_to_results([call1, call2, call3], timeout=1) + self.assertEqual(i.__next__(), 1) + self.assertEqual(i.__next__(), 2) + self.assertRaises(futures.TimeoutError, i.__next__) + call3.set_can() + finally: + call1.close() + call2.close() + call3.close() + + def test_map(self): + self.assertEqual( + list(self.executor.map(pow, range(10), range(10))), + list(map(pow, range(10), range(10)))) + + def test_map_exception(self): + i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) + self.assertEqual(i.__next__(), (0, 1)) + self.assertEqual(i.__next__(), (0, 1)) + self.assertRaises(ZeroDivisionError, i.__next__) + +class ThreadPoolExecutorTest(ExecutorTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() + +class ProcessPoolExecutorTest(ExecutorTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() + +class FutureTests(unittest.TestCase): + # Future.index() is tested by ExecutorTest + # Future.cancel() is further tested by CancelTests. + + def test_repr(self): + self.assertEqual(repr(PENDING_FUTURE), '') + self.assertEqual(repr(RUNNING_FUTURE), '') + self.assertEqual(repr(CANCELLED_FUTURE), '') + self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE), + '') + self.assertEqual(repr(EXCEPTION_FUTURE), + '') + self.assertEqual(repr(SUCCESSFUL_FUTURE), + '') + + create_future + + def test_cancel(self): + f1 = create_future(state=PENDING) + f2 = create_future(state=RUNNING) + f3 = create_future(state=CANCELLED) + f4 = create_future(state=CANCELLED_AND_NOTIFIED) + f5 = create_future(state=FINISHED, exception=IOError()) + f6 = create_future(state=FINISHED, result=5) + + self.assertTrue(f1.cancel()) + self.assertEquals(f1._state, CANCELLED) + + self.assertFalse(f2.cancel()) + self.assertEquals(f2._state, RUNNING) + + self.assertTrue(f3.cancel()) + self.assertEquals(f3._state, CANCELLED) + + self.assertTrue(f4.cancel()) + self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) + + self.assertFalse(f5.cancel()) + self.assertEquals(f5._state, FINISHED) + + self.assertFalse(f6.cancel()) + self.assertEquals(f6._state, FINISHED) + + def test_cancelled(self): + self.assertFalse(PENDING_FUTURE.cancelled()) + self.assertFalse(RUNNING_FUTURE.cancelled()) + self.assertTrue(CANCELLED_FUTURE.cancelled()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) + self.assertFalse(EXCEPTION_FUTURE.cancelled()) + self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) + + def test_done(self): + self.assertFalse(PENDING_FUTURE.done()) + self.assertFalse(RUNNING_FUTURE.done()) + self.assertTrue(CANCELLED_FUTURE.done()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) + self.assertTrue(EXCEPTION_FUTURE.done()) + self.assertTrue(SUCCESSFUL_FUTURE.done()) + + def test_running(self): + self.assertFalse(PENDING_FUTURE.running()) + self.assertTrue(RUNNING_FUTURE.running()) + self.assertFalse(CANCELLED_FUTURE.running()) + self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) + self.assertFalse(EXCEPTION_FUTURE.running()) + self.assertFalse(SUCCESSFUL_FUTURE.running()) + + def test_result_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.result, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) + self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) + self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) + + def test_result_with_success(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = FINISHED + f1._result = 42 + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertEquals(f1.result(timeout=1), 42) + + def test_result_with_cancel(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = CANCELLED + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertRaises(futures.CancelledError, f1.result, timeout=1) + + def test_exception_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.exception, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) + self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), + IOError)) + self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) + + def test_exception_with_success(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = FINISHED + f1._exception = IOError() + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertTrue(isinstance(f1.exception(timeout=1), IOError)) + +class FutureListTests(unittest.TestCase): + # FutureList.wait() is further tested by WaitsTest. + # FutureList.cancel() is tested by CancelTests. + def test_wait_RETURN_IMMEDIATELY(self): + f = futures.FutureList(futures=None, event_sink=None) + f.wait(return_when=futures.RETURN_IMMEDIATELY) + + def test_wait_timeout(self): + f = futures.FutureList([PENDING_FUTURE], + futures._base.ThreadEventSink()) + + for t in [futures.FIRST_COMPLETED, + futures.FIRST_EXCEPTION, + futures.ALL_COMPLETED]: + f.wait(timeout=0.1, return_when=t) + self.assertFalse(PENDING_FUTURE.done()) + + def test_wait_all_done(self): + f = futures.FutureList([CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + futures._base.ThreadEventSink()) + + f.wait(return_when=futures.ALL_COMPLETED) + + def test_filters(self): + fs = [PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE] + f = futures.FutureList(fs, None) + + self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE]) + self.assertEqual(list(f.cancelled_futures()), + [CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE]) + self.assertEqual(list(f.done_futures()), + [CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]) + self.assertEqual(list(f.successful_futures()), + [SUCCESSFUL_FUTURE]) + self.assertEqual(list(f.exception_futures()), + [EXCEPTION_FUTURE]) + + def test_has_running_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + None).has_running_futures()) + self.assertTrue( + futures.FutureList([RUNNING_FUTURE], + None).has_running_futures()) + + def test_has_cancelled_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + None).has_cancelled_futures()) + self.assertTrue( + futures.FutureList([CANCELLED_FUTURE], + None).has_cancelled_futures()) + + self.assertTrue( + futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], + None).has_cancelled_futures()) + + def test_has_done_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE], + None).has_done_futures()) + self.assertTrue( + futures.FutureList([CANCELLED_FUTURE], + None).has_done_futures()) + + self.assertTrue( + futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], + None).has_done_futures()) + + self.assertTrue( + futures.FutureList([EXCEPTION_FUTURE], + None).has_done_futures()) + + self.assertTrue( + futures.FutureList([SUCCESSFUL_FUTURE], + None).has_done_futures()) + + def test_has_successful_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE], + None).has_successful_futures()) + + self.assertTrue( + futures.FutureList([SUCCESSFUL_FUTURE], + None).has_successful_futures()) + + def test_has_exception_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE], + None).has_exception_futures()) + + self.assertTrue( + futures.FutureList([EXCEPTION_FUTURE], + None).has_exception_futures()) + + def test_get_item(self): + fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] + f = futures.FutureList(fs, None) + self.assertEqual(f[0], PENDING_FUTURE) + self.assertEqual(f[1], RUNNING_FUTURE) + self.assertEqual(f[2], CANCELLED_FUTURE) + self.assertRaises(IndexError, f.__getitem__, 3) + + def test_len(self): + f = futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE], + None) + self.assertEqual(len(f), 3) + + def test_iter(self): + fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] + f = futures.FutureList(fs, None) + self.assertEqual(list(iter(f)), fs) + + def test_contains(self): + f = futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE], + None) + self.assertTrue(PENDING_FUTURE in f) + self.assertTrue(RUNNING_FUTURE in f) + self.assertFalse(CANCELLED_FUTURE in f) + + def test_repr(self): + pending = create_future(state=PENDING) + cancelled = create_future(state=CANCELLED) + cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED) + running = create_future(state=RUNNING) + finished = create_future(state=FINISHED) + + f = futures.FutureList( + [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 + + [CANCELLED_AND_NOTIFIED_FUTURE] + + [RUNNING_FUTURE] * 2 + + [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3, + None) + + self.assertEqual(repr(f), + '') + +def test_main(): + test.support.run_unittest(ProcessPoolCancelTests, + ThreadPoolCancelTests, + ProcessPoolExecutorTest, + ThreadPoolExecutorTest, + ProcessPoolWaitTests, + ThreadPoolWaitTests, + FutureTests, + FutureListTests, + ProcessPoolShutdownTest, + ThreadPoolShutdownTest) + +if __name__ == "__main__": + test_main() \ No newline at end of file diff --git a/test_futures.py b/test_futures.py deleted file mode 100644 index 0572f81..0000000 --- a/test_futures.py +++ /dev/null @@ -1,798 +0,0 @@ -import test.support -from test.support import verbose - -import unittest -import threading -import time -import multiprocessing - -import futures -import futures._base -from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) - -def create_future(state=PENDING, exception=None, result=None): - f = Future(0) - f._state = state - f._exception = exception - f._result = result - return f - -PENDING_FUTURE = create_future(state=PENDING) -RUNNING_FUTURE = create_future(state=RUNNING) -CANCELLED_FUTURE = create_future(state=CANCELLED) -CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) -EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) -SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) - -class Call(object): - CALL_LOCKS = {} - def __init__(self, manual_finish=False, result=42): - called_event = multiprocessing.Event() - can_finish = multiprocessing.Event() - - self._result = result - self._called_event_id = id(called_event) - self._can_finish_event_id = id(can_finish) - - self.CALL_LOCKS[self._called_event_id] = called_event - self.CALL_LOCKS[self._can_finish_event_id] = can_finish - - if not manual_finish: - self._can_finish.set() - - @property - def _can_finish(self): - return self.CALL_LOCKS[self._can_finish_event_id] - - @property - def _called_event(self): - return self.CALL_LOCKS[self._called_event_id] - - def wait_on_called(self): - self._called_event.wait() - - def set_can(self): - self._can_finish.set() - - def called(self): - return self._called_event.is_set() - - def __call__(self): - if self._called_event.is_set(): print('called twice') - - self._called_event.set() - self._can_finish.wait() - return self._result - - def close(self): - del self.CALL_LOCKS[self._called_event_id] - del self.CALL_LOCKS[self._can_finish_event_id] - -class ExceptionCall(Call): - def __call__(self): - assert not self._called_event.is_set(), 'already called' - - self._called_event.set() - self._can_finish.wait() - raise ZeroDivisionError() - -class ExecutorShutdownTest(unittest.TestCase): - def test_run_after_shutdown(self): - call1 = Call() - try: - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.run_to_futures, - [call1]) - finally: - call1.close() - - def _start_some_futures(self): - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - - try: - self.executor.run_to_futures([call1, call2, call3], - return_when=futures.RETURN_IMMEDIATELY) - - call1.wait_on_called() - call2.wait_on_called() - call3.wait_on_called() - - call1.set_can() - call2.set_can() - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() - -class ThreadPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=5) - - def tearDown(self): - self.executor.shutdown() - - def test_threads_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._threads), 3) - self.executor.shutdown() - for t in self.executor._threads: - t.join() - - def test_context_manager_shutdown(self): - with futures.ThreadPoolExecutor(max_threads=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for t in executor._threads: - t.join() - -class ProcessPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=5) - - def tearDown(self): - self.executor.shutdown() - - def test_processes_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._processes), 5) - self.executor.shutdown() - for p in self.executor._processes: - p.join() - - def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_processes=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for p in self.executor._processes: - p.join() - -class WaitsTest(unittest.TestCase): - def test_concurrent_waits(self): - def wait_for_ALL_COMPLETED(): - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertTrue(f1.done()) - self.assertTrue(f2.done()) - self.assertTrue(f3.done()) - self.assertTrue(f4.done()) - all_completed.release() - - def wait_for_FIRST_COMPLETED(): - fs.wait(return_when=futures.FIRST_COMPLETED) - self.assertTrue(f1.done()) - self.assertFalse(f2.done()) # XXX - self.assertFalse(f3.done()) - self.assertFalse(f4.done()) - first_completed.release() - - def wait_for_FIRST_EXCEPTION(): - fs.wait(return_when=futures.FIRST_EXCEPTION) - self.assertTrue(f1.done()) - self.assertTrue(f2.done()) - self.assertFalse(f3.done()) # XXX - self.assertFalse(f4.done()) - first_exception.release() - - all_completed = threading.Semaphore(0) - first_completed = threading.Semaphore(0) - first_exception = threading.Semaphore(0) - - call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call() - - try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - threads = [] - for wait_test in [wait_for_ALL_COMPLETED, - wait_for_FIRST_COMPLETED, - wait_for_FIRST_EXCEPTION]: - t = threading.Thread(target=wait_test) - t.start() - threads.append(t) - - time.sleep(1) # give threads enough time to execute wait - - call1.set_can() - first_completed.acquire() - call2.set_can() - first_exception.acquire() - call3.set_can() - all_completed.acquire() - - self.executor.shutdown() - finally: - call1.close() - call2.close() - call3.close() - call4.close() - -class ThreadPoolWaitTests(WaitsTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) - - def tearDown(self): - self.executor.shutdown() - -class ProcessPoolWaitTests(WaitsTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=1) - - def tearDown(self): - self.executor.shutdown() - -class CancelTests(unittest.TestCase): - def test_cancel_states(self): - call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() - - try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertEqual(f1.cancel(), False) - self.assertEqual(f2.cancel(), True) - self.assertEqual(f4.cancel(), True) - self.assertEqual(f1.cancelled(), False) - self.assertEqual(f2.cancelled(), True) - self.assertEqual(f3.cancelled(), False) - self.assertEqual(f4.cancelled(), True) - self.assertEqual(f1.done(), False) - self.assertEqual(f2.done(), True) - self.assertEqual(f3.done(), False) - self.assertEqual(f4.done(), True) - - call1.set_can() - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertEqual(f1.result(), 42) - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - self.assertEqual(f3.result(), 42) - self.assertRaises(futures.CancelledError, f4.result) - self.assertRaises(futures.CancelledError, f4.exception) - - self.assertEqual(call2.called(), False) - self.assertEqual(call4.called(), False) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - - def test_wait_for_individual_cancel(self): - def end_call(): - time.sleep(1) - f2.cancel() - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call() - - try: - fs = self.executor.run_to_futures( - [call1, call2], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2 = fs - - call1.wait_on_called() - t = threading.Thread(target=end_call) - t.start() - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - t.join() - finally: - call1.close() - call2.close() - - def test_wait_with_already_cancelled_futures(self): - call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() - - try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertTrue(f2.cancel()) - self.assertTrue(f3.cancel()) - call1.set_can() - time.sleep(0.1) - - fs.wait(return_when=futures.ALL_COMPLETED) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - - def test_cancel_all(self): - call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() - - try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0) - call1.set_can() - fs.cancel() - - self.assertFalse(f1.cancelled()) - self.assertTrue(f2.cancelled()) - self.assertTrue(f3.cancelled()) - self.assertTrue(f4.cancelled()) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - -class ThreadPoolCancelTests(CancelTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) - - def tearDown(self): - self.executor.shutdown() - -class ProcessPoolCancelTests(WaitsTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=1) - - def tearDown(self): - self.executor.shutdown() - -class ExecutorTest(unittest.TestCase): - # Executor.shutdown() and context manager usage is tested by - # ExecutorShutdownTest. - def test_run_to_futures(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(manual_finish=True) - call4 = Call() - call5 = Call() - - try: - f1, f2, f3, f4, f5 = self.executor.run_to_futures( - [call1, call2, call3, call4, call5], - return_when=futures.RETURN_IMMEDIATELY) - - call3.wait_on_called() - - self.assertTrue(f1.done()) - self.assertFalse(f1.running()) - self.assertEqual(f1.index, 0) - - self.assertTrue(f2.done()) - self.assertFalse(f2.running()) - self.assertEqual(f2.index, 1) - - self.assertFalse(f3.done()) - self.assertTrue(f3.running()) - self.assertEqual(f3.index, 2) - - # ProcessPoolExecutor may mark some futures as running before they - # actually are. - self.assertFalse(f4.done()) - self.assertEqual(f4.index, 3) - - self.assertFalse(f5.done()) - self.assertEqual(f5.index, 4) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - call5.close() - - def test_run_to_results(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(result=3) - try: - self.assertEqual( - list(self.executor.run_to_results([call1, call2, call3])), - [1, 2, 3]) - finally: - call1.close() - call2.close() - call3.close() - - def test_run_to_results_exception(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = ExceptionCall() - try: - i = self.executor.run_to_results([call1, call2, call3]) - - self.assertEqual(i.__next__(), 1) - self.assertEqual(i.__next__(), 2) - self.assertRaises(ZeroDivisionError, i.__next__) - finally: - call1.close() - call2.close() - call3.close() - - def test_run_to_results_timeout(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(manual_finish=True) - - try: - i = self.executor.run_to_results([call1, call2, call3], timeout=1) - self.assertEqual(i.__next__(), 1) - self.assertEqual(i.__next__(), 2) - self.assertRaises(futures.TimeoutError, i.__next__) - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() - - def test_map(self): - self.assertEqual( - list(self.executor.map(pow, range(10), range(10))), - list(map(pow, range(10), range(10)))) - - def test_map_exception(self): - i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) - self.assertEqual(i.__next__(), (0, 1)) - self.assertEqual(i.__next__(), (0, 1)) - self.assertRaises(ZeroDivisionError, i.__next__) - -class ThreadPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_threads=1) - - def tearDown(self): - self.executor.shutdown() - -class ProcessPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_processes=1) - - def tearDown(self): - self.executor.shutdown() - -class FutureTests(unittest.TestCase): - # Future.index() is tested by ExecutorTest - # Future.cancel() is further tested by CancelTests. - - def test_repr(self): - self.assertEqual(repr(PENDING_FUTURE), '') - self.assertEqual(repr(RUNNING_FUTURE), '') - self.assertEqual(repr(CANCELLED_FUTURE), '') - self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE), - '') - self.assertEqual(repr(EXCEPTION_FUTURE), - '') - self.assertEqual(repr(SUCCESSFUL_FUTURE), - '') - - create_future - - def test_cancel(self): - f1 = create_future(state=PENDING) - f2 = create_future(state=RUNNING) - f3 = create_future(state=CANCELLED) - f4 = create_future(state=CANCELLED_AND_NOTIFIED) - f5 = create_future(state=FINISHED, exception=IOError()) - f6 = create_future(state=FINISHED, result=5) - - self.assertTrue(f1.cancel()) - self.assertEquals(f1._state, CANCELLED) - - self.assertFalse(f2.cancel()) - self.assertEquals(f2._state, RUNNING) - - self.assertTrue(f3.cancel()) - self.assertEquals(f3._state, CANCELLED) - - self.assertTrue(f4.cancel()) - self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) - - self.assertFalse(f5.cancel()) - self.assertEquals(f5._state, FINISHED) - - self.assertFalse(f6.cancel()) - self.assertEquals(f6._state, FINISHED) - - def test_cancelled(self): - self.assertFalse(PENDING_FUTURE.cancelled()) - self.assertFalse(RUNNING_FUTURE.cancelled()) - self.assertTrue(CANCELLED_FUTURE.cancelled()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) - self.assertFalse(EXCEPTION_FUTURE.cancelled()) - self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) - - def test_done(self): - self.assertFalse(PENDING_FUTURE.done()) - self.assertFalse(RUNNING_FUTURE.done()) - self.assertTrue(CANCELLED_FUTURE.done()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) - self.assertTrue(EXCEPTION_FUTURE.done()) - self.assertTrue(SUCCESSFUL_FUTURE.done()) - - def test_running(self): - self.assertFalse(PENDING_FUTURE.running()) - self.assertTrue(RUNNING_FUTURE.running()) - self.assertFalse(CANCELLED_FUTURE.running()) - self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) - self.assertFalse(EXCEPTION_FUTURE.running()) - self.assertFalse(SUCCESSFUL_FUTURE.running()) - - def test_result_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.result, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) - self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) - self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) - - def test_result_with_success(self): - def notification(): - time.sleep(0.1) - with f1._condition: - f1._state = FINISHED - f1._result = 42 - f1._condition.notify_all() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertEquals(f1.result(timeout=1), 42) - - def test_result_with_cancel(self): - def notification(): - time.sleep(0.1) - with f1._condition: - f1._state = CANCELLED - f1._condition.notify_all() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertRaises(futures.CancelledError, f1.result, timeout=1) - - def test_exception_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.exception, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) - self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), - IOError)) - self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) - - def test_exception_with_success(self): - def notification(): - time.sleep(0.1) - with f1._condition: - f1._state = FINISHED - f1._exception = IOError() - f1._condition.notify_all() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertTrue(isinstance(f1.exception(timeout=1), IOError)) - -class FutureListTests(unittest.TestCase): - # FutureList.wait() is further tested by WaitsTest. - # FutureList.cancel() is tested by CancelTests. - def test_wait_RETURN_IMMEDIATELY(self): - f = futures.FutureList(futures=None, event_sink=None) - f.wait(return_when=futures.RETURN_IMMEDIATELY) - - def test_wait_timeout(self): - f = futures.FutureList([PENDING_FUTURE], - futures._base.ThreadEventSink()) - - for t in [futures.FIRST_COMPLETED, - futures.FIRST_EXCEPTION, - futures.ALL_COMPLETED]: - f.wait(timeout=0.1, return_when=t) - self.assertFalse(PENDING_FUTURE.done()) - - def test_wait_all_done(self): - f = futures.FutureList([CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - futures._base.ThreadEventSink()) - - f.wait(return_when=futures.ALL_COMPLETED) - - def test_filters(self): - fs = [PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE] - f = futures.FutureList(fs, None) - - self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE]) - self.assertEqual(list(f.cancelled_futures()), - [CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE]) - self.assertEqual(list(f.done_futures()), - [CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]) - self.assertEqual(list(f.successful_futures()), - [SUCCESSFUL_FUTURE]) - self.assertEqual(list(f.exception_futures()), - [EXCEPTION_FUTURE]) - - def test_has_running_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - None).has_running_futures()) - self.assertTrue( - futures.FutureList([RUNNING_FUTURE], - None).has_running_futures()) - - def test_has_cancelled_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - None).has_cancelled_futures()) - self.assertTrue( - futures.FutureList([CANCELLED_FUTURE], - None).has_cancelled_futures()) - - self.assertTrue( - futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], - None).has_cancelled_futures()) - - def test_has_done_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE], - None).has_done_futures()) - self.assertTrue( - futures.FutureList([CANCELLED_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([EXCEPTION_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([SUCCESSFUL_FUTURE], - None).has_done_futures()) - - def test_has_successful_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE], - None).has_successful_futures()) - - self.assertTrue( - futures.FutureList([SUCCESSFUL_FUTURE], - None).has_successful_futures()) - - def test_has_exception_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE], - None).has_exception_futures()) - - self.assertTrue( - futures.FutureList([EXCEPTION_FUTURE], - None).has_exception_futures()) - - def test_get_item(self): - fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] - f = futures.FutureList(fs, None) - self.assertEqual(f[0], PENDING_FUTURE) - self.assertEqual(f[1], RUNNING_FUTURE) - self.assertEqual(f[2], CANCELLED_FUTURE) - self.assertRaises(IndexError, f.__getitem__, 3) - - def test_len(self): - f = futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE], - None) - self.assertEqual(len(f), 3) - - def test_iter(self): - fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] - f = futures.FutureList(fs, None) - self.assertEqual(list(iter(f)), fs) - - def test_contains(self): - f = futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE], - None) - self.assertTrue(PENDING_FUTURE in f) - self.assertTrue(RUNNING_FUTURE in f) - self.assertFalse(CANCELLED_FUTURE in f) - - def test_repr(self): - pending = create_future(state=PENDING) - cancelled = create_future(state=CANCELLED) - cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED) - running = create_future(state=RUNNING) - finished = create_future(state=FINISHED) - - f = futures.FutureList( - [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 + - [CANCELLED_AND_NOTIFIED_FUTURE] + - [RUNNING_FUTURE] * 2 + - [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3, - None) - - self.assertEqual(repr(f), - '') - -def test_main(): - test.support.run_unittest(ProcessPoolCancelTests, - ThreadPoolCancelTests, - ProcessPoolExecutorTest, - ThreadPoolExecutorTest, - ProcessPoolWaitTests, - ThreadPoolWaitTests, - FutureTests, - FutureListTests, - ProcessPoolShutdownTest, - ThreadPoolShutdownTest) - -if __name__ == "__main__": - test_main() \ No newline at end of file -- cgit v1.2.1