summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrian quinlan <brian.quinlan@gmail.com>2009-05-23 09:18:19 +0000
committerbrian quinlan <brian.quinlan@gmail.com>2009-05-23 09:18:19 +0000
commite1c6c9758b70b3e9e1630b5e8545a9a1e3de7368 (patch)
tree2dc7cdb8c8eda8f97696e5711831143e4d3fab0d
parente63029bf8fa0b37f42add3d6914509dd79c81cb6 (diff)
downloadfutures-e1c6c9758b70b3e9e1630b5e8545a9a1e3de7368.tar.gz
Seperate into python2 and python3 directories
-rw-r--r--python2/crawl.py (renamed from crawl.py)0
-rw-r--r--python2/futures/__init__.py (renamed from futures/__init__.py)0
-rw-r--r--python2/futures/_base.py (renamed from futures/_base.py)0
-rw-r--r--python2/futures/process.py (renamed from futures/process.py)0
-rw-r--r--python2/futures/thread.py (renamed from futures/thread.py)0
-rw-r--r--python2/primes.py (renamed from primes.py)0
-rw-r--r--python2/test_futures.py (renamed from test_futures.py)0
-rw-r--r--python3/crawl.py57
-rw-r--r--python3/futures/__init__.py6
-rw-r--r--python3/futures/_base.py482
-rw-r--r--python3/futures/process.py155
-rw-r--r--python3/futures/thread.py89
-rw-r--r--python3/primes.py44
-rw-r--r--python3/test_futures.py798
14 files changed, 1631 insertions, 0 deletions
diff --git a/crawl.py b/python2/crawl.py
index 10e35c3..10e35c3 100644
--- a/crawl.py
+++ b/python2/crawl.py
diff --git a/futures/__init__.py b/python2/futures/__init__.py
index 5f599ad..5f599ad 100644
--- a/futures/__init__.py
+++ b/python2/futures/__init__.py
diff --git a/futures/_base.py b/python2/futures/_base.py
index 19cabe8..19cabe8 100644
--- a/futures/_base.py
+++ b/python2/futures/_base.py
diff --git a/futures/process.py b/python2/futures/process.py
index ad6dc6d..ad6dc6d 100644
--- a/futures/process.py
+++ b/python2/futures/process.py
diff --git a/futures/thread.py b/python2/futures/thread.py
index 9e3275f..9e3275f 100644
--- a/futures/thread.py
+++ b/python2/futures/thread.py
diff --git a/primes.py b/python2/primes.py
index 7e83ea0..7e83ea0 100644
--- a/primes.py
+++ b/python2/primes.py
diff --git a/test_futures.py b/python2/test_futures.py
index 0572f81..0572f81 100644
--- a/test_futures.py
+++ b/python2/test_futures.py
diff --git a/python3/crawl.py b/python3/crawl.py
new file mode 100644
index 0000000..10e35c3
--- /dev/null
+++ b/python3/crawl.py
@@ -0,0 +1,57 @@
+import datetime
+import functools
+import futures.thread
+import time
+import timeit
+import urllib.request
+
+URLS = ['http://www.google.com/',
+ 'http://www.apple.com/',
+ 'http://www.ibm.com',
+ 'http://www.thisurlprobablydoesnotexist.com',
+ 'http://www.slashdot.org/',
+ 'http://www.python.org/',
+ 'http://www.sweetapp.com/'] * 5
+
+def load_url(url, timeout):
+ return urllib.request.urlopen(url, timeout=timeout).read()
+
+def download_urls_sequential(urls, timeout=60):
+ url_to_content = {}
+ for url in urls:
+ try:
+ url_to_content[url] = load_url(url, timeout=timeout)
+ except:
+ pass
+ return url_to_content
+
+def download_urls_with_executor(urls, executor, timeout=60):
+ try:
+ url_to_content = {}
+ fs = executor.run_to_futures(
+ (functools.partial(load_url, url, timeout) for url in urls),
+ timeout=timeout)
+ for future in fs.successful_futures():
+ url = urls[future.index]
+ url_to_content[url] = future.result()
+ return url_to_content
+ finally:
+ executor.shutdown()
+
+def main():
+ for name, fn in [('sequential',
+ functools.partial(download_urls_sequential, URLS)),
+ ('processes',
+ functools.partial(download_urls_with_executor,
+ URLS,
+ futures.ProcessPoolExecutor(10))),
+ ('threads',
+ functools.partial(download_urls_with_executor,
+ URLS,
+ futures.ThreadPoolExecutor(10)))]:
+ print('%s: ' % name.ljust(12), end='')
+ start = time.time()
+ fn()
+ print('%.2f seconds' % (time.time() - start))
+
+main()
diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py
new file mode 100644
index 0000000..5f599ad
--- /dev/null
+++ b/python3/futures/__init__.py
@@ -0,0 +1,6 @@
+from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION,
+ ALL_COMPLETED, RETURN_IMMEDIATELY,
+ CancelledError, TimeoutError,
+ Future, FutureList)
+from futures.thread import ThreadPoolExecutor
+from futures.process import ProcessPoolExecutor
diff --git a/python3/futures/_base.py b/python3/futures/_base.py
new file mode 100644
index 0000000..19cabe8
--- /dev/null
+++ b/python3/futures/_base.py
@@ -0,0 +1,482 @@
+import functools
+import logging
+import threading
+import time
+
+FIRST_COMPLETED = 0
+FIRST_EXCEPTION = 1
+ALL_COMPLETED = 2
+RETURN_IMMEDIATELY = 3
+
+# Possible future states
+PENDING = 0
+RUNNING = 1
+CANCELLED = 2 # The future was cancelled...
+CANCELLED_AND_NOTIFIED = 3 # ...and .add_cancelled() was called.
+FINISHED = 4
+
+FUTURE_STATES = [
+ PENDING,
+ RUNNING,
+ CANCELLED,
+ CANCELLED_AND_NOTIFIED,
+ FINISHED
+]
+
+_STATE_TO_DESCRIPTION_MAP = {
+ PENDING: "pending",
+ RUNNING: "running",
+ CANCELLED: "cancelled",
+ CANCELLED_AND_NOTIFIED: "cancelled",
+ FINISHED: "finished"
+}
+
+LOGGER = logging.getLogger("futures")
+_handler = logging.StreamHandler()
+LOGGER.addHandler(_handler)
+del _handler
+
+def set_future_exception(future, event_sink, exception):
+ with future._condition:
+ future._exception = exception
+ with event_sink._condition:
+ future._state = FINISHED
+ event_sink.add_exception()
+ future._condition.notify_all()
+
+def set_future_result(future, event_sink, result):
+ with future._condition:
+ future._result = result
+ with event_sink._condition:
+ future._state = FINISHED
+ event_sink.add_result()
+ future._condition.notify_all()
+
+class Error(Exception):
+ pass
+
+class CancelledError(Error):
+ pass
+
+class TimeoutError(Error):
+ pass
+
+class Future(object):
+ """Represents the result of an asynchronous computation."""
+
+ def __init__(self, index):
+ """Initializes the future. Should not be called by clients."""
+ self._condition = threading.Condition()
+ self._state = PENDING
+ self._result = None
+ self._exception = None
+ self._index = index
+
+ def __repr__(self):
+ with self._condition:
+ if self._state == FINISHED:
+ if self._exception:
+ return '<Future state=%s raised %s>' % (
+ _STATE_TO_DESCRIPTION_MAP[self._state],
+ self._exception.__class__.__name__)
+ else:
+ return '<Future state=%s returned %s>' % (
+ _STATE_TO_DESCRIPTION_MAP[self._state],
+ self._result.__class__.__name__)
+ return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
+
+ @property
+ def index(self):
+ """The index of the future in its FutureList."""
+ return self._index
+
+ def cancel(self):
+ """Cancel the future if possible.
+
+ Returns True if the future was cancelled, False otherwise. A future
+ cannot be cancelled if it is running or has already completed.
+ """
+ with self._condition:
+ if self._state in [RUNNING, FINISHED]:
+ return False
+
+ if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ self._state = CANCELLED
+ self._condition.notify_all()
+ return True
+
+ def cancelled(self):
+ """Return True if the future has cancelled."""
+ with self._condition:
+ return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
+
+ def running(self):
+ with self._condition:
+ return self._state == RUNNING
+
+ def done(self):
+ """Return True of the future was cancelled or finished executing."""
+ with self._condition:
+ return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
+
+ def __get_result(self):
+ if self._exception:
+ raise self._exception
+ else:
+ return self._result
+
+ def result(self, timeout=None):
+ """Return the result of the call that the future represents.
+
+ Args:
+ timeout: The number of seconds to wait for the result if the future
+ isn't done. If None, then there is no limit on the wait time.
+
+ Returns:
+ The result of the call that the future represents.
+
+ Raises:
+ CancelledError: If the future was cancelled.
+ TimeoutError: If the future didn't finish executing before the given
+ timeout.
+ Exception: If the call raised then that exception will be raised.
+ """
+ with self._condition:
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
+ elif self._state == FINISHED:
+ return self.__get_result()
+
+ self._condition.wait(timeout)
+
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
+ elif self._state == FINISHED:
+ return self.__get_result()
+ else:
+ raise TimeoutError()
+
+ def exception(self, timeout=None):
+ """Return the exception raised by the call that the future represents.
+
+ Args:
+ timeout: The number of seconds to wait for the exception if the
+ future isn't done. If None, then there is no limit on the wait
+ time.
+
+ Returns:
+ The exception raised by the call that the future represents or None
+ if the call completed without raising.
+
+ Raises:
+ CancelledError: If the future was cancelled.
+ TimeoutError: If the future didn't finish executing before the given
+ timeout.
+ """
+
+ with self._condition:
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
+ elif self._state == FINISHED:
+ return self._exception
+
+ self._condition.wait(timeout)
+
+ if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
+ raise CancelledError()
+ elif self._state == FINISHED:
+ return self._exception
+ else:
+ raise TimeoutError()
+
+class _FirstCompletedWaitTracker(object):
+ def __init__(self):
+ self.event = threading.Event()
+
+ def add_result(self):
+ self.event.set()
+
+ def add_exception(self):
+ self.event.set()
+
+ def add_cancelled(self):
+ self.event.set()
+
+class _AllCompletedWaitTracker(object):
+ def __init__(self, pending_calls, stop_on_exception):
+ self.pending_calls = pending_calls
+ self.stop_on_exception = stop_on_exception
+ self.event = threading.Event()
+
+ def add_result(self):
+ self.pending_calls -= 1
+ if not self.pending_calls:
+ self.event.set()
+
+ def add_exception(self):
+ if self.stop_on_exception:
+ self.event.set()
+ else:
+ self.add_result()
+
+ def add_cancelled(self):
+ self.add_result()
+
+class ThreadEventSink(object):
+ def __init__(self):
+ self._condition = threading.Lock()
+ self._waiters = []
+
+ def add(self, e):
+ self._waiters.append(e)
+
+ def remove(self, e):
+ self._waiters.remove(e)
+
+ def add_result(self):
+ for waiter in self._waiters:
+ waiter.add_result()
+
+ def add_exception(self):
+ for waiter in self._waiters:
+ waiter.add_exception()
+
+ def add_cancelled(self):
+ for waiter in self._waiters:
+ waiter.add_cancelled()
+
+class FutureList(object):
+ def __init__(self, futures, event_sink):
+ """Initializes the FutureList. Should not be called by clients."""
+ self._futures = futures
+ self._event_sink = event_sink
+
+ def wait(self, timeout=None, return_when=ALL_COMPLETED):
+ """Wait for the futures in the list to complete.
+
+ Args:
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ return_when: Indicates when the method should return. The options
+ are:
+
+ FIRST_COMPLETED - Return when any future finishes or is
+ cancelled.
+ FIRST_EXCEPTION - Return when any future finishes by raising an
+ exception. If no future raises and exception
+ then it is equivalent to ALL_COMPLETED.
+ ALL_COMPLETED - Return when all futures finish or are cancelled.
+ RETURN_IMMEDIATELY - Return without waiting (this is not likely
+ to be a useful option but it is there to
+ be symmetrical with the
+ executor.run_to_futures() method.
+
+ Raises:
+ TimeoutError: If the wait condition wasn't satisfied before the
+ given timeout.
+ """
+ if return_when == RETURN_IMMEDIATELY:
+ return
+
+ with self._event_sink._condition:
+ # Make a quick exit if every future is already done. This check is
+ # necessary because, if every future is in the
+ # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
+ # never receive any
+ if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
+ for f in self):
+ return
+
+ if return_when == FIRST_COMPLETED:
+ completed_tracker = _FirstCompletedWaitTracker()
+ else:
+ # Calculate how many events are expected before every future
+ # is complete. This can be done without holding the futures'
+ # locks because a future cannot transition itself into either
+ # of the states being looked for.
+ pending_count = sum(
+ f._state not in [CANCELLED_AND_NOTIFIED, FINISHED]
+ for f in self)
+
+ if return_when == FIRST_EXCEPTION:
+ completed_tracker = _AllCompletedWaitTracker(
+ pending_count, stop_on_exception=True)
+ elif return_when == ALL_COMPLETED:
+ completed_tracker = _AllCompletedWaitTracker(
+ pending_count, stop_on_exception=False)
+
+ self._event_sink.add(completed_tracker)
+
+ try:
+ completed_tracker.event.wait(timeout)
+ finally:
+ self._event_sink.remove(completed_tracker)
+
+ def cancel(self, timeout=None):
+ """Cancel the futures in the list.
+
+ Args:
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Raises:
+ TimeoutError: If all the futures were not finished before the
+ given timeout.
+ """
+ for f in self:
+ f.cancel()
+ self.wait(timeout=timeout, return_when=ALL_COMPLETED)
+ if any(not f.done() for f in self):
+ raise TimeoutError()
+
+ def has_running_futures(self):
+ return any(self.running_futures())
+
+ def has_cancelled_futures(self):
+ return any(self.cancelled_futures())
+
+ def has_done_futures(self):
+ return any(self.done_futures())
+
+ def has_successful_futures(self):
+ return any(self.successful_futures())
+
+ def has_exception_futures(self):
+ return any(self.exception_futures())
+
+ def cancelled_futures(self):
+ return (f for f in self
+ if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED])
+
+ def done_futures(self):
+ return (f for f in self
+ if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
+
+ def successful_futures(self):
+ return (f for f in self
+ if f._state == FINISHED and f._exception is None)
+
+ def exception_futures(self):
+ return (f for f in self
+ if f._state == FINISHED and f._exception is not None)
+
+ def running_futures(self):
+ return (f for f in self if f._state == RUNNING)
+
+ def __len__(self):
+ return len(self._futures)
+
+ def __getitem__(self, i):
+ return self._futures[i]
+
+ def __iter__(self):
+ return iter(self._futures)
+
+ def __contains__(self, future):
+ return future in self._futures
+
+ def __repr__(self):
+ states = {state: 0 for state in FUTURE_STATES}
+ for f in self:
+ states[f._state] += 1
+
+ return ('<FutureList #futures=%d '
+ '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % (
+ len(self),
+ states[PENDING],
+ states[CANCELLED] + states[CANCELLED_AND_NOTIFIED],
+ states[RUNNING],
+ states[FINISHED]))
+
+class Executor(object):
+ def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ """Return a list of futures representing the given calls.
+
+ Args:
+ calls: A sequence of callables that take no arguments. These will
+ be bound to Futures and returned.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ return_when: Indicates when the method should return. The options
+ are:
+
+ FIRST_COMPLETED - Return when any future finishes or is
+ cancelled.
+ FIRST_EXCEPTION - Return when any future finishes by raising an
+ exception. If no future raises and exception
+ then it is equivalent to ALL_COMPLETED.
+ ALL_COMPLETED - Return when all futures finish or are cancelled.
+ RETURN_IMMEDIATELY - Return without waiting.
+
+ Returns:
+ A FuturesList containing futures for the given calls.
+ """
+ raise NotImplementedError()
+
+ def run_to_results(self, calls, timeout=None):
+ """Returns a iterator of the results of the given calls.
+
+ Args:
+ calls: A sequence of callables that take no arguments. These will
+ be called and their results returned.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Returns:
+ An iterator over the results of the given calls. Equivalent to:
+ (call() for call in calls) but the calls may be evaluated
+ out-of-order.
+
+ Raises:
+ TimeoutError: If all the given calls were not completed before the
+ given timeout.
+ Exception: If any call() raises.
+ """
+ if timeout is not None:
+ end_time = timeout + time.time()
+
+ fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY)
+
+ try:
+ for future in fs:
+ if timeout is None:
+ yield future.result()
+ else:
+ yield future.result(end_time - time.time())
+ finally:
+ try:
+ fs.cancel(timeout=0)
+ except TimeoutError:
+ pass
+
+ def map(self, func, *iterables, timeout=None):
+ """Returns a iterator equivalent to map(fn, iter).
+
+ Args:
+ func: A callable that will take take as many arguments as there
+ are passed iterables.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Returns:
+ An iterator equivalent to: map(func, *iterables) but the calls may
+ be evaluated out-of-order.
+
+ Raises:
+ TimeoutError: If the entire result iterator could not be generated
+ before the given timeout.
+ Exception: If fn(*args) raises for any values.
+ """
+ calls = [functools.partial(func, *args) for args in zip(*iterables)]
+ return self.run_to_results(calls, timeout)
+
+ def shutdown(self):
+ """Clean-up. No other methods can be called afterwards."""
+ raise NotImplementedError()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.shutdown()
+ return False
diff --git a/python3/futures/process.py b/python3/futures/process.py
new file mode 100644
index 0000000..ad6dc6d
--- /dev/null
+++ b/python3/futures/process.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python
+
+from futures._base import (PENDING, RUNNING, CANCELLED,
+ CANCELLED_AND_NOTIFIED, FINISHED,
+ ALL_COMPLETED,
+ set_future_exception, set_future_result,
+ Executor, Future, FutureList, ThreadEventSink)
+
+import queue
+import multiprocessing
+import threading
+
+class _WorkItem(object):
+ def __init__(self, call, future, completion_tracker):
+ self.call = call
+ self.future = future
+ self.completion_tracker = completion_tracker
+
+class _ResultItem(object):
+ def __init__(self, work_id, exception=None, result=None):
+ self.work_id = work_id
+ self.exception = exception
+ self.result = result
+
+class _CallItem(object):
+ def __init__(self, work_id, call):
+ self.work_id = work_id
+ self.call = call
+
+def _process_worker(call_queue, result_queue, shutdown):
+ while True:
+ try:
+ call_item = call_queue.get(block=True, timeout=0.1)
+ except queue.Empty:
+ if shutdown.is_set():
+ return
+ else:
+ try:
+ r = call_item.call()
+ except BaseException as e:
+ result_queue.put(_ResultItem(call_item.work_id,
+ exception=e))
+ else:
+ result_queue.put(_ResultItem(call_item.work_id,
+ result=r))
+
+class ProcessPoolExecutor(Executor):
+ def __init__(self, max_processes=None):
+ if max_processes is None:
+ max_processes = multiprocessing.cpu_count()
+
+ self._max_processes = max_processes
+ # Make the call queue slightly larger than the number of processes to
+ # prevent the worker processes from starving but to make future.cancel()
+ # responsive.
+ self._call_queue = multiprocessing.Queue(self._max_processes + 1)
+ self._result_queue = multiprocessing.Queue()
+ self._work_ids = queue.Queue()
+ self._queue_management_thread = None
+ self._processes = set()
+
+ # Shutdown is a two-step process.
+ self._shutdown_thread = False
+ self._shutdown_process_event = multiprocessing.Event()
+ self._shutdown_lock = threading.Lock()
+ self._queue_count = 0
+ self._pending_work_items = {}
+
+ def _add_call_item_to_queue(self):
+ while True:
+ try:
+ work_id = self._work_ids.get(block=False)
+ except queue.Empty:
+ return
+ else:
+ work_item = self._pending_work_items[work_id]
+
+ if work_item.future.cancelled():
+ with work_item.future._condition:
+ work_item.future._condition.notify_all()
+ work_item.completion_tracker.add_cancelled()
+ continue
+ else:
+ with work_item.future._condition:
+ work_item.future._state = RUNNING
+
+ self._call_queue.put(_CallItem(work_id, work_item.call),
+ block=True)
+ if self._call_queue.full():
+ return
+
+ def _result(self):
+ while True:
+ self._add_call_item_to_queue()
+ try:
+ result_item = self._result_queue.get(block=True,
+ timeout=0.1)
+ except queue.Empty:
+ if self._shutdown_thread and not self._pending_work_items:
+ self._shutdown_process_event.set()
+ return
+ else:
+ work_item = self._pending_work_items[result_item.work_id]
+ del self._pending_work_items[result_item.work_id]
+
+ if result_item.exception:
+ set_future_exception(work_item.future,
+ work_item.completion_tracker,
+ result_item.exception)
+ else:
+ set_future_result(work_item.future,
+ work_item.completion_tracker,
+ result_item.result)
+
+ def _adjust_process_count(self):
+ if self._queue_management_thread is None:
+ self._queue_management_thread = threading.Thread(
+ target=self._result)
+ self._queue_management_thread.daemon = True
+ self._queue_management_thread.start()
+
+ for _ in range(len(self._processes), self._max_processes):
+ p = multiprocessing.Process(
+ target=_process_worker,
+ args=(self._call_queue,
+ self._result_queue,
+ self._shutdown_process_event))
+ p.daemon = True
+ p.start()
+ self._processes.add(p)
+
+ def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ with self._shutdown_lock:
+ if self._shutdown_thread:
+ raise RuntimeError('cannot run new futures after shutdown')
+
+ futures = []
+ event_sink = ThreadEventSink()
+ self._queue_count
+ for index, call in enumerate(calls):
+ f = Future(index)
+ self._pending_work_items[self._queue_count] = _WorkItem(
+ call, f, event_sink)
+ self._work_ids.put(self._queue_count)
+ futures.append(f)
+ self._queue_count += 1
+
+ self._adjust_process_count()
+ fl = FutureList(futures, event_sink)
+ fl.wait(timeout=timeout, return_when=return_when)
+ return fl
+
+ def shutdown(self):
+ with self._shutdown_lock:
+ self._shutdown_thread = True
diff --git a/python3/futures/thread.py b/python3/futures/thread.py
new file mode 100644
index 0000000..9e3275f
--- /dev/null
+++ b/python3/futures/thread.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+
+from futures._base import (PENDING, RUNNING, CANCELLED,
+ CANCELLED_AND_NOTIFIED, FINISHED,
+ ALL_COMPLETED,
+ LOGGER,
+ set_future_exception, set_future_result,
+ Executor, Future, FutureList, ThreadEventSink)
+import queue
+import threading
+
+class _WorkItem(object):
+ def __init__(self, call, future, completion_tracker):
+ self.call = call
+ self.future = future
+ self.completion_tracker = completion_tracker
+
+ def run(self):
+ with self.future._condition:
+ if self.future._state == PENDING:
+ self.future._state = RUNNING
+ elif self.future._state == CANCELLED:
+ with self.completion_tracker._condition:
+ self.future._state = CANCELLED_AND_NOTIFIED
+ self.completion_tracker.add_cancelled()
+ return
+ else:
+ LOGGER.critical('Future %s in unexpected state: %d',
+ id(self.future),
+ self.future._state)
+ return
+
+ try:
+ result = self.call()
+ except BaseException as e:
+ set_future_exception(self.future, self.completion_tracker, e)
+ else:
+ set_future_result(self.future, self.completion_tracker, result)
+
+class ThreadPoolExecutor(Executor):
+ def __init__(self, max_threads):
+ self._max_threads = max_threads
+ self._work_queue = queue.Queue()
+ self._threads = set()
+ self._shutdown = False
+ self._shutdown_lock = threading.Lock()
+
+ def _worker(self):
+ try:
+ while True:
+ try:
+ work_item = self._work_queue.get(block=True, timeout=0.1)
+ except queue.Empty:
+ if self._shutdown:
+ return
+ else:
+ work_item.run()
+ except BaseException as e:
+ LOGGER.critical('Exception in worker', exc_info=True)
+
+ def _adjust_thread_count(self):
+ for _ in range(len(self._threads),
+ min(self._max_threads, self._work_queue.qsize())):
+ t = threading.Thread(target=self._worker)
+ t.daemon = True
+ t.start()
+ self._threads.add(t)
+
+ def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ with self._shutdown_lock:
+ if self._shutdown:
+ raise RuntimeError('cannot run new futures after shutdown')
+
+ futures = []
+ event_sink = ThreadEventSink()
+ for index, call in enumerate(calls):
+ f = Future(index)
+ w = _WorkItem(call, f, event_sink)
+ self._work_queue.put(w)
+ futures.append(f)
+
+ self._adjust_thread_count()
+ fl = FutureList(futures, event_sink)
+ fl.wait(timeout=timeout, return_when=return_when)
+ return fl
+
+ def shutdown(self):
+ with self._shutdown_lock:
+ self._shutdown = True
diff --git a/python3/primes.py b/python3/primes.py
new file mode 100644
index 0000000..7e83ea0
--- /dev/null
+++ b/python3/primes.py
@@ -0,0 +1,44 @@
+import futures
+import math
+import time
+
+PRIMES = [
+ 112272535095293,
+ 112582705942171,
+ 112272535095293,
+ 115280095190773,
+ 115797848077099]
+
+def is_prime(n):
+ if n % 2 == 0:
+ return False
+
+ sqrt_n = int(math.floor(math.sqrt(n)))
+ for i in range(3, sqrt_n + 1, 2):
+ if n % i == 0:
+ return False
+ return True
+
+def sequential():
+ return list(map(is_prime, PRIMES))
+
+def with_process_pool_executor():
+ with futures.ProcessPoolExecutor(10) as executor:
+ return list(executor.map(is_prime, PRIMES))
+
+def with_thread_pool_executor():
+ with futures.ThreadPoolExecutor(10) as executor:
+ return list(executor.map(is_prime, PRIMES))
+
+def main():
+ for name, fn in [('sequential', sequential),
+ ('processes', with_process_pool_executor),
+ ('threads', with_thread_pool_executor)]:
+ print('%s: ' % name.ljust(12), end='')
+ start = time.time()
+ if fn() != [True] * len(PRIMES):
+ print('failed')
+ else:
+ print('%.2f seconds' % (time.time() - start))
+
+main() \ No newline at end of file
diff --git a/python3/test_futures.py b/python3/test_futures.py
new file mode 100644
index 0000000..0572f81
--- /dev/null
+++ b/python3/test_futures.py
@@ -0,0 +1,798 @@
+import test.support
+from test.support import verbose
+
+import unittest
+import threading
+import time
+import multiprocessing
+
+import futures
+import futures._base
+from futures._base import (
+ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+
+def create_future(state=PENDING, exception=None, result=None):
+ f = Future(0)
+ f._state = state
+ f._exception = exception
+ f._result = result
+ return f
+
+PENDING_FUTURE = create_future(state=PENDING)
+RUNNING_FUTURE = create_future(state=RUNNING)
+CANCELLED_FUTURE = create_future(state=CANCELLED)
+CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
+EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
+SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
+
+class Call(object):
+ CALL_LOCKS = {}
+ def __init__(self, manual_finish=False, result=42):
+ called_event = multiprocessing.Event()
+ can_finish = multiprocessing.Event()
+
+ self._result = result
+ self._called_event_id = id(called_event)
+ self._can_finish_event_id = id(can_finish)
+
+ self.CALL_LOCKS[self._called_event_id] = called_event
+ self.CALL_LOCKS[self._can_finish_event_id] = can_finish
+
+ if not manual_finish:
+ self._can_finish.set()
+
+ @property
+ def _can_finish(self):
+ return self.CALL_LOCKS[self._can_finish_event_id]
+
+ @property
+ def _called_event(self):
+ return self.CALL_LOCKS[self._called_event_id]
+
+ def wait_on_called(self):
+ self._called_event.wait()
+
+ def set_can(self):
+ self._can_finish.set()
+
+ def called(self):
+ return self._called_event.is_set()
+
+ def __call__(self):
+ if self._called_event.is_set(): print('called twice')
+
+ self._called_event.set()
+ self._can_finish.wait()
+ return self._result
+
+ def close(self):
+ del self.CALL_LOCKS[self._called_event_id]
+ del self.CALL_LOCKS[self._can_finish_event_id]
+
+class ExceptionCall(Call):
+ def __call__(self):
+ assert not self._called_event.is_set(), 'already called'
+
+ self._called_event.set()
+ self._can_finish.wait()
+ raise ZeroDivisionError()
+
+class ExecutorShutdownTest(unittest.TestCase):
+ def test_run_after_shutdown(self):
+ call1 = Call()
+ try:
+ self.executor.shutdown()
+ self.assertRaises(RuntimeError,
+ self.executor.run_to_futures,
+ [call1])
+ finally:
+ call1.close()
+
+ def _start_some_futures(self):
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
+ call3 = Call(manual_finish=True)
+
+ try:
+ self.executor.run_to_futures([call1, call2, call3],
+ return_when=futures.RETURN_IMMEDIATELY)
+
+ call1.wait_on_called()
+ call2.wait_on_called()
+ call3.wait_on_called()
+
+ call1.set_can()
+ call2.set_can()
+ call3.set_can()
+ finally:
+ call1.close()
+ call2.close()
+ call3.close()
+
+class ThreadPoolShutdownTest(ExecutorShutdownTest):
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_threads=5)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+ def test_threads_terminate(self):
+ self._start_some_futures()
+ self.assertEqual(len(self.executor._threads), 3)
+ self.executor.shutdown()
+ for t in self.executor._threads:
+ t.join()
+
+ def test_context_manager_shutdown(self):
+ with futures.ThreadPoolExecutor(max_threads=5) as e:
+ executor = e
+ self.assertEqual(list(e.map(abs, range(-5, 5))),
+ [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
+
+ for t in executor._threads:
+ t.join()
+
+class ProcessPoolShutdownTest(ExecutorShutdownTest):
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_processes=5)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+ def test_processes_terminate(self):
+ self._start_some_futures()
+ self.assertEqual(len(self.executor._processes), 5)
+ self.executor.shutdown()
+ for p in self.executor._processes:
+ p.join()
+
+ def test_context_manager_shutdown(self):
+ with futures.ProcessPoolExecutor(max_processes=5) as e:
+ executor = e
+ self.assertEqual(list(e.map(abs, range(-5, 5))),
+ [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
+
+ for p in self.executor._processes:
+ p.join()
+
+class WaitsTest(unittest.TestCase):
+ def test_concurrent_waits(self):
+ def wait_for_ALL_COMPLETED():
+ fs.wait(return_when=futures.ALL_COMPLETED)
+ self.assertTrue(f1.done())
+ self.assertTrue(f2.done())
+ self.assertTrue(f3.done())
+ self.assertTrue(f4.done())
+ all_completed.release()
+
+ def wait_for_FIRST_COMPLETED():
+ fs.wait(return_when=futures.FIRST_COMPLETED)
+ self.assertTrue(f1.done())
+ self.assertFalse(f2.done()) # XXX
+ self.assertFalse(f3.done())
+ self.assertFalse(f4.done())
+ first_completed.release()
+
+ def wait_for_FIRST_EXCEPTION():
+ fs.wait(return_when=futures.FIRST_EXCEPTION)
+ self.assertTrue(f1.done())
+ self.assertTrue(f2.done())
+ self.assertFalse(f3.done()) # XXX
+ self.assertFalse(f4.done())
+ first_exception.release()
+
+ all_completed = threading.Semaphore(0)
+ first_completed = threading.Semaphore(0)
+ first_exception = threading.Semaphore(0)
+
+ call1 = Call(manual_finish=True)
+ call2 = ExceptionCall(manual_finish=True)
+ call3 = Call(manual_finish=True)
+ call4 = Call()
+
+ try:
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
+ f1, f2, f3, f4 = fs
+
+ threads = []
+ for wait_test in [wait_for_ALL_COMPLETED,
+ wait_for_FIRST_COMPLETED,
+ wait_for_FIRST_EXCEPTION]:
+ t = threading.Thread(target=wait_test)
+ t.start()
+ threads.append(t)
+
+ time.sleep(1) # give threads enough time to execute wait
+
+ call1.set_can()
+ first_completed.acquire()
+ call2.set_can()
+ first_exception.acquire()
+ call3.set_can()
+ all_completed.acquire()
+
+ self.executor.shutdown()
+ finally:
+ call1.close()
+ call2.close()
+ call3.close()
+ call4.close()
+
+class ThreadPoolWaitTests(WaitsTest):
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_threads=1)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+class ProcessPoolWaitTests(WaitsTest):
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_processes=1)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+class CancelTests(unittest.TestCase):
+ def test_cancel_states(self):
+ call1 = Call(manual_finish=True)
+ call2 = Call()
+ call3 = Call()
+ call4 = Call()
+
+ try:
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
+ f1, f2, f3, f4 = fs
+
+ call1.wait_on_called()
+ self.assertEqual(f1.cancel(), False)
+ self.assertEqual(f2.cancel(), True)
+ self.assertEqual(f4.cancel(), True)
+ self.assertEqual(f1.cancelled(), False)
+ self.assertEqual(f2.cancelled(), True)
+ self.assertEqual(f3.cancelled(), False)
+ self.assertEqual(f4.cancelled(), True)
+ self.assertEqual(f1.done(), False)
+ self.assertEqual(f2.done(), True)
+ self.assertEqual(f3.done(), False)
+ self.assertEqual(f4.done(), True)
+
+ call1.set_can()
+ fs.wait(return_when=futures.ALL_COMPLETED)
+ self.assertEqual(f1.result(), 42)
+ self.assertRaises(futures.CancelledError, f2.result)
+ self.assertRaises(futures.CancelledError, f2.exception)
+ self.assertEqual(f3.result(), 42)
+ self.assertRaises(futures.CancelledError, f4.result)
+ self.assertRaises(futures.CancelledError, f4.exception)
+
+ self.assertEqual(call2.called(), False)
+ self.assertEqual(call4.called(), False)
+ finally:
+ call1.close()
+ call2.close()
+ call3.close()
+ call4.close()
+
+ def test_wait_for_individual_cancel(self):
+ def end_call():
+ time.sleep(1)
+ f2.cancel()
+ call1.set_can()
+
+ call1 = Call(manual_finish=True)
+ call2 = Call()
+
+ try:
+ fs = self.executor.run_to_futures(
+ [call1, call2],
+ return_when=futures.RETURN_IMMEDIATELY)
+ f1, f2 = fs
+
+ call1.wait_on_called()
+ t = threading.Thread(target=end_call)
+ t.start()
+ self.assertRaises(futures.CancelledError, f2.result)
+ self.assertRaises(futures.CancelledError, f2.exception)
+ t.join()
+ finally:
+ call1.close()
+ call2.close()
+
+ def test_wait_with_already_cancelled_futures(self):
+ call1 = Call(manual_finish=True)
+ call2 = Call()
+ call3 = Call()
+ call4 = Call()
+
+ try:
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
+ f1, f2, f3, f4 = fs
+
+ call1.wait_on_called()
+ self.assertTrue(f2.cancel())
+ self.assertTrue(f3.cancel())
+ call1.set_can()
+ time.sleep(0.1)
+
+ fs.wait(return_when=futures.ALL_COMPLETED)
+ finally:
+ call1.close()
+ call2.close()
+ call3.close()
+ call4.close()
+
+ def test_cancel_all(self):
+ call1 = Call(manual_finish=True)
+ call2 = Call()
+ call3 = Call()
+ call4 = Call()
+
+ try:
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
+ f1, f2, f3, f4 = fs
+
+ call1.wait_on_called()
+ self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0)
+ call1.set_can()
+ fs.cancel()
+
+ self.assertFalse(f1.cancelled())
+ self.assertTrue(f2.cancelled())
+ self.assertTrue(f3.cancelled())
+ self.assertTrue(f4.cancelled())
+ finally:
+ call1.close()
+ call2.close()
+ call3.close()
+ call4.close()
+
+class ThreadPoolCancelTests(CancelTests):
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_threads=1)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+class ProcessPoolCancelTests(WaitsTest):
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_processes=1)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+class ExecutorTest(unittest.TestCase):
+ # Executor.shutdown() and context manager usage is tested by
+ # ExecutorShutdownTest.
+ def test_run_to_futures(self):
+ call1 = Call(result=1)
+ call2 = Call(result=2)
+ call3 = Call(manual_finish=True)
+ call4 = Call()
+ call5 = Call()
+
+ try:
+ f1, f2, f3, f4, f5 = self.executor.run_to_futures(
+ [call1, call2, call3, call4, call5],
+ return_when=futures.RETURN_IMMEDIATELY)
+
+ call3.wait_on_called()
+
+ self.assertTrue(f1.done())
+ self.assertFalse(f1.running())
+ self.assertEqual(f1.index, 0)
+
+ self.assertTrue(f2.done())
+ self.assertFalse(f2.running())
+ self.assertEqual(f2.index, 1)
+
+ self.assertFalse(f3.done())
+ self.assertTrue(f3.running())
+ self.assertEqual(f3.index, 2)
+
+ # ProcessPoolExecutor may mark some futures as running before they
+ # actually are.
+ self.assertFalse(f4.done())
+ self.assertEqual(f4.index, 3)
+
+ self.assertFalse(f5.done())
+ self.assertEqual(f5.index, 4)
+ finally:
+ call1.close()
+ call2.close()
+ call3.close()
+ call4.close()
+ call5.close()
+
+ def test_run_to_results(self):
+ call1 = Call(result=1)
+ call2 = Call(result=2)
+ call3 = Call(result=3)
+ try:
+ self.assertEqual(
+ list(self.executor.run_to_results([call1, call2, call3])),
+ [1, 2, 3])
+ finally:
+ call1.close()
+ call2.close()
+ call3.close()
+
+ def test_run_to_results_exception(self):
+ call1 = Call(result=1)
+ call2 = Call(result=2)
+ call3 = ExceptionCall()
+ try:
+ i = self.executor.run_to_results([call1, call2, call3])
+
+ self.assertEqual(i.__next__(), 1)
+ self.assertEqual(i.__next__(), 2)
+ self.assertRaises(ZeroDivisionError, i.__next__)
+ finally:
+ call1.close()
+ call2.close()
+ call3.close()
+
+ def test_run_to_results_timeout(self):
+ call1 = Call(result=1)
+ call2 = Call(result=2)
+ call3 = Call(manual_finish=True)
+
+ try:
+ i = self.executor.run_to_results([call1, call2, call3], timeout=1)
+ self.assertEqual(i.__next__(), 1)
+ self.assertEqual(i.__next__(), 2)
+ self.assertRaises(futures.TimeoutError, i.__next__)
+ call3.set_can()
+ finally:
+ call1.close()
+ call2.close()
+ call3.close()
+
+ def test_map(self):
+ self.assertEqual(
+ list(self.executor.map(pow, range(10), range(10))),
+ list(map(pow, range(10), range(10))))
+
+ def test_map_exception(self):
+ i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
+ self.assertEqual(i.__next__(), (0, 1))
+ self.assertEqual(i.__next__(), (0, 1))
+ self.assertRaises(ZeroDivisionError, i.__next__)
+
+class ThreadPoolExecutorTest(ExecutorTest):
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_threads=1)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+class ProcessPoolExecutorTest(ExecutorTest):
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_processes=1)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+class FutureTests(unittest.TestCase):
+ # Future.index() is tested by ExecutorTest
+ # Future.cancel() is further tested by CancelTests.
+
+ def test_repr(self):
+ self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>')
+ self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>')
+ self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>')
+ self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE),
+ '<Future state=cancelled>')
+ self.assertEqual(repr(EXCEPTION_FUTURE),
+ '<Future state=finished raised IOError>')
+ self.assertEqual(repr(SUCCESSFUL_FUTURE),
+ '<Future state=finished returned int>')
+
+ create_future
+
+ def test_cancel(self):
+ f1 = create_future(state=PENDING)
+ f2 = create_future(state=RUNNING)
+ f3 = create_future(state=CANCELLED)
+ f4 = create_future(state=CANCELLED_AND_NOTIFIED)
+ f5 = create_future(state=FINISHED, exception=IOError())
+ f6 = create_future(state=FINISHED, result=5)
+
+ self.assertTrue(f1.cancel())
+ self.assertEquals(f1._state, CANCELLED)
+
+ self.assertFalse(f2.cancel())
+ self.assertEquals(f2._state, RUNNING)
+
+ self.assertTrue(f3.cancel())
+ self.assertEquals(f3._state, CANCELLED)
+
+ self.assertTrue(f4.cancel())
+ self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED)
+
+ self.assertFalse(f5.cancel())
+ self.assertEquals(f5._state, FINISHED)
+
+ self.assertFalse(f6.cancel())
+ self.assertEquals(f6._state, FINISHED)
+
+ def test_cancelled(self):
+ self.assertFalse(PENDING_FUTURE.cancelled())
+ self.assertFalse(RUNNING_FUTURE.cancelled())
+ self.assertTrue(CANCELLED_FUTURE.cancelled())
+ self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
+ self.assertFalse(EXCEPTION_FUTURE.cancelled())
+ self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
+
+ def test_done(self):
+ self.assertFalse(PENDING_FUTURE.done())
+ self.assertFalse(RUNNING_FUTURE.done())
+ self.assertTrue(CANCELLED_FUTURE.done())
+ self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
+ self.assertTrue(EXCEPTION_FUTURE.done())
+ self.assertTrue(SUCCESSFUL_FUTURE.done())
+
+ def test_running(self):
+ self.assertFalse(PENDING_FUTURE.running())
+ self.assertTrue(RUNNING_FUTURE.running())
+ self.assertFalse(CANCELLED_FUTURE.running())
+ self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
+ self.assertFalse(EXCEPTION_FUTURE.running())
+ self.assertFalse(SUCCESSFUL_FUTURE.running())
+
+ def test_result_with_timeout(self):
+ self.assertRaises(futures.TimeoutError,
+ PENDING_FUTURE.result, timeout=0)
+ self.assertRaises(futures.TimeoutError,
+ RUNNING_FUTURE.result, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_FUTURE.result, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
+ self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
+ self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
+
+ def test_result_with_success(self):
+ def notification():
+ time.sleep(0.1)
+ with f1._condition:
+ f1._state = FINISHED
+ f1._result = 42
+ f1._condition.notify_all()
+
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertEquals(f1.result(timeout=1), 42)
+
+ def test_result_with_cancel(self):
+ def notification():
+ time.sleep(0.1)
+ with f1._condition:
+ f1._state = CANCELLED
+ f1._condition.notify_all()
+
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertRaises(futures.CancelledError, f1.result, timeout=1)
+
+ def test_exception_with_timeout(self):
+ self.assertRaises(futures.TimeoutError,
+ PENDING_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.TimeoutError,
+ RUNNING_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
+ self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
+ IOError))
+ self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
+
+ def test_exception_with_success(self):
+ def notification():
+ time.sleep(0.1)
+ with f1._condition:
+ f1._state = FINISHED
+ f1._exception = IOError()
+ f1._condition.notify_all()
+
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertTrue(isinstance(f1.exception(timeout=1), IOError))
+
+class FutureListTests(unittest.TestCase):
+ # FutureList.wait() is further tested by WaitsTest.
+ # FutureList.cancel() is tested by CancelTests.
+ def test_wait_RETURN_IMMEDIATELY(self):
+ f = futures.FutureList(futures=None, event_sink=None)
+ f.wait(return_when=futures.RETURN_IMMEDIATELY)
+
+ def test_wait_timeout(self):
+ f = futures.FutureList([PENDING_FUTURE],
+ futures._base.ThreadEventSink())
+
+ for t in [futures.FIRST_COMPLETED,
+ futures.FIRST_EXCEPTION,
+ futures.ALL_COMPLETED]:
+ f.wait(timeout=0.1, return_when=t)
+ self.assertFalse(PENDING_FUTURE.done())
+
+ def test_wait_all_done(self):
+ f = futures.FutureList([CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ SUCCESSFUL_FUTURE,
+ EXCEPTION_FUTURE],
+ futures._base.ThreadEventSink())
+
+ f.wait(return_when=futures.ALL_COMPLETED)
+
+ def test_filters(self):
+ fs = [PENDING_FUTURE,
+ RUNNING_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE]
+ f = futures.FutureList(fs, None)
+
+ self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE])
+ self.assertEqual(list(f.cancelled_futures()),
+ [CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE])
+ self.assertEqual(list(f.done_futures()),
+ [CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE])
+ self.assertEqual(list(f.successful_futures()),
+ [SUCCESSFUL_FUTURE])
+ self.assertEqual(list(f.exception_futures()),
+ [EXCEPTION_FUTURE])
+
+ def test_has_running_futures(self):
+ self.assertFalse(
+ futures.FutureList([PENDING_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ SUCCESSFUL_FUTURE,
+ EXCEPTION_FUTURE],
+ None).has_running_futures())
+ self.assertTrue(
+ futures.FutureList([RUNNING_FUTURE],
+ None).has_running_futures())
+
+ def test_has_cancelled_futures(self):
+ self.assertFalse(
+ futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE,
+ SUCCESSFUL_FUTURE,
+ EXCEPTION_FUTURE],
+ None).has_cancelled_futures())
+ self.assertTrue(
+ futures.FutureList([CANCELLED_FUTURE],
+ None).has_cancelled_futures())
+
+ self.assertTrue(
+ futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
+ None).has_cancelled_futures())
+
+ def test_has_done_futures(self):
+ self.assertFalse(
+ futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE],
+ None).has_done_futures())
+ self.assertTrue(
+ futures.FutureList([CANCELLED_FUTURE],
+ None).has_done_futures())
+
+ self.assertTrue(
+ futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE],
+ None).has_done_futures())
+
+ self.assertTrue(
+ futures.FutureList([EXCEPTION_FUTURE],
+ None).has_done_futures())
+
+ self.assertTrue(
+ futures.FutureList([SUCCESSFUL_FUTURE],
+ None).has_done_futures())
+
+ def test_has_successful_futures(self):
+ self.assertFalse(
+ futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE],
+ None).has_successful_futures())
+
+ self.assertTrue(
+ futures.FutureList([SUCCESSFUL_FUTURE],
+ None).has_successful_futures())
+
+ def test_has_exception_futures(self):
+ self.assertFalse(
+ futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ SUCCESSFUL_FUTURE],
+ None).has_exception_futures())
+
+ self.assertTrue(
+ futures.FutureList([EXCEPTION_FUTURE],
+ None).has_exception_futures())
+
+ def test_get_item(self):
+ fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
+ f = futures.FutureList(fs, None)
+ self.assertEqual(f[0], PENDING_FUTURE)
+ self.assertEqual(f[1], RUNNING_FUTURE)
+ self.assertEqual(f[2], CANCELLED_FUTURE)
+ self.assertRaises(IndexError, f.__getitem__, 3)
+
+ def test_len(self):
+ f = futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE,
+ CANCELLED_FUTURE],
+ None)
+ self.assertEqual(len(f), 3)
+
+ def test_iter(self):
+ fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE]
+ f = futures.FutureList(fs, None)
+ self.assertEqual(list(iter(f)), fs)
+
+ def test_contains(self):
+ f = futures.FutureList([PENDING_FUTURE,
+ RUNNING_FUTURE],
+ None)
+ self.assertTrue(PENDING_FUTURE in f)
+ self.assertTrue(RUNNING_FUTURE in f)
+ self.assertFalse(CANCELLED_FUTURE in f)
+
+ def test_repr(self):
+ pending = create_future(state=PENDING)
+ cancelled = create_future(state=CANCELLED)
+ cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED)
+ running = create_future(state=RUNNING)
+ finished = create_future(state=FINISHED)
+
+ f = futures.FutureList(
+ [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 +
+ [CANCELLED_AND_NOTIFIED_FUTURE] +
+ [RUNNING_FUTURE] * 2 +
+ [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3,
+ None)
+
+ self.assertEqual(repr(f),
+ '<FutureList #futures=15 '
+ '[#pending=4 #cancelled=3 #running=2 #finished=6]>')
+
+def test_main():
+ test.support.run_unittest(ProcessPoolCancelTests,
+ ThreadPoolCancelTests,
+ ProcessPoolExecutorTest,
+ ThreadPoolExecutorTest,
+ ProcessPoolWaitTests,
+ ThreadPoolWaitTests,
+ FutureTests,
+ FutureListTests,
+ ProcessPoolShutdownTest,
+ ThreadPoolShutdownTest)
+
+if __name__ == "__main__":
+ test_main() \ No newline at end of file