diff options
author | Alex Gr?nholm <alex.gronholm@nextday.fi> | 2010-12-18 06:01:09 +0000 |
---|---|---|
committer | Alex Gr?nholm <alex.gronholm@nextday.fi> | 2010-12-18 06:01:09 +0000 |
commit | 3eb85180cce06b3cfc3cc6e75ba97bd0bbed23f3 (patch) | |
tree | 3a8beeebe683fff8cf64ac87c89b2dbed595ca54 | |
parent | 635b1f0d5bb08ed01a71fcfd146816db87da495e (diff) | |
download | futures-3eb85180cce06b3cfc3cc6e75ba97bd0bbed23f3.tar.gz |
Moved the code in the futures package to concurrent.futures as per PEP 3148; unified the codebase to support both Python 2 and 3 in a single tree; added support to Python 2.5; added tox.ini for easy testing with multiple Python versions
-rw-r--r-- | concurrent/__init__.py | 3 | ||||
-rw-r--r-- | concurrent/futures/__init__.py | 18 | ||||
-rw-r--r-- | concurrent/futures/_base.py (renamed from python2/futures/_base.py) | 15 | ||||
-rw-r--r-- | concurrent/futures/_compat.py | 107 | ||||
-rw-r--r-- | concurrent/futures/process.py (renamed from python3/futures/process.py) | 18 | ||||
-rw-r--r-- | concurrent/futures/thread.py (renamed from python3/futures/thread.py) | 20 | ||||
-rw-r--r-- | crawl.py (renamed from python2/crawl.py) | 28 | ||||
-rw-r--r-- | docs/index.rst | 10 | ||||
-rw-r--r-- | futures/__init__.py | 24 | ||||
-rw-r--r-- | futures/process.py | 1 | ||||
-rw-r--r-- | futures/thread.py | 1 | ||||
-rw-r--r-- | primes.py (renamed from python3/primes.py) | 15 | ||||
-rw-r--r-- | python2/futures/__init__.py | 18 | ||||
-rw-r--r-- | python2/futures/process.py | 337 | ||||
-rw-r--r-- | python2/futures/thread.py | 136 | ||||
-rw-r--r-- | python2/primes.py | 47 | ||||
-rwxr-xr-x | python2/setup.py | 18 | ||||
-rw-r--r-- | python3/crawl.py | 68 | ||||
-rw-r--r-- | python3/futures/__init__.py | 18 | ||||
-rw-r--r-- | python3/futures/_base.py | 569 | ||||
-rwxr-xr-x | python3/setup.py | 18 | ||||
-rw-r--r-- | python3/test_futures.py | 819 | ||||
-rwxr-xr-x | setup.py | 32 | ||||
-rw-r--r-- | test_futures.py (renamed from python2/test_futures.py) | 50 | ||||
-rw-r--r-- | tox.ini | 11 |
25 files changed, 295 insertions, 2106 deletions
diff --git a/concurrent/__init__.py b/concurrent/__init__.py new file mode 100644 index 0000000..b36383a --- /dev/null +++ b/concurrent/__init__.py @@ -0,0 +1,3 @@ +from pkgutil import extend_path + +__path__ = extend_path(__path__, __name__) diff --git a/concurrent/futures/__init__.py b/concurrent/futures/__init__.py new file mode 100644 index 0000000..b5231f8 --- /dev/null +++ b/concurrent/futures/__init__.py @@ -0,0 +1,18 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Execute computations asynchronously using threads or processes.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +from concurrent.futures._base import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + Future, + Executor, + wait, + as_completed) +from concurrent.futures.process import ProcessPoolExecutor +from concurrent.futures.thread import ThreadPoolExecutor diff --git a/python2/futures/_base.py b/concurrent/futures/_base.py index 58a2c5e..1d90211 100644 --- a/python2/futures/_base.py +++ b/concurrent/futures/_base.py @@ -1,14 +1,19 @@ # Copyright 2009 Brian Quinlan. All Rights Reserved. # Licensed to PSF under a Contributor Agreement. -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import collections +from __future__ import with_statement import functools import logging import threading import time +try: + from collections import namedtuple +except ImportError: + from concurrent.futures._compat import namedtuple + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' @@ -40,7 +45,7 @@ _STATE_TO_DESCRIPTION_MAP = { } # Logger for internal use by the futures package. -LOGGER = logging.getLogger("futures") +LOGGER = logging.getLogger("concurrent.futures") STDERR_HANDLER = logging.StreamHandler() LOGGER.addHandler(STDERR_HANDLER) @@ -227,7 +232,7 @@ def as_completed(fs, timeout=None): for f in fs: f._waiters.remove(waiter) -DoneAndNotDoneFutures = collections.namedtuple( +DoneAndNotDoneFutures = namedtuple( 'DoneAndNotDoneFutures', 'done not_done') def wait(fs, timeout=None, return_when=ALL_COMPLETED): """Wait for the futures in the given sequence to complete. diff --git a/concurrent/futures/_compat.py b/concurrent/futures/_compat.py new file mode 100644 index 0000000..03175b7 --- /dev/null +++ b/concurrent/futures/_compat.py @@ -0,0 +1,107 @@ +import sys + +#if sys.version_info >= (2, 6): +# from collections import namedtuple +#else: +## Copied from Python 2.6 standard library +from keyword import iskeyword as _iskeyword +from operator import itemgetter as _itemgetter +_sys = sys + +def namedtuple(typename, field_names, verbose=False): + """Returns a new subclass of tuple with named fields. + + >>> Point = namedtuple('Point', 'x y') + >>> Point.__doc__ # docstring for the new class + 'Point(x, y)' + >>> p = Point(11, y=22) # instantiate with positional args or keywords + >>> p[0] + p[1] # indexable like a plain tuple + 33 + >>> x, y = p # unpack like a regular tuple + >>> x, y + (11, 22) + >>> p.x + p.y # fields also accessable by name + 33 + >>> d = p._asdict() # convert to a dictionary + >>> d['x'] + 11 + >>> Point(**d) # convert from a dictionary + Point(x=11, y=22) + >>> p._replace(x=100) # _replace() is like str.replace() but targets named fields + Point(x=100, y=22) + + """ + + # Parse and validate the field names. Validation serves two purposes, + # generating informative error messages and preventing template injection attacks. + if isinstance(field_names, basestring): + field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas + field_names = tuple(map(str, field_names)) + for name in (typename,) + field_names: + if not all(c.isalnum() or c=='_' for c in name): + raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name) + if _iskeyword(name): + raise ValueError('Type names and field names cannot be a keyword: %r' % name) + if name[0].isdigit(): + raise ValueError('Type names and field names cannot start with a number: %r' % name) + seen_names = set() + for name in field_names: + if name.startswith('_'): + raise ValueError('Field names cannot start with an underscore: %r' % name) + if name in seen_names: + raise ValueError('Encountered duplicate field name: %r' % name) + seen_names.add(name) + + # Create and fill-in the class template + numfields = len(field_names) + argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes + reprtxt = ', '.join('%s=%%r' % name for name in field_names) + dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names)) + template = '''class %(typename)s(tuple): + '%(typename)s(%(argtxt)s)' \n + __slots__ = () \n + _fields = %(field_names)r \n + def __new__(_cls, %(argtxt)s): + return _tuple.__new__(_cls, (%(argtxt)s)) \n + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new %(typename)s object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != %(numfields)d: + raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result)) + return result \n + def __repr__(self): + return '%(typename)s(%(reprtxt)s)' %% self \n + def _asdict(t): + 'Return a new dict which maps field names to their values' + return {%(dicttxt)s} \n + def _replace(_self, **kwds): + 'Return a new %(typename)s object replacing specified fields with new values' + result = _self._make(map(kwds.pop, %(field_names)r, _self)) + if kwds: + raise ValueError('Got unexpected field names: %%r' %% kwds.keys()) + return result \n + def __getnewargs__(self): + return tuple(self) \n\n''' % locals() + for i, name in enumerate(field_names): + template += ' %s = _property(_itemgetter(%d))\n' % (name, i) + if verbose: + print template + + # Execute the template string in a temporary namespace and + # support tracing utilities by setting a value for frame.f_globals['__name__'] + namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename, + _property=property, _tuple=tuple) + try: + exec template in namespace + except SyntaxError, e: + raise SyntaxError(e.message + ':\n' + template) + result = namespace[typename] + + # For pickling to work, the __module__ variable needs to be set to the frame + # where the named tuple is created. Bypass this step in enviroments where + # sys._getframe is not defined (Jython for example). + if hasattr(_sys, '_getframe'): + result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__') + + return result diff --git a/python3/futures/process.py b/concurrent/futures/process.py index 6de870f..87dc789 100644 --- a/python3/futures/process.py +++ b/concurrent/futures/process.py @@ -43,14 +43,21 @@ Process #1..n: _ResultItems in "Request Q" """ -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - +from __future__ import with_statement import atexit -from futures import _base -import queue import multiprocessing import threading import weakref +import sys + +from concurrent.futures import _base + +try: + import queue +except ImportError: + import Queue as queue + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' # Workers are created as daemon threads and processes. This is done to allow the # interpreter to exit when there are still idle processes in a @@ -137,7 +144,8 @@ def _process_worker(call_queue, result_queue, shutdown): else: try: r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException as e: + except BaseException: + e = sys.exc_info()[1] result_queue.put(_ResultItem(call_item.work_id, exception=e)) else: diff --git a/python3/futures/thread.py b/concurrent/futures/thread.py index a2d96bf..ce0dda0 100644 --- a/python3/futures/thread.py +++ b/concurrent/futures/thread.py @@ -3,13 +3,20 @@ """Implements ThreadPoolExecutor.""" -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - +from __future__ import with_statement import atexit -from futures import _base -import queue import threading import weakref +import sys + +from concurrent.futures import _base + +try: + import queue +except ImportError: + import Queue as queue + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' # Workers are created as daemon threads. This is done to allow the interpreter # to exit when there are still idle threads in a ThreadPoolExecutor's thread @@ -63,7 +70,8 @@ class _WorkItem(object): try: result = self.fn(*self.args, **self.kwargs) - except BaseException as e: + except BaseException: + e = sys.exc_info()[1] self.future.set_exception(e) else: self.future.set_result(result) @@ -84,7 +92,7 @@ def _worker(executor_reference, work_queue): del executor else: work_item.run() - except BaseException as e: + except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True) class ThreadPoolExecutor(_base.Executor): diff --git a/python2/crawl.py b/crawl.py index f1d29ca..86e0af7 100644 --- a/python2/crawl.py +++ b/crawl.py @@ -1,11 +1,17 @@ """Compare the speed of downloading URLs sequentially vs. using futures.""" -import datetime import functools -import futures.thread import time import timeit -import urllib2 +import sys + +try: + from urllib2 import urlopen +except ImportError: + from urllib.request import urlopen + +from concurrent.futures import (as_completed, ThreadPoolExecutor, + ProcessPoolExecutor) URLS = ['http://www.google.com/', 'http://www.apple.com/', @@ -20,7 +26,8 @@ URLS = ['http://www.google.com/', 'http://www.blogger.com/'] def load_url(url, timeout): - return urllib2.urlopen(url, timeout=timeout).read() + kwargs = {'timeout': timeout} if sys.version_info >= (2, 6) else {} + return urlopen(url, **kwargs).read() def download_urls_sequential(urls, timeout=60): url_to_content = {} @@ -37,7 +44,7 @@ def download_urls_with_executor(urls, executor, timeout=60): future_to_url = dict((executor.submit(load_url, url, timeout), url) for url in urls) - for future in futures.as_completed(future_to_url): + for future in as_completed(future_to_url): try: url_to_content[future_to_url[future]] = future.result() except: @@ -52,17 +59,16 @@ def main(): ('processes', functools.partial(download_urls_with_executor, URLS, - futures.ProcessPoolExecutor(10))), + ProcessPoolExecutor(10))), ('threads', functools.partial(download_urls_with_executor, URLS, - futures.ThreadPoolExecutor(10)))]: - print name.ljust(12), + ThreadPoolExecutor(10)))]: + sys.stdout.write('%s: ' % name.ljust(12)) start = time.time() url_map = fn() - print '%.2f seconds (%d of %d downloaded)' % (time.time() - start, - len(url_map), - len(URLS)) + sys.stdout.write('%.2f seconds (%d of %d downloaded)\n' % + (time.time() - start, len(url_map), len(URLS))) if __name__ == '__main__': main() diff --git a/docs/index.rst b/docs/index.rst index ac006a8..a5d084c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,11 +1,11 @@ -:mod:`futures` --- Asynchronous computation +:mod:`concurrent.futures` --- Asynchronous computation =========================================== -.. module:: futures +.. module:: concurrent.futures :synopsis: Execute computations asynchronously using threads or processes. -The :mod:`futures` module provides a high-level interface for asynchronously -executing callables. +The :mod:`concurrent.futures` module provides a high-level interface for +asynchronously executing callables. The asynchronous execution can be be performed by threads using :class:`ThreadPoolExecutor` or seperate processes using @@ -120,7 +120,7 @@ ThreadPoolExecutor Example ^^^^^^^^^^^^^^^^^^^^^^^^^^ :: - import futures + from concurrent import futures import urllib.request URLS = ['http://www.foxnews.com/', diff --git a/futures/__init__.py b/futures/__init__.py new file mode 100644 index 0000000..8f8b234 --- /dev/null +++ b/futures/__init__.py @@ -0,0 +1,24 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Execute computations asynchronously using threads or processes.""" + +import warnings + +from concurrent.futures import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + Future, + Executor, + wait, + as_completed, + ProcessPoolExecutor, + ThreadPoolExecutor) + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +warnings.warn('The futures package has been deprecated. ' + 'Use the concurrent.futures package instead.', + DeprecationWarning) diff --git a/futures/process.py b/futures/process.py new file mode 100644 index 0000000..e9d37b1 --- /dev/null +++ b/futures/process.py @@ -0,0 +1 @@ +from concurrent.futures import ProcessPoolExecutor diff --git a/futures/thread.py b/futures/thread.py new file mode 100644 index 0000000..f6bd05d --- /dev/null +++ b/futures/thread.py @@ -0,0 +1 @@ +from concurrent.futures import ThreadPoolExecutor diff --git a/python3/primes.py b/primes.py index 5152fcb..0da2b3e 100644 --- a/python3/primes.py +++ b/primes.py @@ -1,6 +1,9 @@ -import futures +from __future__ import with_statement import math import time +import sys + +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor PRIMES = [ 112272535095293, @@ -25,23 +28,23 @@ def sequential(): return list(map(is_prime, PRIMES)) def with_process_pool_executor(): - with futures.ProcessPoolExecutor(10) as executor: + with ProcessPoolExecutor(10) as executor: return list(executor.map(is_prime, PRIMES)) def with_thread_pool_executor(): - with futures.ThreadPoolExecutor(10) as executor: + with ThreadPoolExecutor(10) as executor: return list(executor.map(is_prime, PRIMES)) def main(): for name, fn in [('sequential', sequential), ('processes', with_process_pool_executor), ('threads', with_thread_pool_executor)]: - print('%s: ' % name.ljust(12), end='') + sys.stdout.write('%s: ' % name.ljust(12)) start = time.time() if fn() != [True] * len(PRIMES): - print('failed') + sys.stdout.write('failed\n') else: - print('%.2f seconds' % (time.time() - start)) + sys.stdout.write('%.2f seconds\n' % (time.time() - start)) if __name__ == '__main__': main() diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py deleted file mode 100644 index 8331d53..0000000 --- a/python2/futures/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Execute computations asynchronously using threads or processes.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -from futures._base import (FIRST_COMPLETED, - FIRST_EXCEPTION, - ALL_COMPLETED, - CancelledError, - TimeoutError, - Future, - Executor, - wait, - as_completed) -from futures.process import ProcessPoolExecutor -from futures.thread import ThreadPoolExecutor diff --git a/python2/futures/process.py b/python2/futures/process.py deleted file mode 100644 index ec48377..0000000 --- a/python2/futures/process.py +++ /dev/null @@ -1,337 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Implements ProcessPoolExecutor. - -The follow diagram and text describe the data-flow through the system: - -|======================= In-process =====================|== Out-of-process ==| - -+----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | -| | | 7 | | | | ... | | | -| Process | | ... | | Local | +-----------+ | Process | -| Pool | +----------+ | Worker | | #1..n | -| Executor | | Thread | | | -| | +----------- + | | +-----------+ | | -| | <=> | Work Items | <=> | | <= | Result Q | <= | | -| | +------------+ | | +-----------+ | | -| | | 6: call() | | | | ... | | | -| | | future | | | | 4, result | | | -| | | ... | | | | 3, except | | | -+----------+ +------------+ +--------+ +-----------+ +---------+ - -Executor.submit() called: -- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict -- adds the id of the _WorkItem to the "Work Ids" queue - -Local worker thread: -- reads work ids from the "Work Ids" queue and looks up the corresponding - WorkItem from the "Work Items" dict: if the work item has been cancelled then - it is simply removed from the dict, otherwise it is repackaged as a - _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" - until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because - calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). -- reads _ResultItems from "Result Q", updates the future stored in the - "Work Items" dict and deletes the dict entry - -Process #1..n: -- reads _CallItems from "Call Q", executes the calls, and puts the resulting - _ResultItems in "Request Q" -""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -import _base -import Queue -import multiprocessing -import threading -import weakref - -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a -# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, -# allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads/processes finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -# Controls how many more calls than processes will be queued in the call queue. -# A smaller number will mean that processes spend more time idle waiting for -# work while a larger number will make Future.cancel() succeed less frequently -# (Futures in the call queue cannot be cancelled). -EXTRA_QUEUED_CALLS = 1 - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - -class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None): - self.work_id = work_id - self.exception = exception - self.result = result - -class _CallItem(object): - def __init__(self, work_id, fn, args, kwargs): - self.work_id = work_id - self.fn = fn - self.args = args - self.kwargs = kwargs - -def _process_worker(call_queue, result_queue, shutdown): - """Evaluates calls from call_queue and places the results in result_queue. - - This worker is run in a seperate process. - - Args: - call_queue: A multiprocessing.Queue of _CallItems that will be read and - evaluated by the worker. - result_queue: A multiprocessing.Queue of _ResultItems that will written - to by the worker. - shutdown: A multiprocessing.Event that will be set as a signal to the - worker that it should exit when call_queue is empty. - """ - while True: - try: - call_item = call_queue.get(block=True, timeout=0.1) - except Queue.Empty: - if shutdown.is_set(): - return - else: - try: - r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException as e: - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) - -def _add_call_item_to_queue(pending_work_items, - work_ids, - call_queue): - """Fills call_queue with _WorkItems from pending_work_items. - - This function never blocks. - - Args: - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids - are consumed and the corresponding _WorkItems from - pending_work_items are transformed into _CallItems and put in - call_queue. - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems. - """ - while True: - if call_queue.full(): - return - try: - work_id = work_ids.get(block=False) - except Queue.Empty: - return - else: - work_item = pending_work_items[work_id] - - if work_item.future.set_running_or_notify_cancel(): - call_queue.put(_CallItem(work_id, - work_item.fn, - work_item.args, - work_item.kwargs), - block=True) - else: - del pending_work_items[work_id] - continue - -def _queue_manangement_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue, - shutdown_process_event): - """Manages the communication between this process and the worker processes. - - This function is run in a local thread. - - Args: - executor_reference: A weakref.ref to the ProcessPoolExecutor that owns - this thread. Used to determine if the ProcessPoolExecutor has been - garbage collected and that this function can exit. - process: A list of the multiprocessing.Process instances used as - workers. - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems for processing by the process workers. - result_queue: A multiprocessing.Queue of _ResultItems generated by the - process workers. - shutdown_process_event: A multiprocessing.Event used to signal the - process workers that they should exit when their work queue is - empty. - """ - while True: - _add_call_item_to_queue(pending_work_items, - work_ids_queue, - call_queue) - - try: - result_item = result_queue.get(block=True, timeout=0.1) - except Queue.Empty: - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_process_event.set() - - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() - return - del executor - else: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] - - if result_item.exception: - work_item.future.set_exception(result_item.exception) - else: - work_item.future.set_result(result_item.result) - -class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): - """Initializes a new ProcessPoolExecutor instance. - - Args: - max_workers: The maximum number of processes that can be used to - execute the given calls. If None or not given then as many - worker processes will be created as the machine has processors. - """ - _remove_dead_thread_references() - - if max_workers is None: - self._max_workers = multiprocessing.cpu_count() - else: - self._max_workers = max_workers - - # Make the call queue slightly larger than the number of processes to - # prevent the worker processes from idling. But don't make it too big - # because futures in the call queue cannot be cancelled. - self._call_queue = multiprocessing.Queue(self._max_workers + - EXTRA_QUEUED_CALLS) - self._result_queue = multiprocessing.Queue() - self._work_ids = Queue.Queue() - self._queue_management_thread = None - self._processes = set() - - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_process_event = multiprocessing.Event() - self._shutdown_lock = threading.Lock() - self._queue_count = 0 - self._pending_work_items = {} - - def _start_queue_management_thread(self): - if self._queue_management_thread is None: - self._queue_management_thread = threading.Thread( - target=_queue_manangement_worker, - args=(weakref.ref(self), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue, - self._shutdown_process_event)) - self._queue_management_thread.daemon = True - self._queue_management_thread.start() - _thread_references.add(weakref.ref(self._queue_management_thread)) - - def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_workers): - p = multiprocessing.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._shutdown_process_event)) - p.start() - self._processes.add(p) - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown_thread: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._pending_work_items[self._queue_count] = w - self._work_ids.put(self._queue_count) - self._queue_count += 1 - - self._start_queue_management_thread() - self._adjust_process_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown_thread = True - if wait: - if self._queue_management_thread: - self._queue_management_thread.join() - # To reduce the risk of openning too many files, remove references to - # objects that use file descriptors. - self._queue_management_thread = None - self._call_queue = None - self._result_queue = None - self._shutdown_process_event = None - self._processes = None - shutdown.__doc__ = _base.Executor.shutdown.__doc__ - -atexit.register(_python_exit) diff --git a/python2/futures/thread.py b/python2/futures/thread.py deleted file mode 100644 index 3f1584a..0000000 --- a/python2/futures/thread.py +++ /dev/null @@ -1,136 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Implements ThreadPoolExecutor.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -import _base -import Queue -import threading -import weakref - -# Workers are created as daemon threads. This is done to allow the interpreter -# to exit when there are still idle threads in a ThreadPoolExecutor's thread -# pool (i.e. shutdown() was not called). However, allowing workers to die with -# the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -atexit.register(_python_exit) - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - - def run(self): - if not self.future.set_running_or_notify_cancel(): - return - - try: - result = self.fn(*self.args, **self.kwargs) - except BaseException as e: - self.future.set_exception(e) - else: - self.future.set_result(result) - -def _worker(executor_reference, work_queue): - try: - while True: - try: - work_item = work_queue.get(block=True, timeout=0.1) - except Queue.Empty: - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - return - del executor - else: - work_item.run() - except BaseException as e: - _base.LOGGER.critical('Exception in worker', exc_info=True) - -class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers): - """Initializes a new ThreadPoolExecutor instance. - - Args: - max_workers: The maximum number of threads that can be used to - execute the given calls. - """ - _remove_dead_thread_references() - - self._max_workers = max_workers - self._work_queue = Queue.Queue() - self._threads = set() - self._shutdown = False - self._shutdown_lock = threading.Lock() - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._work_queue.put(w) - self._adjust_thread_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def _adjust_thread_count(self): - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. - if len(self._threads) < self._max_workers: - t = threading.Thread(target=_worker, - args=(weakref.ref(self), self._work_queue)) - t.daemon = True - t.start() - self._threads.add(t) - _thread_references.add(weakref.ref(t)) - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown = True - if wait: - for t in self._threads: - t.join() - shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/python2/primes.py b/python2/primes.py deleted file mode 100644 index fa6c355..0000000 --- a/python2/primes.py +++ /dev/null @@ -1,47 +0,0 @@ -import futures -import math -import time - -PRIMES = [ - 112272535095293, - 112582705942171, - 112272535095293, - 115280095190773, - 115797848077099, - 117450548693743, - 993960000099397] - -def is_prime(n): - if n % 2 == 0: - return False - - sqrt_n = int(math.floor(math.sqrt(n))) - for i in range(3, sqrt_n + 1, 2): - if n % i == 0: - return False - return True - -def sequential(): - return list(map(is_prime, PRIMES)) - -def with_process_pool_executor(): - with futures.ProcessPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def with_thread_pool_executor(): - with futures.ThreadPoolExecutor(10) as executor: - return list(executor.map(is_prime, PRIMES)) - -def main(): - for name, fn in [('sequential', sequential), - ('processes', with_process_pool_executor), - ('threads', with_thread_pool_executor)]: - print name.ljust(12), - start = time.time() - if fn() != [True] * len(PRIMES): - print 'failed' - else: - print '%.2f seconds' % (time.time() - start) - -if __name__ == '__main__': - main() diff --git a/python2/setup.py b/python2/setup.py deleted file mode 100755 index 0465a9b..0000000 --- a/python2/setup.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python - -from distutils.core import setup - -setup(name='futures', - version='2.0', - description='Java-style futures implementation in Python 2.x', - author='Brian Quinlan', - author_email='brian@sweetapp.com', - url='http://code.google.com/p/pythonfutures', - download_url='http://pypi.python.org/pypi/futures3/', - packages=['futures'], - license='BSD', - classifiers=['License :: OSI Approved :: BSD License', - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'Programming Language :: Python :: 2'] - ) diff --git a/python3/crawl.py b/python3/crawl.py deleted file mode 100644 index 7135682..0000000 --- a/python3/crawl.py +++ /dev/null @@ -1,68 +0,0 @@ -"""Compare the speed of downloading URLs sequentially vs. using futures.""" - -import datetime -import functools -import futures.thread -import time -import timeit -import urllib.request - -URLS = ['http://www.google.com/', - 'http://www.apple.com/', - 'http://www.ibm.com', - 'http://www.thisurlprobablydoesnotexist.com', - 'http://www.slashdot.org/', - 'http://www.python.org/', - 'http://www.bing.com/', - 'http://www.facebook.com/', - 'http://www.yahoo.com/', - 'http://www.youtube.com/', - 'http://www.blogger.com/'] - -def load_url(url, timeout): - return urllib.request.urlopen(url, timeout=timeout).read() - -def download_urls_sequential(urls, timeout=60): - url_to_content = {} - for url in urls: - try: - url_to_content[url] = load_url(url, timeout=timeout) - except: - pass - return url_to_content - -def download_urls_with_executor(urls, executor, timeout=60): - try: - url_to_content = {} - future_to_url = dict((executor.submit(load_url, url, timeout), url) - for url in urls) - - for future in futures.as_completed(future_to_url): - try: - url_to_content[future_to_url[future]] = future.result() - except: - pass - return url_to_content - finally: - executor.shutdown() - -def main(): - for name, fn in [('sequential', - functools.partial(download_urls_sequential, URLS)), - ('processes', - functools.partial(download_urls_with_executor, - URLS, - futures.ProcessPoolExecutor(10))), - ('threads', - functools.partial(download_urls_with_executor, - URLS, - futures.ThreadPoolExecutor(10)))]: - print('%s: ' % name.ljust(12), end='') - start = time.time() - url_map = fn() - print('%.2f seconds (%d of %d downloaded)' % (time.time() - start, - len(url_map), - len(URLS))) - -if __name__ == '__main__': - main() diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py deleted file mode 100644 index 8331d53..0000000 --- a/python3/futures/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Execute computations asynchronously using threads or processes.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -from futures._base import (FIRST_COMPLETED, - FIRST_EXCEPTION, - ALL_COMPLETED, - CancelledError, - TimeoutError, - Future, - Executor, - wait, - as_completed) -from futures.process import ProcessPoolExecutor -from futures.thread import ThreadPoolExecutor diff --git a/python3/futures/_base.py b/python3/futures/_base.py deleted file mode 100644 index 143f330..0000000 --- a/python3/futures/_base.py +++ /dev/null @@ -1,569 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import collections -import functools -import logging -import threading -import time - -FIRST_COMPLETED = 'FIRST_COMPLETED' -FIRST_EXCEPTION = 'FIRST_EXCEPTION' -ALL_COMPLETED = 'ALL_COMPLETED' -_AS_COMPLETED = '_AS_COMPLETED' - -# Possible future states (for internal use by the futures package). -PENDING = 'PENDING' -RUNNING = 'RUNNING' -# The future was cancelled by the user... -CANCELLED = 'CANCELLED' -# ...and _Waiter.add_cancelled() was called by a worker. -CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' -FINISHED = 'FINISHED' - -_FUTURE_STATES = [ - PENDING, - RUNNING, - CANCELLED, - CANCELLED_AND_NOTIFIED, - FINISHED -] - -_STATE_TO_DESCRIPTION_MAP = { - PENDING: "pending", - RUNNING: "running", - CANCELLED: "cancelled", - CANCELLED_AND_NOTIFIED: "cancelled", - FINISHED: "finished" -} - -# Logger for internal use by the futures package. -LOGGER = logging.getLogger("futures") -STDERR_HANDLER = logging.StreamHandler() -LOGGER.addHandler(STDERR_HANDLER) - -class Error(Exception): - """Base class for all future-related exceptions.""" - pass - -class CancelledError(Error): - """The Future was cancelled.""" - pass - -class TimeoutError(Error): - """The operation exceeded the given deadline.""" - pass - -class _Waiter(object): - """Provides the event that wait() and as_completed() block on.""" - def __init__(self): - self.event = threading.Event() - self.finished_futures = [] - - def add_result(self, future): - self.finished_futures.append(future) - - def add_exception(self, future): - self.finished_futures.append(future) - - def add_cancelled(self, future): - self.finished_futures.append(future) - -class _AsCompletedWaiter(_Waiter): - """Used by as_completed().""" - - def __init__(self): - super(_AsCompletedWaiter, self).__init__() - self.lock = threading.Lock() - - def add_result(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_result(future) - self.event.set() - - def add_exception(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_exception(future) - self.event.set() - - def add_cancelled(self, future): - with self.lock: - super(_AsCompletedWaiter, self).add_cancelled(future) - self.event.set() - -class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED).""" - - def add_result(self, future): - super().add_result(future) - self.event.set() - - def add_exception(self, future): - super().add_exception(future) - self.event.set() - - def add_cancelled(self, future): - super().add_cancelled(future) - self.event.set() - -class _AllCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" - - def __init__(self, num_pending_calls, stop_on_exception): - self.num_pending_calls = num_pending_calls - self.stop_on_exception = stop_on_exception - super().__init__() - - def _decrement_pending_calls(self): - self.num_pending_calls -= 1 - if not self.num_pending_calls: - self.event.set() - - def add_result(self, future): - super().add_result(future) - self._decrement_pending_calls() - - def add_exception(self, future): - super().add_exception(future) - if self.stop_on_exception: - self.event.set() - else: - self._decrement_pending_calls() - - def add_cancelled(self, future): - super().add_cancelled(future) - self._decrement_pending_calls() - -class _AcquireFutures(object): - """A context manager that does an ordered acquire of Future conditions.""" - - def __init__(self, futures): - self.futures = sorted(futures, key=id) - - def __enter__(self): - for future in self.futures: - future._condition.acquire() - - def __exit__(self, *args): - for future in self.futures: - future._condition.release() - -def _create_and_install_waiters(fs, return_when): - if return_when == _AS_COMPLETED: - waiter = _AsCompletedWaiter() - elif return_when == FIRST_COMPLETED: - waiter = _FirstCompletedWaiter() - else: - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) - - if return_when == FIRST_EXCEPTION: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) - else: - raise ValueError("Invalid return condition: %r" % return_when) - - for f in fs: - f._waiters.append(waiter) - - return waiter - -def as_completed(fs, timeout=None): - """An iterator over the given futures that yields each as it completes. - - Args: - fs: The sequence of Futures (possibly created by different Executors) to - iterate over. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator that yields the given Futures as they complete (finished or - cancelled). - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - """ - if timeout is not None: - end_time = timeout + time.time() - - with _AcquireFutures(fs): - finished = set( - f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - pending = set(fs) - finished - waiter = _create_and_install_waiters(fs, _AS_COMPLETED) - - try: - for future in finished: - yield future - - while pending: - if timeout is None: - wait_timeout = None - else: - wait_timeout = end_time - time.time() - if wait_timeout < 0: - raise TimeoutError( - '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) - - waiter.event.wait(wait_timeout) - - with waiter.lock: - finished = waiter.finished_futures - waiter.finished_futures = [] - waiter.event.clear() - - for future in finished: - yield future - pending.remove(future) - - finally: - for f in fs: - f._waiters.remove(waiter) - -DoneAndNotDoneFutures = collections.namedtuple( - 'DoneAndNotDoneFutures', 'done not_done') -def wait(fs, timeout=None, return_when=ALL_COMPLETED): - """Wait for the futures in the given sequence to complete. - - Args: - fs: The sequence of Futures (possibly created by different Executors) to - wait upon. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when this function should return. The options - are: - - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises an exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - - Returns: - A named 2-tuple of sets. The first set, named 'done', contains the - futures that completed (is finished or cancelled) before the wait - completed. The second set, named 'not_done', contains uncompleted - futures. - """ - with _AcquireFutures(fs): - done = set(f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - not_done = set(fs) - done - - if (return_when == FIRST_COMPLETED) and done: - return DoneAndNotDoneFutures(done, not_done) - elif (return_when == FIRST_EXCEPTION) and done: - if any(f for f in done - if not f.cancelled() and f.exception() is not None): - return DoneAndNotDoneFutures(done, not_done) - - if len(done) == len(fs): - return DoneAndNotDoneFutures(done, not_done) - - waiter = _create_and_install_waiters(fs, return_when) - - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - - done.update(waiter.finished_futures) - return DoneAndNotDoneFutures(done, set(fs) - done) - -class Future(object): - """Represents the result of an asynchronous computation.""" - - def __init__(self): - """Initializes the future. Should not be called by clients.""" - self._condition = threading.Condition() - self._state = PENDING - self._result = None - self._exception = None - self._waiters = [] - self._done_callbacks = [] - - def _invoke_callbacks(self): - for callback in self._done_callbacks: - try: - callback(self) - except Exception: - LOGGER.exception('exception calling callback for %r', self) - - def __repr__(self): - with self._condition: - if self._state == FINISHED: - if self._exception: - return '<Future at %s state=%s raised %s>' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._exception.__class__.__name__) - else: - return '<Future at %s state=%s returned %s>' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state], - self._result.__class__.__name__) - return '<Future at %s state=%s>' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state]) - - def cancel(self): - """Cancel the future if possible. - - Returns True if the future was cancelled, False otherwise. A future - cannot be cancelled if it is running or has already completed. - """ - with self._condition: - if self._state in [RUNNING, FINISHED]: - return False - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - return True - - self._state = CANCELLED - self._condition.notify_all() - - self._invoke_callbacks() - return True - - def cancelled(self): - """Return True if the future has cancelled.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] - - def running(self): - """Return True if the future is currently executing.""" - with self._condition: - return self._state == RUNNING - - def done(self): - """Return True of the future was cancelled or finished executing.""" - with self._condition: - return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - - def __get_result(self): - if self._exception: - raise self._exception - else: - return self._result - - def add_done_callback(self, fn): - """Attaches a callable that will be called when the future finishes. - - Args: - fn: A callable that will be called with this future as its only - argument when the future completes or is cancelled. The callable - will always be called by a thread in the same process in which - it was added. If the future has already completed or been - cancelled then the callable will be called immediately. These - callables are called in the order that they were added. - """ - with self._condition: - if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: - self._done_callbacks.append(fn) - return - fn(self) - - def result(self, timeout=None): - """Return the result of the call that the future represents. - - Args: - timeout: The number of seconds to wait for the result if the future - isn't done. If None, then there is no limit on the wait time. - - Returns: - The result of the call that the future represents. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - Exception: If the call raised then that exception will be raised. - """ - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - else: - raise TimeoutError() - - def exception(self, timeout=None): - """Return the exception raised by the call that the future represents. - - Args: - timeout: The number of seconds to wait for the exception if the - future isn't done. If None, then there is no limit on the wait - time. - - Returns: - The exception raised by the call that the future represents or None - if the call completed without raising. - - Raises: - CancelledError: If the future was cancelled. - TimeoutError: If the future didn't finish executing before the given - timeout. - """ - - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - else: - raise TimeoutError() - - # The following methods should only be used by Executors and in tests. - def set_running_or_notify_cancel(self): - """Mark the future as running or process any cancel notifications. - - Should only be used by Executor implementations and unit tests. - - If the future has been cancelled (cancel() was called and returned - True) then any threads waiting on the future completing (though calls - to as_completed() or wait()) are notified and False is returned. - - If the future was not cancelled then it is put in the running state - (future calls to running() will return True) and True is returned. - - This method should be called by Executor implementations before - executing the work associated with this future. If this method returns - False then the work should not be executed. - - Returns: - False if the Future was cancelled, True otherwise. - - Raises: - RuntimeError: if this method was already called or if set_result() - or set_exception() was called. - """ - with self._condition: - if self._state == CANCELLED: - self._state = CANCELLED_AND_NOTIFIED - for waiter in self._waiters: - waiter.add_cancelled(self) - # self._condition.notify_all() is not necessary because - # self.cancel() triggers a notification. - return False - elif self._state == PENDING: - self._state = RUNNING - return True - else: - LOGGER.critical('Future %s in unexpected state: %s', - id(self.future), - self.future._state) - raise RuntimeError('Future in unexpected state') - - def set_result(self, result): - """Sets the return value of work associated with the future. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._result = result - self._state = FINISHED - for waiter in self._waiters: - waiter.add_result(self) - self._condition.notify_all() - self._invoke_callbacks() - - def set_exception(self, exception): - """Sets the result of the future as being the given exception. - - Should only be used by Executor implementations and unit tests. - """ - with self._condition: - self._exception = exception - self._state = FINISHED - for waiter in self._waiters: - waiter.add_exception(self) - self._condition.notify_all() - self._invoke_callbacks() - -class Executor(object): - """This is an abstract base class for concrete asynchronous executors.""" - - def submit(self, fn, *args, **kwargs): - """Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Returns: - A Future representing the given call. - """ - raise NotImplementedError() - - def map(self, fn, *iterables, timeout=None): - """Returns a iterator equivalent to map(fn, iter). - - Args: - fn: A callable that will take take as many arguments as there are - passed iterables. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator equivalent to: map(func, *iterables) but the calls may - be evaluated out-of-order. - - Raises: - TimeoutError: If the entire result iterator could not be generated - before the given timeout. - Exception: If fn(*args) raises for any values. - """ - if timeout is not None: - end_time = timeout + time.time() - - fs = [self.submit(fn, *args) for args in zip(*iterables)] - - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - finally: - for future in fs: - future.cancel() - - def shutdown(self, wait=True): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - executor have been reclaimed. - """ - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown(wait=True) - return False diff --git a/python3/setup.py b/python3/setup.py deleted file mode 100755 index fcd05f2..0000000 --- a/python3/setup.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python3 - -from distutils.core import setup - -setup(name='futures3', - version='1.0', - description='Java-style futures implementation in Python 3.x', - author='Brian Quinlan', - author_email='brian@sweetapp.com', - url='http://code.google.com/p/pythonfutures', - download_url='http://pypi.python.org/pypi/futures3/', - packages=['futures'], - license='BSD', - classifiers=['License :: OSI Approved :: BSD License', - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'Programming Language :: Python :: 3'] - ) diff --git a/python3/test_futures.py b/python3/test_futures.py deleted file mode 100644 index 98eea27..0000000 --- a/python3/test_futures.py +++ /dev/null @@ -1,819 +0,0 @@ -import io -import logging -import multiprocessing -import sys -import threading -import test.support -import time -import unittest - -if sys.platform.startswith('win'): - import ctypes - import ctypes.wintypes - -import futures -from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, - LOGGER, STDERR_HANDLER, wait) -import futures.process - -def create_future(state=PENDING, exception=None, result=None): - f = Future() - f._state = state - f._exception = exception - f._result = result - return f - -PENDING_FUTURE = create_future(state=PENDING) -RUNNING_FUTURE = create_future(state=RUNNING) -CANCELLED_FUTURE = create_future(state=CANCELLED) -CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) -EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) -SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) - -def mul(x, y): - return x * y - -class Call(object): - """A call that can be submitted to a future.Executor for testing. - - The call signals when it is called and waits for an event before finishing. - """ - CALL_LOCKS = {} - def _create_event(self): - if sys.platform.startswith('win'): - class SECURITY_ATTRIBUTES(ctypes.Structure): - _fields_ = [("nLength", ctypes.wintypes.DWORD), - ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), - ("bInheritHandle", ctypes.wintypes.BOOL)] - - s = SECURITY_ATTRIBUTES() - s.nLength = ctypes.sizeof(s) - s.lpSecurityDescriptor = None - s.bInheritHandle = True - - handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), - True, - False, - None) - assert handle is not None - return handle - else: - event = multiprocessing.Event() - self.CALL_LOCKS[id(event)] = event - return id(event) - - def _wait_on_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) - assert r == 0 - else: - self.CALL_LOCKS[handle].wait() - - def _signal_event(self, handle): - if sys.platform.startswith('win'): - r = ctypes.windll.kernel32.SetEvent(handle) - assert r != 0 - else: - self.CALL_LOCKS[handle].set() - - def __init__(self, manual_finish=False, result=42): - self._called_event = self._create_event() - self._can_finish = self._create_event() - - self._result = result - - if not manual_finish: - self._signal_event(self._can_finish) - - def wait_on_called(self): - self._wait_on_event(self._called_event) - - def set_can(self): - self._signal_event(self._can_finish) - - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - - return self._result - - def close(self): - self.set_can() - if sys.platform.startswith('win'): - ctypes.windll.kernel32.CloseHandle(self._called_event) - ctypes.windll.kernel32.CloseHandle(self._can_finish) - else: - del self.CALL_LOCKS[self._called_event] - del self.CALL_LOCKS[self._can_finish] - -class ExceptionCall(Call): - def __call__(self): - self._signal_event(self._called_event) - self._wait_on_event(self._can_finish) - raise ZeroDivisionError() - -class MapCall(Call): - def __init__(self, result=42): - super().__init__(manual_finish=True, result=result) - - def __call__(self, manual_finish): - if manual_finish: - super().__call__() - return self._result - -class ExecutorShutdownTest(unittest.TestCase): - def test_run_after_shutdown(self): - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.submit, - pow, 2, 5) - - - def _start_some_futures(self): - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - - try: - self.executor.submit(call1) - self.executor.submit(call2) - self.executor.submit(call3) - - call1.wait_on_called() - call2.wait_on_called() - call3.wait_on_called() - - call1.set_can() - call2.set_can() - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() - -class ThreadPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_threads_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._threads), 3) - self.executor.shutdown() - for t in self.executor._threads: - t.join() - - def test_context_manager_shutdown(self): - with futures.ThreadPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for t in executor._threads: - t.join() - - def test_del_shutdown(self): - executor = futures.ThreadPoolExecutor(max_workers=5) - executor.map(abs, range(-5, 5)) - threads = executor._threads - del executor - - for t in threads: - t.join() - -class ProcessPoolShutdownTest(ExecutorShutdownTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=5) - - def tearDown(self): - self.executor.shutdown(wait=True) - - def test_processes_terminate(self): - self._start_some_futures() - self.assertEqual(len(self.executor._processes), 5) - processes = self.executor._processes - self.executor.shutdown() - - for p in processes: - p.join() - - def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_workers=5) as e: - executor = e - self.assertEqual(list(e.map(abs, range(-5, 5))), - [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - - for p in self.executor._processes: - p.join() - - def test_del_shutdown(self): - executor = futures.ProcessPoolExecutor(max_workers=5) - list(executor.map(abs, range(-5, 5))) - queue_management_thread = executor._queue_management_thread - processes = executor._processes - del executor - - queue_management_thread.join() - for p in processes: - p.join() - -class WaitTests(unittest.TestCase): - def test_first_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([future1]), done) - self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) - finally: - call1.close() - call2.close() - - def test_first_completed_one_already_completed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, future1], - return_when=futures.FIRST_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_first_exception(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = ExceptionCall(manual_finish=True) - call3 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2, future3], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set([future3]), pending) - finally: - call1.close() - call2.close() - call3.close() - - def test_first_exception_some_already_complete(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = ExceptionCall(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1]), finished) - self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) - - - finally: - call1.close() - call2.close() - - def test_first_exception_one_already_failed(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - - finished, pending = futures.wait( - [EXCEPTION_FUTURE, future1], - return_when=futures.FIRST_EXCEPTION) - - self.assertEquals(set([EXCEPTION_FUTURE]), finished) - self.assertEquals(set([future1]), pending) - finally: - call1.close() - - def test_all_completed(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [future1, future2], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([future1, future2]), finished) - self.assertEquals(set(), pending) - - - finally: - call1.close() - call2.close() - - def test_all_completed_some_already_completed(self): - def wait_test(): - while not future1._waiters: - pass - - future4.cancel() - call1.set_can() - call2.set_can() - call3.set_can() - - self.assertLessEqual( - futures.process.EXTRA_QUEUED_CALLS, - 1, - 'this test assumes that future4 will be cancelled before it is ' - 'queued to run - which might not be the case if ' - 'ProcessPoolExecutor is too aggresive in scheduling futures') - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - call3 = Call(manual_finish=True) - call4 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - future3 = self.executor.submit(call3) - future4 = self.executor.submit(call4) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4], - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2, future3, future4]), - finished) - self.assertEquals(set(), pending) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - - def test_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2], - timeout=1, - return_when=futures.ALL_COMPLETED) - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1]), finished) - self.assertEquals(set([future2]), pending) - - - finally: - call1.close() - call2.close() - - -class ThreadPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolWaitTests(WaitTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class AsCompletedTests(unittest.TestCase): - # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. - def test_no_timeout(self): - def wait_test(): - while not future1._waiters: - pass - call1.set_can() - call2.set_can() - - call1 = Call(manual_finish=True) - call2 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - future2 = self.executor.submit(call2) - - t = threading.Thread(target=wait_test) - t.start() - completed = set(futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2])) - self.assertEquals(set( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1, future2]), - completed) - finally: - call1.close() - call2.close() - - def test_zero_timeout(self): - call1 = Call(manual_finish=True) - try: - future1 = self.executor.submit(call1) - completed_futures = set() - try: - for future in futures.as_completed( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future1], - timeout=0): - completed_futures.add(future) - except futures.TimeoutError: - pass - - self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]), - completed_futures) - finally: - call1.close() - -class ThreadPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolAsCompletedTests(AsCompletedTests): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ExecutorTest(unittest.TestCase): - # Executor.shutdown() and context manager usage is tested by - # ExecutorShutdownTest. - def test_submit(self): - future = self.executor.submit(pow, 2, 8) - self.assertEquals(256, future.result()) - - def test_submit_keyword(self): - future = self.executor.submit(mul, 2, y=8) - self.assertEquals(16, future.result()) - - def test_map(self): - self.assertEqual( - list(self.executor.map(pow, range(10), range(10))), - list(map(pow, range(10), range(10)))) - - def test_map_exception(self): - i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) - self.assertEqual(i.__next__(), (0, 1)) - self.assertEqual(i.__next__(), (0, 1)) - self.assertRaises(ZeroDivisionError, i.__next__) - - def test_map_timeout(self): - results = [] - timeout_call = MapCall() - try: - try: - for i in self.executor.map(timeout_call, - [False, False, True], - timeout=1): - results.append(i) - except futures.TimeoutError: - pass - else: - self.fail('expected TimeoutError') - finally: - timeout_call.close() - - self.assertEquals([42, 42], results) - -class ThreadPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class ProcessPoolExecutorTest(ExecutorTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown(wait=True) - -class FutureTests(unittest.TestCase): - def test_done_callback_with_result(self): - callback_result = None - def fn(callback_future): - nonlocal callback_result - callback_result = callback_future.result() - - f = Future() - f.add_done_callback(fn) - f.set_result(5) - self.assertEquals(5, callback_result) - - def test_done_callback_with_exception(self): - callback_exception = None - def fn(callback_future): - nonlocal callback_exception - callback_exception = callback_future.exception() - - f = Future() - f.add_done_callback(fn) - f.set_exception(Exception('test')) - self.assertEquals(('test',), callback_exception.args) - - def test_done_callback_with_cancel(self): - was_cancelled = None - def fn(callback_future): - nonlocal was_cancelled - was_cancelled = callback_future.cancelled() - - f = Future() - f.add_done_callback(fn) - self.assertTrue(f.cancel()) - self.assertTrue(was_cancelled) - - def test_done_callback_raises(self): - LOGGER.removeHandler(STDERR_HANDLER) - logging_stream = io.StringIO() - handler = logging.StreamHandler(logging_stream) - LOGGER.addHandler(handler) - try: - raising_was_called = False - fn_was_called = False - - def raising_fn(callback_future): - nonlocal raising_was_called - raising_was_called = True - raise Exception('doh!') - - def fn(callback_future): - nonlocal fn_was_called - fn_was_called = True - - f = Future() - f.add_done_callback(raising_fn) - f.add_done_callback(fn) - f.set_result(5) - self.assertTrue(raising_was_called) - self.assertTrue(fn_was_called) - self.assertIn('Exception: doh!', logging_stream.getvalue()) - finally: - LOGGER.removeHandler(handler) - LOGGER.addHandler(STDERR_HANDLER) - - def test_done_callback_already_successful(self): - callback_result = None - def fn(callback_future): - nonlocal callback_result - callback_result = callback_future.result() - - f = Future() - f.set_result(5) - f.add_done_callback(fn) - self.assertEquals(5, callback_result) - - def test_done_callback_already_failed(self): - callback_exception = None - def fn(callback_future): - nonlocal callback_exception - callback_exception = callback_future.exception() - - f = Future() - f.set_exception(Exception('test')) - f.add_done_callback(fn) - self.assertEquals(('test',), callback_exception.args) - - def test_done_callback_already_cancelled(self): - was_cancelled = None - def fn(callback_future): - nonlocal was_cancelled - was_cancelled = callback_future.cancelled() - - f = Future() - self.assertTrue(f.cancel()) - f.add_done_callback(fn) - self.assertTrue(was_cancelled) - - def test_repr(self): - self.assertRegexpMatches(repr(PENDING_FUTURE), - '<Future at 0x[0-9a-f]+ state=pending>') - self.assertRegexpMatches(repr(RUNNING_FUTURE), - '<Future at 0x[0-9a-f]+ state=running>') - self.assertRegexpMatches(repr(CANCELLED_FUTURE), - '<Future at 0x[0-9a-f]+ state=cancelled>') - self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE), - '<Future at 0x[0-9a-f]+ state=cancelled>') - self.assertRegexpMatches( - repr(EXCEPTION_FUTURE), - '<Future at 0x[0-9a-f]+ state=finished raised IOError>') - self.assertRegexpMatches( - repr(SUCCESSFUL_FUTURE), - '<Future at 0x[0-9a-f]+ state=finished returned int>') - - - def test_cancel(self): - f1 = create_future(state=PENDING) - f2 = create_future(state=RUNNING) - f3 = create_future(state=CANCELLED) - f4 = create_future(state=CANCELLED_AND_NOTIFIED) - f5 = create_future(state=FINISHED, exception=IOError()) - f6 = create_future(state=FINISHED, result=5) - - self.assertTrue(f1.cancel()) - self.assertEquals(f1._state, CANCELLED) - - self.assertFalse(f2.cancel()) - self.assertEquals(f2._state, RUNNING) - - self.assertTrue(f3.cancel()) - self.assertEquals(f3._state, CANCELLED) - - self.assertTrue(f4.cancel()) - self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED) - - self.assertFalse(f5.cancel()) - self.assertEquals(f5._state, FINISHED) - - self.assertFalse(f6.cancel()) - self.assertEquals(f6._state, FINISHED) - - def test_cancelled(self): - self.assertFalse(PENDING_FUTURE.cancelled()) - self.assertFalse(RUNNING_FUTURE.cancelled()) - self.assertTrue(CANCELLED_FUTURE.cancelled()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) - self.assertFalse(EXCEPTION_FUTURE.cancelled()) - self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) - - def test_done(self): - self.assertFalse(PENDING_FUTURE.done()) - self.assertFalse(RUNNING_FUTURE.done()) - self.assertTrue(CANCELLED_FUTURE.done()) - self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) - self.assertTrue(EXCEPTION_FUTURE.done()) - self.assertTrue(SUCCESSFUL_FUTURE.done()) - - def test_running(self): - self.assertFalse(PENDING_FUTURE.running()) - self.assertTrue(RUNNING_FUTURE.running()) - self.assertFalse(CANCELLED_FUTURE.running()) - self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) - self.assertFalse(EXCEPTION_FUTURE.running()) - self.assertFalse(SUCCESSFUL_FUTURE.running()) - - def test_result_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.result, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.result, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) - self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) - self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) - - def test_result_with_success(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.set_result(42) - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertEquals(f1.result(timeout=5), 42) - - def test_result_with_cancel(self): - # TODO(brian@sweetapp.com): This test is timing dependant. - def notification(): - # Wait until the main thread is waiting for the result. - time.sleep(1) - f1.cancel() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertRaises(futures.CancelledError, f1.result, timeout=5) - - def test_exception_with_timeout(self): - self.assertRaises(futures.TimeoutError, - PENDING_FUTURE.exception, timeout=0) - self.assertRaises(futures.TimeoutError, - RUNNING_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_FUTURE.exception, timeout=0) - self.assertRaises(futures.CancelledError, - CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) - self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), - IOError)) - self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) - - def test_exception_with_success(self): - def notification(): - # Wait until the main thread is waiting for the exception. - time.sleep(1) - with f1._condition: - f1._state = FINISHED - f1._exception = IOError() - f1._condition.notify_all() - - f1 = create_future(state=PENDING) - t = threading.Thread(target=notification) - t.start() - - self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) - -def test_main(): - test.support.run_unittest(ProcessPoolExecutorTest, - ThreadPoolExecutorTest, - ProcessPoolWaitTests, - ThreadPoolWaitTests, - ProcessPoolAsCompletedTests, - ThreadPoolAsCompletedTests, - FutureTests, - ProcessPoolShutdownTest, - ThreadPoolShutdownTest) - -if __name__ == "__main__": - test_main() diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..744ca96 --- /dev/null +++ b/setup.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +import sys + +extras = {} +try: + from setuptools import setup + if sys.version_info < (2, 6): + extras['install_requires'] = ['multiprocessing'] +except: + from distutils.core import setup + +setup(name='futures', + version='2.1', + description='Backport of the concurrent.futures package from Python 3.2', + author='Brian Quinlan', + author_email='brian@sweetapp.com', + maintainer='Alex Gronholm', + maintainer_email='alex.gronholm+pypi@nextday.fi', + url='http://code.google.com/p/pythonfutures', + download_url='http://pypi.python.org/pypi/futures3/', + packages=['futures', 'concurrent.futures'], + license='BSD', + classifiers=['License :: OSI Approved :: BSD License', + 'Development Status :: 5 - Production/Stable', + 'Intended Audience :: Developers', + 'Programming Language :: Python :: 2.5', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.1'], + **extras + ) diff --git a/python2/test_futures.py b/test_futures.py index 2d5672b..bbc21a0 100644 --- a/python2/test_futures.py +++ b/test_futures.py @@ -1,22 +1,33 @@ +from __future__ import with_statement import logging import multiprocessing import re -import StringIO import sys import threading -from test import test_support import time import unittest +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +try: + from test.test_support import run_unittest +except ImportError: + from test.support import run_unittest + +if sys.version_info < (3, 0): + next = lambda x: x.next() + if sys.platform.startswith('win'): import ctypes import ctypes.wintypes -import futures -from futures._base import ( +from concurrent import futures +from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, - LOGGER, STDERR_HANDLER, wait) -import futures.process + LOGGER, STDERR_HANDLER) def create_future(state=PENDING, exception=None, result=None): f = Future() @@ -539,9 +550,9 @@ class ExecutorTest(unittest.TestCase): def test_map_exception(self): i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) - self.assertEqual(i.next(), (0, 1)) - self.assertEqual(i.next(), (0, 1)) - self.assertRaises(ZeroDivisionError, i.next) + self.assertEqual(next(i), (0, 1)) + self.assertEqual(next(i), (0, 1)) + self.assertRaises(ZeroDivisionError, next, i) def test_map_timeout(self): results = [] @@ -608,7 +619,7 @@ class FutureTests(unittest.TestCase): def test_done_callback_raises(self): LOGGER.removeHandler(STDERR_HANDLER) - logging_stream = StringIO.StringIO() + logging_stream = StringIO() handler = logging.StreamHandler(logging_stream) LOGGER.addHandler(handler) try: @@ -679,7 +690,6 @@ class FutureTests(unittest.TestCase): '<Future at 0x[0-9a-f]+L? state=finished returned int>', repr(SUCCESSFUL_FUTURE))) - def test_cancel(self): f1 = create_future(state=PENDING) f2 = create_future(state=RUNNING) @@ -797,15 +807,15 @@ class FutureTests(unittest.TestCase): self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) def test_main(): - test_support.run_unittest(ProcessPoolExecutorTest, - ThreadPoolExecutorTest, - ProcessPoolWaitTests, - ThreadPoolWaitTests, - ProcessPoolAsCompletedTests, - ThreadPoolAsCompletedTests, - FutureTests, - ProcessPoolShutdownTest, - ThreadPoolShutdownTest) + run_unittest(ProcessPoolExecutorTest, + ThreadPoolExecutorTest, + ProcessPoolWaitTests, + ThreadPoolWaitTests, + ProcessPoolAsCompletedTests, + ThreadPoolAsCompletedTests, + FutureTests, + ProcessPoolShutdownTest, + ThreadPoolShutdownTest) if __name__ == "__main__": test_main() @@ -0,0 +1,11 @@ +[tox] +envlist = py25,py26,py27,py31 + +[testenv] +commands={envpython} test_futures.py [] + +#[testenv:py24] +#deps=multiprocessing +# +#[testenv:py25] +#deps=multiprocessing |