diff options
author | brian quinlan <brian.quinlan@gmail.com> | 2009-05-23 09:18:19 +0000 |
---|---|---|
committer | brian quinlan <brian.quinlan@gmail.com> | 2009-05-23 09:18:19 +0000 |
commit | e1c6c9758b70b3e9e1630b5e8545a9a1e3de7368 (patch) | |
tree | 2dc7cdb8c8eda8f97696e5711831143e4d3fab0d | |
parent | e63029bf8fa0b37f42add3d6914509dd79c81cb6 (diff) | |
download | futures-e1c6c9758b70b3e9e1630b5e8545a9a1e3de7368.tar.gz |
Seperate into python2 and python3 directories
-rw-r--r-- | python2/crawl.py (renamed from crawl.py) | 0 | ||||
-rw-r--r-- | python2/futures/__init__.py (renamed from futures/__init__.py) | 0 | ||||
-rw-r--r-- | python2/futures/_base.py (renamed from futures/_base.py) | 0 | ||||
-rw-r--r-- | python2/futures/process.py (renamed from futures/process.py) | 0 | ||||
-rw-r--r-- | python2/futures/thread.py (renamed from futures/thread.py) | 0 | ||||
-rw-r--r-- | python2/primes.py (renamed from primes.py) | 0 | ||||
-rw-r--r-- | python2/test_futures.py (renamed from test_futures.py) | 0 | ||||
-rw-r--r-- | python3/crawl.py | 57 | ||||
-rw-r--r-- | python3/futures/__init__.py | 6 | ||||
-rw-r--r-- | python3/futures/_base.py | 482 | ||||
-rw-r--r-- | python3/futures/process.py | 155 | ||||
-rw-r--r-- | python3/futures/thread.py | 89 | ||||
-rw-r--r-- | python3/primes.py | 44 | ||||
-rw-r--r-- | python3/test_futures.py | 798 |
14 files changed, 1631 insertions, 0 deletions
diff --git a/crawl.py b/python2/crawl.py index 10e35c3..10e35c3 100644 --- a/crawl.py +++ b/python2/crawl.py diff --git a/futures/__init__.py b/python2/futures/__init__.py index 5f599ad..5f599ad 100644 --- a/futures/__init__.py +++ b/python2/futures/__init__.py diff --git a/futures/_base.py b/python2/futures/_base.py index 19cabe8..19cabe8 100644 --- a/futures/_base.py +++ b/python2/futures/_base.py diff --git a/futures/process.py b/python2/futures/process.py index ad6dc6d..ad6dc6d 100644 --- a/futures/process.py +++ b/python2/futures/process.py diff --git a/futures/thread.py b/python2/futures/thread.py index 9e3275f..9e3275f 100644 --- a/futures/thread.py +++ b/python2/futures/thread.py diff --git a/primes.py b/python2/primes.py index 7e83ea0..7e83ea0 100644 --- a/primes.py +++ b/python2/primes.py diff --git a/test_futures.py b/python2/test_futures.py index 0572f81..0572f81 100644 --- a/test_futures.py +++ b/python2/test_futures.py diff --git a/python3/crawl.py b/python3/crawl.py new file mode 100644 index 0000000..10e35c3 --- /dev/null +++ b/python3/crawl.py @@ -0,0 +1,57 @@ +import datetime +import functools +import futures.thread +import time +import timeit +import urllib.request + +URLS = ['http://www.google.com/', + 'http://www.apple.com/', + 'http://www.ibm.com', + 'http://www.thisurlprobablydoesnotexist.com', + 'http://www.slashdot.org/', + 'http://www.python.org/', + 'http://www.sweetapp.com/'] * 5 + +def load_url(url, timeout): + return urllib.request.urlopen(url, timeout=timeout).read() + +def download_urls_sequential(urls, timeout=60): + url_to_content = {} + for url in urls: + try: + url_to_content[url] = load_url(url, timeout=timeout) + except: + pass + return url_to_content + +def download_urls_with_executor(urls, executor, timeout=60): + try: + url_to_content = {} + fs = executor.run_to_futures( + (functools.partial(load_url, url, timeout) for url in urls), + timeout=timeout) + for future in fs.successful_futures(): + url = urls[future.index] + url_to_content[url] = future.result() + return url_to_content + finally: + executor.shutdown() + +def main(): + for name, fn in [('sequential', + functools.partial(download_urls_sequential, URLS)), + ('processes', + functools.partial(download_urls_with_executor, + URLS, + futures.ProcessPoolExecutor(10))), + ('threads', + functools.partial(download_urls_with_executor, + URLS, + futures.ThreadPoolExecutor(10)))]: + print('%s: ' % name.ljust(12), end='') + start = time.time() + fn() + print('%.2f seconds' % (time.time() - start)) + +main() diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py new file mode 100644 index 0000000..5f599ad --- /dev/null +++ b/python3/futures/__init__.py @@ -0,0 +1,6 @@ +from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, + ALL_COMPLETED, RETURN_IMMEDIATELY, + CancelledError, TimeoutError, + Future, FutureList) +from futures.thread import ThreadPoolExecutor +from futures.process import ProcessPoolExecutor diff --git a/python3/futures/_base.py b/python3/futures/_base.py new file mode 100644 index 0000000..19cabe8 --- /dev/null +++ b/python3/futures/_base.py @@ -0,0 +1,482 @@ +import functools +import logging +import threading +import time + +FIRST_COMPLETED = 0 +FIRST_EXCEPTION = 1 +ALL_COMPLETED = 2 +RETURN_IMMEDIATELY = 3 + +# Possible future states +PENDING = 0 +RUNNING = 1 +CANCELLED = 2 # The future was cancelled... +CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called. +FINISHED = 4 + +FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + +LOGGER = logging.getLogger("futures") +_handler = logging.StreamHandler() +LOGGER.addHandler(_handler) +del _handler + +def set_future_exception(future, event_sink, exception): + with future._condition: + future._exception = exception + with event_sink._condition: + future._state = FINISHED + event_sink.add_exception() + future._condition.notify_all() + +def set_future_result(future, event_sink, result): + with future._condition: + future._result = result + with event_sink._condition: + future._state = FINISHED + event_sink.add_result() + future._condition.notify_all() + +class Error(Exception): + pass + +class CancelledError(Error): + pass + +class TimeoutError(Error): + pass + +class Future(object): + """Represents the result of an asynchronous computation.""" + + def __init__(self, index): + """Initializes the future. Should not be called by clients.""" + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + self._index = index + + def __repr__(self): + with self._condition: + if self._state == FINISHED: + if self._exception: + return '<Future state=%s raised %s>' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '<Future state=%s returned %s>' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] + + @property + def index(self): + """The index of the future in its FutureList.""" + return self._index + + def cancel(self): + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]: + self._state = CANCELLED + self._condition.notify_all() + return True + + def cancelled(self): + """Return True if the future has cancelled.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + + def running(self): + with self._condition: + return self._state == RUNNING + + def done(self): + """Return True of the future was cancelled or finished executing.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + + def __get_result(self): + if self._exception: + raise self._exception + else: + return self._result + + def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + + def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + else: + raise TimeoutError() + +class _FirstCompletedWaitTracker(object): + def __init__(self): + self.event = threading.Event() + + def add_result(self): + self.event.set() + + def add_exception(self): + self.event.set() + + def add_cancelled(self): + self.event.set() + +class _AllCompletedWaitTracker(object): + def __init__(self, pending_calls, stop_on_exception): + self.pending_calls = pending_calls + self.stop_on_exception = stop_on_exception + self.event = threading.Event() + + def add_result(self): + self.pending_calls -= 1 + if not self.pending_calls: + self.event.set() + + def add_exception(self): + if self.stop_on_exception: + self.event.set() + else: + self.add_result() + + def add_cancelled(self): + self.add_result() + +class ThreadEventSink(object): + def __init__(self): + self._condition = threading.Lock() + self._waiters = [] + + def add(self, e): + self._waiters.append(e) + + def remove(self, e): + self._waiters.remove(e) + + def add_result(self): + for waiter in self._waiters: + waiter.add_result() + + def add_exception(self): + for waiter in self._waiters: + waiter.add_exception() + + def add_cancelled(self): + for waiter in self._waiters: + waiter.add_cancelled() + +class FutureList(object): + def __init__(self, futures, event_sink): + """Initializes the FutureList. Should not be called by clients.""" + self._futures = futures + self._event_sink = event_sink + + def wait(self, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the list to complete. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises and exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting (this is not likely + to be a useful option but it is there to + be symmetrical with the + executor.run_to_futures() method. + + Raises: + TimeoutError: If the wait condition wasn't satisfied before the + given timeout. + """ + if return_when == RETURN_IMMEDIATELY: + return + + with self._event_sink._condition: + # Make a quick exit if every future is already done. This check is + # necessary because, if every future is in the + # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will + # never receive any + if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + for f in self): + return + + if return_when == FIRST_COMPLETED: + completed_tracker = _FirstCompletedWaitTracker() + else: + # Calculate how many events are expected before every future + # is complete. This can be done without holding the futures' + # locks because a future cannot transition itself into either + # of the states being looked for. + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] + for f in self) + + if return_when == FIRST_EXCEPTION: + completed_tracker = _AllCompletedWaitTracker( + pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + completed_tracker = _AllCompletedWaitTracker( + pending_count, stop_on_exception=False) + + self._event_sink.add(completed_tracker) + + try: + completed_tracker.event.wait(timeout) + finally: + self._event_sink.remove(completed_tracker) + + def cancel(self, timeout=None): + """Cancel the futures in the list. + + Args: + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Raises: + TimeoutError: If all the futures were not finished before the + given timeout. + """ + for f in self: + f.cancel() + self.wait(timeout=timeout, return_when=ALL_COMPLETED) + if any(not f.done() for f in self): + raise TimeoutError() + + def has_running_futures(self): + return any(self.running_futures()) + + def has_cancelled_futures(self): + return any(self.cancelled_futures()) + + def has_done_futures(self): + return any(self.done_futures()) + + def has_successful_futures(self): + return any(self.successful_futures()) + + def has_exception_futures(self): + return any(self.exception_futures()) + + def cancelled_futures(self): + return (f for f in self + if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) + + def done_futures(self): + return (f for f in self + if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]) + + def successful_futures(self): + return (f for f in self + if f._state == FINISHED and f._exception is None) + + def exception_futures(self): + return (f for f in self + if f._state == FINISHED and f._exception is not None) + + def running_futures(self): + return (f for f in self if f._state == RUNNING) + + def __len__(self): + return len(self._futures) + + def __getitem__(self, i): + return self._futures[i] + + def __iter__(self): + return iter(self._futures) + + def __contains__(self, future): + return future in self._futures + + def __repr__(self): + states = {state: 0 for state in FUTURE_STATES} + for f in self: + states[f._state] += 1 + + return ('<FutureList #futures=%d ' + '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % ( + len(self), + states[PENDING], + states[CANCELLED] + states[CANCELLED_AND_NOTIFIED], + states[RUNNING], + states[FINISHED])) + +class Executor(object): + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + """Return a list of futures representing the given calls. + + Args: + calls: A sequence of callables that take no arguments. These will + be bound to Futures and returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when the method should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises and exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + RETURN_IMMEDIATELY - Return without waiting. + + Returns: + A FuturesList containing futures for the given calls. + """ + raise NotImplementedError() + + def run_to_results(self, calls, timeout=None): + """Returns a iterator of the results of the given calls. + + Args: + calls: A sequence of callables that take no arguments. These will + be called and their results returned. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator over the results of the given calls. Equivalent to: + (call() for call in calls) but the calls may be evaluated + out-of-order. + + Raises: + TimeoutError: If all the given calls were not completed before the + given timeout. + Exception: If any call() raises. + """ + if timeout is not None: + end_time = timeout + time.time() + + fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) + + try: + for future in fs: + if timeout is None: + yield future.result() + else: + yield future.result(end_time - time.time()) + finally: + try: + fs.cancel(timeout=0) + except TimeoutError: + pass + + def map(self, func, *iterables, timeout=None): + """Returns a iterator equivalent to map(fn, iter). + + Args: + func: A callable that will take take as many arguments as there + are passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + calls = [functools.partial(func, *args) for args in zip(*iterables)] + return self.run_to_results(calls, timeout) + + def shutdown(self): + """Clean-up. No other methods can be called afterwards.""" + raise NotImplementedError() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + return False diff --git a/python3/futures/process.py b/python3/futures/process.py new file mode 100644 index 0000000..ad6dc6d --- /dev/null +++ b/python3/futures/process.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python + +from futures._base import (PENDING, RUNNING, CANCELLED, + CANCELLED_AND_NOTIFIED, FINISHED, + ALL_COMPLETED, + set_future_exception, set_future_result, + Executor, Future, FutureList, ThreadEventSink) + +import queue +import multiprocessing +import threading + +class _WorkItem(object): + def __init__(self, call, future, completion_tracker): + self.call = call + self.future = future + self.completion_tracker = completion_tracker + +class _ResultItem(object): + def __init__(self, work_id, exception=None, result=None): + self.work_id = work_id + self.exception = exception + self.result = result + +class _CallItem(object): + def __init__(self, work_id, call): + self.work_id = work_id + self.call = call + +def _process_worker(call_queue, result_queue, shutdown): + while True: + try: + call_item = call_queue.get(block=True, timeout=0.1) + except queue.Empty: + if shutdown.is_set(): + return + else: + try: + r = call_item.call() + except BaseException as e: + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) + else: + result_queue.put(_ResultItem(call_item.work_id, + result=r)) + +class ProcessPoolExecutor(Executor): + def __init__(self, max_processes=None): + if max_processes is None: + max_processes = multiprocessing.cpu_count() + + self._max_processes = max_processes + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from starving but to make future.cancel() + # responsive. + self._call_queue = multiprocessing.Queue(self._max_processes + 1) + self._result_queue = multiprocessing.Queue() + self._work_ids = queue.Queue() + self._queue_management_thread = None + self._processes = set() + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_process_event = multiprocessing.Event() + self._shutdown_lock = threading.Lock() + self._queue_count = 0 + self._pending_work_items = {} + + def _add_call_item_to_queue(self): + while True: + try: + work_id = self._work_ids.get(block=False) + except queue.Empty: + return + else: + work_item = self._pending_work_items[work_id] + + if work_item.future.cancelled(): + with work_item.future._condition: + work_item.future._condition.notify_all() + work_item.completion_tracker.add_cancelled() + continue + else: + with work_item.future._condition: + work_item.future._state = RUNNING + + self._call_queue.put(_CallItem(work_id, work_item.call), + block=True) + if self._call_queue.full(): + return + + def _result(self): + while True: + self._add_call_item_to_queue() + try: + result_item = self._result_queue.get(block=True, + timeout=0.1) + except queue.Empty: + if self._shutdown_thread and not self._pending_work_items: + self._shutdown_process_event.set() + return + else: + work_item = self._pending_work_items[result_item.work_id] + del self._pending_work_items[result_item.work_id] + + if result_item.exception: + set_future_exception(work_item.future, + work_item.completion_tracker, + result_item.exception) + else: + set_future_result(work_item.future, + work_item.completion_tracker, + result_item.result) + + def _adjust_process_count(self): + if self._queue_management_thread is None: + self._queue_management_thread = threading.Thread( + target=self._result) + self._queue_management_thread.daemon = True + self._queue_management_thread.start() + + for _ in range(len(self._processes), self._max_processes): + p = multiprocessing.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._shutdown_process_event)) + p.daemon = True + p.start() + self._processes.add(p) + + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: + if self._shutdown_thread: + raise RuntimeError('cannot run new futures after shutdown') + + futures = [] + event_sink = ThreadEventSink() + self._queue_count + for index, call in enumerate(calls): + f = Future(index) + self._pending_work_items[self._queue_count] = _WorkItem( + call, f, event_sink) + self._work_ids.put(self._queue_count) + futures.append(f) + self._queue_count += 1 + + self._adjust_process_count() + fl = FutureList(futures, event_sink) + fl.wait(timeout=timeout, return_when=return_when) + return fl + + def shutdown(self): + with self._shutdown_lock: + self._shutdown_thread = True diff --git a/python3/futures/thread.py b/python3/futures/thread.py new file mode 100644 index 0000000..9e3275f --- /dev/null +++ b/python3/futures/thread.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python + +from futures._base import (PENDING, RUNNING, CANCELLED, + CANCELLED_AND_NOTIFIED, FINISHED, + ALL_COMPLETED, + LOGGER, + set_future_exception, set_future_result, + Executor, Future, FutureList, ThreadEventSink) +import queue +import threading + +class _WorkItem(object): + def __init__(self, call, future, completion_tracker): + self.call = call + self.future = future + self.completion_tracker = completion_tracker + + def run(self): + with self.future._condition: + if self.future._state == PENDING: + self.future._state = RUNNING + elif self.future._state == CANCELLED: + with self.completion_tracker._condition: + self.future._state = CANCELLED_AND_NOTIFIED + self.completion_tracker.add_cancelled() + return + else: + LOGGER.critical('Future %s in unexpected state: %d', + id(self.future), + self.future._state) + return + + try: + result = self.call() + except BaseException as e: + set_future_exception(self.future, self.completion_tracker, e) + else: + set_future_result(self.future, self.completion_tracker, result) + +class ThreadPoolExecutor(Executor): + def __init__(self, max_threads): + self._max_threads = max_threads + self._work_queue = queue.Queue() + self._threads = set() + self._shutdown = False + self._shutdown_lock = threading.Lock() + + def _worker(self): + try: + while True: + try: + work_item = self._work_queue.get(block=True, timeout=0.1) + except queue.Empty: + if self._shutdown: + return + else: + work_item.run() + except BaseException as e: + LOGGER.critical('Exception in worker', exc_info=True) + + def _adjust_thread_count(self): + for _ in range(len(self._threads), + min(self._max_threads, self._work_queue.qsize())): + t = threading.Thread(target=self._worker) + t.daemon = True + t.start() + self._threads.add(t) + + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot run new futures after shutdown') + + futures = [] + event_sink = ThreadEventSink() + for index, call in enumerate(calls): + f = Future(index) + w = _WorkItem(call, f, event_sink) + self._work_queue.put(w) + futures.append(f) + + self._adjust_thread_count() + fl = FutureList(futures, event_sink) + fl.wait(timeout=timeout, return_when=return_when) + return fl + + def shutdown(self): + with self._shutdown_lock: + self._shutdown = True diff --git a/python3/primes.py b/python3/primes.py new file mode 100644 index 0000000..7e83ea0 --- /dev/null +++ b/python3/primes.py @@ -0,0 +1,44 @@ +import futures +import math +import time + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099] + +def is_prime(n): + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + +def sequential(): + return list(map(is_prime, PRIMES)) + +def with_process_pool_executor(): + with futures.ProcessPoolExecutor(10) as executor: + return list(executor.map(is_prime, PRIMES)) + +def with_thread_pool_executor(): + with futures.ThreadPoolExecutor(10) as executor: + return list(executor.map(is_prime, PRIMES)) + +def main(): + for name, fn in [('sequential', sequential), + ('processes', with_process_pool_executor), + ('threads', with_thread_pool_executor)]: + print('%s: ' % name.ljust(12), end='') + start = time.time() + if fn() != [True] * len(PRIMES): + print('failed') + else: + print('%.2f seconds' % (time.time() - start)) + +main()
\ No newline at end of file diff --git a/python3/test_futures.py b/python3/test_futures.py new file mode 100644 index 0000000..0572f81 --- /dev/null +++ b/python3/test_futures.py @@ -0,0 +1,798 @@ +import test.support +from test.support import verbose + +import unittest +import threading +import time +import multiprocessing + +import futures +import futures._base +from futures._base import ( + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + +def create_future(state=PENDING, exception=None, result=None): + f = Future(0) + f._state = state + f._exception = exception + f._result = result + return f + +PENDING_FUTURE = create_future(state=PENDING) +RUNNING_FUTURE = create_future(state=RUNNING) +CANCELLED_FUTURE = create_future(state=CANCELLED) +CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) +EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) +SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) + +class Call(object): + CALL_LOCKS = {} + def __init__(self, manual_finish=False, result=42): + called_event = multiprocessing.Event() + can_finish = multiprocessing.Event() + + self._result = result + self._called_event_id = id(called_event) + self._can_finish_event_id = id(can_finish) + + self.CALL_LOCKS[self._called_event_id] = called_event + self.CALL_LOCKS[self._can_finish_event_id] = can_finish + + if not manual_finish: + self._can_finish.set() + + @property + def _can_finish(self): + return self.CALL_LOCKS[self._can_finish_event_id] + + @property + def _called_event(self): + return self.CALL_LOCKS[self._called_event_id] + + def wait_on_called(self): + self._called_event.wait() + + def set_can(self): + self._can_finish.set() + + def called(self): + return self._called_event.is_set() + + def __call__(self): + if self._called_event.is_set(): print('called twice') + + self._called_event.set() + self._can_finish.wait() + return self._result + + def close(self): + del self.CALL_LOCKS[self._called_event_id] + del self.CALL_LOCKS[self._can_finish_event_id] + +class ExceptionCall(Call): + def __call__(self): + assert not self._called_event.is_set(), 'already called' + + self._called_event.set() + self._can_finish.wait() + raise ZeroDivisionError() + +class ExecutorShutdownTest(unittest.TestCase): + def test_run_after_shutdown(self): + call1 = Call() + try: + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.run_to_futures, + [call1]) + finally: + call1.close() + + def _start_some_futures(self): + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + call3 = Call(manual_finish=True) + + try: + self.executor.run_to_futures([call1, call2, call3], + return_when=futures.RETURN_IMMEDIATELY) + + call1.wait_on_called() + call2.wait_on_called() + call3.wait_on_called() + + call1.set_can() + call2.set_can() + call3.set_can() + finally: + call1.close() + call2.close() + call3.close() + +class ThreadPoolShutdownTest(ExecutorShutdownTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=5) + + def tearDown(self): + self.executor.shutdown() + + def test_threads_terminate(self): + self._start_some_futures() + self.assertEqual(len(self.executor._threads), 3) + self.executor.shutdown() + for t in self.executor._threads: + t.join() + + def test_context_manager_shutdown(self): + with futures.ThreadPoolExecutor(max_threads=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for t in executor._threads: + t.join() + +class ProcessPoolShutdownTest(ExecutorShutdownTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=5) + + def tearDown(self): + self.executor.shutdown() + + def test_processes_terminate(self): + self._start_some_futures() + self.assertEqual(len(self.executor._processes), 5) + self.executor.shutdown() + for p in self.executor._processes: + p.join() + + def test_context_manager_shutdown(self): + with futures.ProcessPoolExecutor(max_processes=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for p in self.executor._processes: + p.join() + +class WaitsTest(unittest.TestCase): + def test_concurrent_waits(self): + def wait_for_ALL_COMPLETED(): + fs.wait(return_when=futures.ALL_COMPLETED) + self.assertTrue(f1.done()) + self.assertTrue(f2.done()) + self.assertTrue(f3.done()) + self.assertTrue(f4.done()) + all_completed.release() + + def wait_for_FIRST_COMPLETED(): + fs.wait(return_when=futures.FIRST_COMPLETED) + self.assertTrue(f1.done()) + self.assertFalse(f2.done()) # XXX + self.assertFalse(f3.done()) + self.assertFalse(f4.done()) + first_completed.release() + + def wait_for_FIRST_EXCEPTION(): + fs.wait(return_when=futures.FIRST_EXCEPTION) + self.assertTrue(f1.done()) + self.assertTrue(f2.done()) + self.assertFalse(f3.done()) # XXX + self.assertFalse(f4.done()) + first_exception.release() + + all_completed = threading.Semaphore(0) + first_completed = threading.Semaphore(0) + first_exception = threading.Semaphore(0) + + call1 = Call(manual_finish=True) + call2 = ExceptionCall(manual_finish=True) + call3 = Call(manual_finish=True) + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + threads = [] + for wait_test in [wait_for_ALL_COMPLETED, + wait_for_FIRST_COMPLETED, + wait_for_FIRST_EXCEPTION]: + t = threading.Thread(target=wait_test) + t.start() + threads.append(t) + + time.sleep(1) # give threads enough time to execute wait + + call1.set_can() + first_completed.acquire() + call2.set_can() + first_exception.acquire() + call3.set_can() + all_completed.acquire() + + self.executor.shutdown() + finally: + call1.close() + call2.close() + call3.close() + call4.close() + +class ThreadPoolWaitTests(WaitsTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() + +class ProcessPoolWaitTests(WaitsTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() + +class CancelTests(unittest.TestCase): + def test_cancel_states(self): + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertEqual(f1.cancel(), False) + self.assertEqual(f2.cancel(), True) + self.assertEqual(f4.cancel(), True) + self.assertEqual(f1.cancelled(), False) + self.assertEqual(f2.cancelled(), True) + self.assertEqual(f3.cancelled(), False) + self.assertEqual(f4.cancelled(), True) + self.assertEqual(f1.done(), False) + self.assertEqual(f2.done(), True) + self.assertEqual(f3.done(), False) + self.assertEqual(f4.done(), True) + + call1.set_can() + fs.wait(return_when=futures.ALL_COMPLETED) + self.assertEqual(f1.result(), 42) + self.assertRaises(futures.CancelledError, f2.result) + self.assertRaises(futures.CancelledError, f2.exception) + self.assertEqual(f3.result(), 42) + self.assertRaises(futures.CancelledError, f4.result) + self.assertRaises(futures.CancelledError, f4.exception) + + self.assertEqual(call2.called(), False) + self.assertEqual(call4.called(), False) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + + def test_wait_for_individual_cancel(self): + def end_call(): + time.sleep(1) + f2.cancel() + call1.set_can() + + call1 = Call(manual_finish=True) + call2 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2 = fs + + call1.wait_on_called() + t = threading.Thread(target=end_call) + t.start() + self.assertRaises(futures.CancelledError, f2.result) + self.assertRaises(futures.CancelledError, f2.exception) + t.join() + finally: + call1.close() + call2.close() + + def test_wait_with_already_cancelled_futures(self): + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertTrue(f2.cancel()) + self.assertTrue(f3.cancel()) + call1.set_can() + time.sleep(0.1) + + fs.wait(return_when=futures.ALL_COMPLETED) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + + def test_cancel_all(self): + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + try: + fs = self.executor.run_to_futures( + [call1, call2, call3, call4], + return_when=futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0) + call1.set_can() + fs.cancel() + + self.assertFalse(f1.cancelled()) + self.assertTrue(f2.cancelled()) + self.assertTrue(f3.cancelled()) + self.assertTrue(f4.cancelled()) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + +class ThreadPoolCancelTests(CancelTests): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() + +class ProcessPoolCancelTests(WaitsTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() + +class ExecutorTest(unittest.TestCase): + # Executor.shutdown() and context manager usage is tested by + # ExecutorShutdownTest. + def test_run_to_futures(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(manual_finish=True) + call4 = Call() + call5 = Call() + + try: + f1, f2, f3, f4, f5 = self.executor.run_to_futures( + [call1, call2, call3, call4, call5], + return_when=futures.RETURN_IMMEDIATELY) + + call3.wait_on_called() + + self.assertTrue(f1.done()) + self.assertFalse(f1.running()) + self.assertEqual(f1.index, 0) + + self.assertTrue(f2.done()) + self.assertFalse(f2.running()) + self.assertEqual(f2.index, 1) + + self.assertFalse(f3.done()) + self.assertTrue(f3.running()) + self.assertEqual(f3.index, 2) + + # ProcessPoolExecutor may mark some futures as running before they + # actually are. + self.assertFalse(f4.done()) + self.assertEqual(f4.index, 3) + + self.assertFalse(f5.done()) + self.assertEqual(f5.index, 4) + finally: + call1.close() + call2.close() + call3.close() + call4.close() + call5.close() + + def test_run_to_results(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(result=3) + try: + self.assertEqual( + list(self.executor.run_to_results([call1, call2, call3])), + [1, 2, 3]) + finally: + call1.close() + call2.close() + call3.close() + + def test_run_to_results_exception(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = ExceptionCall() + try: + i = self.executor.run_to_results([call1, call2, call3]) + + self.assertEqual(i.__next__(), 1) + self.assertEqual(i.__next__(), 2) + self.assertRaises(ZeroDivisionError, i.__next__) + finally: + call1.close() + call2.close() + call3.close() + + def test_run_to_results_timeout(self): + call1 = Call(result=1) + call2 = Call(result=2) + call3 = Call(manual_finish=True) + + try: + i = self.executor.run_to_results([call1, call2, call3], timeout=1) + self.assertEqual(i.__next__(), 1) + self.assertEqual(i.__next__(), 2) + self.assertRaises(futures.TimeoutError, i.__next__) + call3.set_can() + finally: + call1.close() + call2.close() + call3.close() + + def test_map(self): + self.assertEqual( + list(self.executor.map(pow, range(10), range(10))), + list(map(pow, range(10), range(10)))) + + def test_map_exception(self): + i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) + self.assertEqual(i.__next__(), (0, 1)) + self.assertEqual(i.__next__(), (0, 1)) + self.assertRaises(ZeroDivisionError, i.__next__) + +class ThreadPoolExecutorTest(ExecutorTest): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_threads=1) + + def tearDown(self): + self.executor.shutdown() + +class ProcessPoolExecutorTest(ExecutorTest): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_processes=1) + + def tearDown(self): + self.executor.shutdown() + +class FutureTests(unittest.TestCase): + # Future.index() is tested by ExecutorTest + # Future.cancel() is further tested by CancelTests. + + def test_repr(self): + self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>') + self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>') + self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>') + self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE), + '<Future state=cancelled>') + self.assertEqual(repr(EXCEPTION_FUTURE), + '<Future state=finished raised IOError>') + self.assertEqual(repr(SUCCESSFUL_FUTURE), + '<Future state=finished returned int>') + + create_future + + def test_cancel(self): + f1 = create_future(state=PENDING) + f2 = create_future(state=RUNNING) + f3 = create_future(state=CANCELLED) + f4 = create_future(state=CANCELLED_AND_NOTIFIED) + f5 = create_future(state=FINISHED, exception=IOError()) + f6 = create_future(state=FINISHED, result=5) + + self.assertTrue(f1.cancel()) + self.assertEquals(f1._state, CANCELLED) + + self.assertFalse(f2.cancel()) + self.assertEquals(f2._state, RUNNING) + + self.assertTrue(f3.cancel()) + self.assertEquals(f3._state, CANCELLED) + + self.assertTrue(f4.cancel()) + self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) + + self.assertFalse(f5.cancel()) + self.assertEquals(f5._state, FINISHED) + + self.assertFalse(f6.cancel()) + self.assertEquals(f6._state, FINISHED) + + def test_cancelled(self): + self.assertFalse(PENDING_FUTURE.cancelled()) + self.assertFalse(RUNNING_FUTURE.cancelled()) + self.assertTrue(CANCELLED_FUTURE.cancelled()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) + self.assertFalse(EXCEPTION_FUTURE.cancelled()) + self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) + + def test_done(self): + self.assertFalse(PENDING_FUTURE.done()) + self.assertFalse(RUNNING_FUTURE.done()) + self.assertTrue(CANCELLED_FUTURE.done()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) + self.assertTrue(EXCEPTION_FUTURE.done()) + self.assertTrue(SUCCESSFUL_FUTURE.done()) + + def test_running(self): + self.assertFalse(PENDING_FUTURE.running()) + self.assertTrue(RUNNING_FUTURE.running()) + self.assertFalse(CANCELLED_FUTURE.running()) + self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) + self.assertFalse(EXCEPTION_FUTURE.running()) + self.assertFalse(SUCCESSFUL_FUTURE.running()) + + def test_result_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.result, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) + self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) + self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) + + def test_result_with_success(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = FINISHED + f1._result = 42 + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertEquals(f1.result(timeout=1), 42) + + def test_result_with_cancel(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = CANCELLED + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertRaises(futures.CancelledError, f1.result, timeout=1) + + def test_exception_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.exception, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) + self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), + IOError)) + self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) + + def test_exception_with_success(self): + def notification(): + time.sleep(0.1) + with f1._condition: + f1._state = FINISHED + f1._exception = IOError() + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertTrue(isinstance(f1.exception(timeout=1), IOError)) + +class FutureListTests(unittest.TestCase): + # FutureList.wait() is further tested by WaitsTest. + # FutureList.cancel() is tested by CancelTests. + def test_wait_RETURN_IMMEDIATELY(self): + f = futures.FutureList(futures=None, event_sink=None) + f.wait(return_when=futures.RETURN_IMMEDIATELY) + + def test_wait_timeout(self): + f = futures.FutureList([PENDING_FUTURE], + futures._base.ThreadEventSink()) + + for t in [futures.FIRST_COMPLETED, + futures.FIRST_EXCEPTION, + futures.ALL_COMPLETED]: + f.wait(timeout=0.1, return_when=t) + self.assertFalse(PENDING_FUTURE.done()) + + def test_wait_all_done(self): + f = futures.FutureList([CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + futures._base.ThreadEventSink()) + + f.wait(return_when=futures.ALL_COMPLETED) + + def test_filters(self): + fs = [PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE] + f = futures.FutureList(fs, None) + + self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE]) + self.assertEqual(list(f.cancelled_futures()), + [CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE]) + self.assertEqual(list(f.done_futures()), + [CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]) + self.assertEqual(list(f.successful_futures()), + [SUCCESSFUL_FUTURE]) + self.assertEqual(list(f.exception_futures()), + [EXCEPTION_FUTURE]) + + def test_has_running_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + None).has_running_futures()) + self.assertTrue( + futures.FutureList([RUNNING_FUTURE], + None).has_running_futures()) + + def test_has_cancelled_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + SUCCESSFUL_FUTURE, + EXCEPTION_FUTURE], + None).has_cancelled_futures()) + self.assertTrue( + futures.FutureList([CANCELLED_FUTURE], + None).has_cancelled_futures()) + + self.assertTrue( + futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], + None).has_cancelled_futures()) + + def test_has_done_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE], + None).has_done_futures()) + self.assertTrue( + futures.FutureList([CANCELLED_FUTURE], + None).has_done_futures()) + + self.assertTrue( + futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], + None).has_done_futures()) + + self.assertTrue( + futures.FutureList([EXCEPTION_FUTURE], + None).has_done_futures()) + + self.assertTrue( + futures.FutureList([SUCCESSFUL_FUTURE], + None).has_done_futures()) + + def test_has_successful_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE], + None).has_successful_futures()) + + self.assertTrue( + futures.FutureList([SUCCESSFUL_FUTURE], + None).has_successful_futures()) + + def test_has_exception_futures(self): + self.assertFalse( + futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + SUCCESSFUL_FUTURE], + None).has_exception_futures()) + + self.assertTrue( + futures.FutureList([EXCEPTION_FUTURE], + None).has_exception_futures()) + + def test_get_item(self): + fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] + f = futures.FutureList(fs, None) + self.assertEqual(f[0], PENDING_FUTURE) + self.assertEqual(f[1], RUNNING_FUTURE) + self.assertEqual(f[2], CANCELLED_FUTURE) + self.assertRaises(IndexError, f.__getitem__, 3) + + def test_len(self): + f = futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE, + CANCELLED_FUTURE], + None) + self.assertEqual(len(f), 3) + + def test_iter(self): + fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] + f = futures.FutureList(fs, None) + self.assertEqual(list(iter(f)), fs) + + def test_contains(self): + f = futures.FutureList([PENDING_FUTURE, + RUNNING_FUTURE], + None) + self.assertTrue(PENDING_FUTURE in f) + self.assertTrue(RUNNING_FUTURE in f) + self.assertFalse(CANCELLED_FUTURE in f) + + def test_repr(self): + pending = create_future(state=PENDING) + cancelled = create_future(state=CANCELLED) + cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED) + running = create_future(state=RUNNING) + finished = create_future(state=FINISHED) + + f = futures.FutureList( + [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 + + [CANCELLED_AND_NOTIFIED_FUTURE] + + [RUNNING_FUTURE] * 2 + + [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3, + None) + + self.assertEqual(repr(f), + '<FutureList #futures=15 ' + '[#pending=4 #cancelled=3 #running=2 #finished=6]>') + +def test_main(): + test.support.run_unittest(ProcessPoolCancelTests, + ThreadPoolCancelTests, + ProcessPoolExecutorTest, + ThreadPoolExecutorTest, + ProcessPoolWaitTests, + ThreadPoolWaitTests, + FutureTests, + FutureListTests, + ProcessPoolShutdownTest, + ThreadPoolShutdownTest) + +if __name__ == "__main__": + test_main()
\ No newline at end of file |