summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrian.quinlan <devnull@localhost>2009-05-04 21:02:48 +0000
committerbrian.quinlan <devnull@localhost>2009-05-04 21:02:48 +0000
commita12b38ce97773a42379c032cb4003411c094c030 (patch)
tree48b6a806f5e7320eec8cd4fe15b5a55cb570ff67
parent3b24ddaa96d138bac457f5d394c3a71947ad2d15 (diff)
downloadfutures-a12b38ce97773a42379c032cb4003411c094c030.tar.gz
First maybe-working version of process pools.
-rw-r--r--crawl.py8
-rw-r--r--futures/__init__.py3
-rw-r--r--futures/_base.py261
-rw-r--r--futures/process.py165
-rw-r--r--futures/thread.py274
-rw-r--r--primes.py46
-rw-r--r--test_futures.py123
7 files changed, 491 insertions, 389 deletions
diff --git a/crawl.py b/crawl.py
index 57f9e8e..74c4436 100644
--- a/crawl.py
+++ b/crawl.py
@@ -11,7 +11,7 @@ URLS = ['http://www.google.com/',
'http://www.thisurlprobablydoesnotexist.com',
'http://www.slashdot.org/',
'http://www.python.org/',
- 'http://www.sweetapp.com/']
+ 'http://www.sweetapp.com/'] * 1000
def load_url(url, timeout):
return urllib.request.urlopen(url, timeout=timeout).read()
@@ -25,14 +25,14 @@ def download_urls(urls, timeout=60):
pass
return url_to_content
-executor = futures.thread.ThreadPoolExecutor(max_threads=100)
+executor = futures.ProcessPoolExecutor(100)
def download_urls_with_futures(urls, timeout=60):
url_to_content = {}
fs = executor.run(
(functools.partial(load_url, url, timeout) for url in urls),
timeout=timeout)
- for url, future in zip(urls, fs.result_futures()):
+ for url, future in zip(urls, fs.successful_futures()):
url_to_content[url] = future.result()
return url_to_content
-print(download_urls(URLS))
+print(download_urls_with_futures(URLS))
diff --git a/futures/__init__.py b/futures/__init__.py
index e69de29..33a3e38 100644
--- a/futures/__init__.py
+++ b/futures/__init__.py
@@ -0,0 +1,3 @@
+from futures._base import CancelledException, TimeoutException, Future, FutureList, FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED, RETURN_IMMEDIATELY
+from futures.thread import ThreadPoolExecutor
+from futures.process import ProcessPoolExecutor
diff --git a/futures/_base.py b/futures/_base.py
new file mode 100644
index 0000000..d7d3337
--- /dev/null
+++ b/futures/_base.py
@@ -0,0 +1,261 @@
+import threading
+
+FIRST_COMPLETED = 0
+FIRST_EXCEPTION = 1
+ALL_COMPLETED = 2
+RETURN_IMMEDIATELY = 3
+
+PENDING = 0
+RUNNING = 1
+CANCELLED = 2
+FINISHED = 3
+
+_STATE_TO_DESCRIPTION_MAP = {
+ PENDING: "pending",
+ RUNNING: "running",
+ CANCELLED: "cancelled",
+ FINISHED: "finished"
+}
+
+class CancelledException(Exception):
+ pass
+
+class TimeoutException(Exception):
+ pass
+
+class Future(object):
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._state = PENDING
+ self._result = None
+ self._exception = None
+
+ def __repr__(self):
+ with self._condition:
+ if self._state == FINISHED:
+ if self._exception:
+ return '<Future state=%s raised %s>' % (
+ _STATE_TO_DESCRIPTION_MAP[self._state],
+ self._exception.__class__.__name__)
+ else:
+ return '<Future state=%s returned %s>' % (
+ _STATE_TO_DESCRIPTION_MAP[self._state],
+ self._result.__class__.__name__)
+ return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
+
+ def cancel(self):
+ with self._condition:
+ if self._state in [RUNNING, FINISHED]:
+ return False
+
+ self._state = CANCELLED
+ return True
+
+ def cancelled(self):
+ with self._condition:
+ return self._state == CANCELLED
+
+ def done(self):
+ with self._condition:
+ return self._state in [CANCELLED, FINISHED]
+
+ def __get_result(self):
+ if self._exception:
+ raise self._exception
+ else:
+ return self._result
+
+ def result(self, timeout=None):
+ with self._condition:
+ if self._state == CANCELLED:
+ raise CancelledException()
+ elif self._state == FINISHED:
+ return self.__get_result()
+
+ self._condition.wait(timeout)
+
+ if self._state == CANCELLED:
+ raise CancelledException()
+ elif self._state == FINISHED:
+ return self.__get_result()
+ else:
+ raise TimeoutException()
+
+ def exception(self, timeout=None):
+ with self._condition:
+ if self._state == CANCELLED:
+ raise CancelledException()
+ elif self._state == FINISHED:
+ return self._exception
+
+ self._condition.wait(timeout)
+
+ if self._state == CANCELLED:
+ raise CancelledException()
+ elif self._state == FINISHED:
+ return self._exception
+ else:
+ raise TimeoutException()
+
+class _NullWaitTracker(object):
+ def add_result(self):
+ pass
+
+ def add_exception(self):
+ pass
+
+ def add_cancelled(self):
+ pass
+
+class _FirstCompletedWaitTracker(object):
+ def __init__(self):
+ self.event = threading.Event()
+
+ def add_result(self):
+ self.event.set()
+
+ def add_exception(self):
+ self.event.set()
+
+ def add_cancelled(self):
+ self.event.set()
+
+class _AllCompletedWaitTracker(object):
+ def __init__(self, pending_calls, stop_on_exception):
+ self.event = threading.Event()
+ self.pending_calls = pending_calls
+ self.stop_on_exception = stop_on_exception
+
+ def add_result(self):
+ self.pending_calls -= 1
+ if not self.pending_calls:
+ self.event.set()
+
+ def add_exception(self):
+ self.add_result()
+ if self.stop_on_exception:
+ self.event.set()
+
+ def add_cancelled(self):
+ self.add_result()
+
+class ThreadEventSink(object):
+ def __init__(self):
+ self._condition = threading.Lock()
+ self._waiters = []
+
+ def add(self, e):
+ self._waiters.append(e)
+
+ def add_result(self):
+ with self._condition:
+ for waiter in self._waiters:
+ waiter.add_result()
+
+ def add_exception(self):
+ with self._condition:
+ for waiter in self._waiters:
+ waiter.add_exception()
+
+ def add_cancelled(self):
+ with self._condition:
+ for waiter in self._waiters:
+ waiter.add_cancelled()
+
+class FutureList(object):
+ def __init__(self, futures, event_sink):
+ self._futures = futures
+ self._event_sink = event_sink
+
+ def wait(self, timeout=None, run_until=ALL_COMPLETED):
+ with self._event_sink._condition:
+ if all(f.done() for f in self):
+ return
+
+ if run_until == FIRST_COMPLETED:
+ m = _FirstCompletedWaitTracker()
+ elif run_until == FIRST_EXCEPTION:
+ m = _AllCompletedWaitTracker(len(self), stop_on_exception=True)
+ elif run_until == ALL_COMPLETED:
+ m = _AllCompletedWaitTracker(len(self), stop_on_exception=False)
+ elif run_until == RETURN_IMMEDIATELY:
+ m = _NullWaitTracker()
+ else:
+ raise ValueError()
+
+ self._event_sink.add(m)
+
+ if run_until != RETURN_IMMEDIATELY:
+ m.event.wait(timeout)
+
+ def cancel(self, timeout=None):
+ for f in self:
+ f.cancel()
+ self.wait(timeout=timeout, run_until=ALL_COMPLETED)
+ if any(not f.done() for f in self):
+ raise TimeoutException()
+
+ def has_running_futures(self):
+ return bool(self.running_futures())
+
+ def has_cancelled_futures(self):
+ return bool(self.cancelled_futures())
+
+ def has_done_futures(self):
+ return bool(self.done_futures())
+
+ def has_successful_futures(self):
+ return bool(self.successful_futures())
+
+ def has_exception_futures(self):
+ return bool(self.exception_futures())
+
+ def running_futures(self):
+ return [f for f in self if not f.done() and not f.cancelled()]
+
+ def cancelled_futures(self):
+ return [f for f in self if f.cancelled()]
+
+ def done_futures(self):
+ return [f for f in self if f.done()]
+
+ def successful_futures(self):
+ return [f for f in self
+ if f.done() and not f.cancelled() and f.exception() is None]
+
+ def exception_futures(self):
+ return [f for f in self if f.done() and f.exception() is not None]
+
+ def __getitem__(self, i):
+ return self._futures[i]
+
+ def __len__(self):
+ return len(self._futures)
+
+ def __iter__(self):
+ return iter(self._futures)
+
+ def __contains__(self, f):
+ return f in self._futures
+
+ def __repr__(self):
+ return ('<FutureList #futures=%d '
+ '[#success=%d #exception=%d #cancelled=%d]>' % (
+ len(self),
+ len(self.successful_futures()),
+ len(self.exception_futures()),
+ len(self.cancelled_futures())))
+
+import functools
+class Executor(object):
+ def map(self, fn, iter):
+ calls = [functools.partial(fn, a) for a in iter]
+ return self.runXXX(calls)
+
+ def runXXX(self, calls):
+ fs = self.run(calls, timeout=None, run_until=FIRST_EXCEPTION)
+
+ if fs.has_exception_futures():
+ raise fs.exception_futures()[0].exception()
+ else:
+ return [f.result() for f in fs]
diff --git a/futures/process.py b/futures/process.py
index 945303f..499eeb6 100644
--- a/futures/process.py
+++ b/futures/process.py
@@ -1,94 +1,153 @@
#!/usr/bin/env python
+from futures._base import RUNNING, FINISHED, Executor, ALL_COMPLETED, ThreadEventSink, Future, FutureList
+import queue
+import multiprocessing
+import threading
+
class _WorkItem(object):
def __init__(self, call, future, completion_tracker):
self.call = call
self.future = future
self.completion_tracker = completion_tracker
- def run(self):
- if self.future.cancelled():
- with self.future._condition:
- self.future._condition.notify_all()
- self.completion_tracker.add_cancelled()
- return
+class _ResultItem(object):
+ def __init__(self, work_id, exception=None, result=None):
+ self.work_id = work_id
+ self.exception = exception
+ self.result = result
+
+class _CallItem(object):
+ def __init__(self, work_id, call):
+ self.work_id = work_id
+ self.call = call
- self.future._state = _RUNNING
+def _process_worker(call_queue, result_queue, shutdown):
+ while True:
try:
- r = self.call()
- except BaseException as e:
- with self.future._condition:
- self.future._exception = e
- self.future._state = _FINISHED
- self.future._condition.notify_all()
- self.completion_tracker.add_exception()
+ call_item = call_queue.get(block=True, timeout=0.1)
+ except queue.Empty:
+ if shutdown.is_set():
+ return
else:
- with self.future._condition:
- self.future._result = r
- self.future._state = _FINISHED
- self.future._condition.notify_all()
- self.completion_tracker.add_result()
-
-class XXX:
- def wait(self, timeout=None, run_until=ALL_COMPLETED):
-
- pass
-
-class ProcessPoolExecutor(object):
- def __init__(self, max_processes):
+ try:
+ r = call_item.call()
+ except BaseException as e:
+ result_queue.put(_ResultItem(call_item.work_id,
+ exception=e))
+ else:
+ result_queue.put(_ResultItem(call_item.work_id,
+ result=r))
+
+class ProcessPoolExecutor(Executor):
+ def __init__(self, max_processes=None):
+ if max_processes is None:
+ try:
+ max_processes = multiprocessing.cpu_count()
+ except NotImplementedError:
+ max_processes = 16
+
self._max_processes = max_processes
- self._work_queue = multiprocessing.Queue()
+ self._call_queue = multiprocessing.Queue(self._max_processes + 1)
+ self._result_queue = multiprocessing.Queue()
+ self._work_ids = queue.Queue()
+ self._queue_management_thread = None
self._processes = set()
self._shutdown = False
+ self._shutdown_process_event = multiprocessing.Event()
self._lock = threading.Lock()
self._queue_count = 0
- self._pending_futures = {}
+ self._pending_work_items = {}
+
- def _(self):
+ def _add_call_item_to_queue(self):
while True:
try:
+ work_id = self._work_ids.get(block=False)
+ except queue.Empty:
+ return
+ else:
+ work_item = self._pending_work_items[work_id]
+
+ if work_item.future.cancelled():
+ with work_item.future._condition:
+ work_item.future._condition.notify_all()
+ work_item.completion_tracker.add_cancelled()
+ continue
+ else:
+ with work_item.future._condition:
+ work_item.future._state = RUNNING
+
+ self._call_queue.put(_CallItem(work_id, work_item.call),
+ block=True)
+ if self._call_queue.full():
+ return
+
+ def _result(self):
+ while True:
+ self._add_call_item_to_queue()
+ try:
result_item = self._result_queue.get(block=True,
timeout=0.1)
- except multiprocessing.TimeoutError:
- if self._shutdown:
+ except queue.Empty:
+ if self._shutdown and not self._pending_work_items:
+ self._shutdown_process_event.set()
return
else:
- completion_tracker, future = self._pending_futures[
- result_item.index]
-
+ work_item = self._pending_work_items[result_item.work_id]
+ del self._pending_work_items[result_item.work_id]
+
if result_item.exception:
- with future._condition:
- future._exception = result_item.exception
- future._state = _FINISHED
- future._condition.notify_all()
- completion_tracker.add_exception()
+ with work_item.future._condition:
+ work_item.future._exception = result_item.exception
+ work_item.future._state = FINISHED
+ work_item.future._condition.notify_all()
+ work_item.completion_tracker.add_exception()
else:
- with future._condition:
- future._result = result_item.result
- future._state = _FINISHED
- future._condition.notify_all()
- completion_tracker.add_result()
-
-
+ with work_item.future._condition:
+ work_item.future._result = result_item.result
+ work_item.future._state = FINISHED
+ work_item.future._condition.notify_all()
+ work_item.completion_tracker.add_result()
def _adjust_process_count(self):
-
+ if self._queue_management_thread is None:
+ self._queue_management_thread = threading.Thread(
+ target=self._result)
+ self._queue_management_thread.daemon = True
+ self._queue_management_thread.start()
+
+ for _ in range(len(self._processes), self._max_processes):
+ p = multiprocessing.Process(
+ target=_process_worker,
+ args=(self._call_queue,
+ self._result_queue,
+ self._shutdown_process_event))
+ p.daemon = True
+ p.start()
+ self._processes.add(p)
+
def run(self, calls, timeout=None, run_until=ALL_COMPLETED):
with self._lock:
if self._shutdown:
raise RuntimeError()
futures = []
- event_sink = _ThreadEventSink()
+ event_sink = ThreadEventSink()
+ self._queue_count
for call in calls:
f = Future()
- w = _WorkItem(call, f, event_sink)
- self._work_queue.put(w)
+ self._pending_work_items[self._queue_count] = _WorkItem(
+ call, f, event_sink)
+ self._work_ids.put(self._queue_count)
futures.append(f)
self._queue_count += 1
-
- print('futures:', futures)
+
self._adjust_process_count()
fl = FutureList(futures, event_sink)
fl.wait(timeout=timeout, run_until=run_until)
return fl
+
+ def shutdown(self):
+ with self._lock:
+ self._shutdown = True
diff --git a/futures/thread.py b/futures/thread.py
index a4c779d..3f1b807 100644
--- a/futures/thread.py
+++ b/futures/thread.py
@@ -1,259 +1,9 @@
#!/usr/bin/env python
+from futures._base import RUNNING, FINISHED, Executor, ALL_COMPLETED, ThreadEventSink, Future, FutureList
import queue
import threading
-FIRST_COMPLETED = 0
-FIRST_EXCEPTION = 1
-ALL_COMPLETED = 2
-RETURN_IMMEDIATELY = 3
-
-_PENDING = 0
-_RUNNING = 1
-_CANCELLED = 2
-_FINISHED = 3
-
-_STATE_TO_DESCRIPTION_MAP = {
- _PENDING: "pending",
- _RUNNING: "running",
- _CANCELLED: "cancelled",
- _FINISHED: "finished"
-}
-
-class CancelledException(Exception):
- pass
-
-class TimeoutException(Exception):
- pass
-
-class Future(object):
- def __init__(self):
- self._condition = threading.Condition()
- self._state = _PENDING
- self._result = None
- self._exception = None
-
- def __repr__(self):
- with self._condition:
- if self._state == _FINISHED:
- if self._exception:
- return '<Future state=%s raised %s>' % (
- _STATE_TO_DESCRIPTION_MAP[self._state],
- self._exception.__class__.__name__)
- else:
- return '<Future state=%s returned %s>' % (
- _STATE_TO_DESCRIPTION_MAP[self._state],
- self._result.__class__.__name__)
- return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
-
- def cancel(self):
- with self._condition:
- if self._state in [_RUNNING, _FINISHED]:
- return False
-
- self._state = _CANCELLED
- return True
-
- def cancelled(self):
- with self._condition:
- return self._state == _CANCELLED
-
- def done(self):
- with self._condition:
- return self._state in [_CANCELLED, _FINISHED]
-
- def __get_result(self):
- if self._exception:
- raise self._exception
- else:
- return self._result
-
- def result(self, timeout=None):
- with self._condition:
- if self._state == _CANCELLED:
- raise CancelledException()
- elif self._state == _FINISHED:
- return self.__get_result()
-
- print('Waiting...')
- self._condition.wait(timeout)
- print('Post Waiting...')
-
- if self._state == _CANCELLED:
- raise CancelledException()
- elif self._state == _FINISHED:
- return self.__get_result()
- else:
- raise TimeoutException()
-
- def exception(self, timeout=None):
- with self._condition:
- if self._state == _CANCELLED:
- raise CancelledException()
- elif self._state == _FINISHED:
- return self._exception
-
- self._condition.wait(timeout)
-
- if self._state == _CANCELLED:
- raise CancelledException()
- elif self._state == _FINISHED:
- return self._exception
- else:
- raise TimeoutException()
-
-class _NullWaitTracker(object):
- def add_result(self):
- pass
-
- def add_exception(self):
- pass
-
- def add_cancelled(self):
- pass
-
-class _FirstCompletedWaitTracker(object):
- def __init__(self):
- self.event = threading.Event()
-
- def add_result(self):
- self.event.set()
-
- def add_exception(self):
- self.event.set()
-
- def add_cancelled(self):
- self.event.set()
-
-class _AllCompletedWaitTracker(object):
- def __init__(self, pending_calls, stop_on_exception):
- self.event = threading.Event()
- self.pending_calls = pending_calls
- self.stop_on_exception = stop_on_exception
-
- def add_result(self):
- self.pending_calls -= 1
- if not self.pending_calls:
- self.event.set()
-
- def add_exception(self):
- self.add_result()
- if self.stop_on_exception:
- self.event.set()
-
- def add_cancelled(self):
- self.add_result()
-
-class _ThreadEventSink(object):
- def __init__(self):
- self._condition = threading.Lock()
- self._waiters = []
-
- def add(self, e):
- self._waiters.append(e)
-
- def add_result(self):
- with self._condition:
- for waiter in self._waiters:
- waiter.add_result()
-
- def add_exception(self):
- with self._condition:
- for waiter in self._waiters:
- waiter.add_exception()
-
- def add_cancelled(self):
- with self._condition:
- for waiter in self._waiters:
- waiter.add_cancelled()
-
-class FutureList(object):
- def __init__(self, futures, event_sink):
- self._futures = futures
- self._event_sink = event_sink
-
- def wait(self, timeout=None, run_until=ALL_COMPLETED):
- with self._event_sink._condition:
- print('WAIT 123')
- if all(f.done() for f in self):
- return
- print('WAIT 1234')
-
- if run_until == FIRST_COMPLETED:
- m = _FirstCompletedWaitTracker()
- elif run_until == FIRST_EXCEPTION:
- m = _AllCompletedWaitTracker(len(self), stop_on_exception=True)
- elif run_until == ALL_COMPLETED:
- m = _AllCompletedWaitTracker(len(self), stop_on_exception=False)
- elif run_until == RETURN_IMMEDIATELY:
- m = _NullWaitTracker()
- else:
- raise ValueError()
-
- self._event_sink.add(m)
-
- if run_until != RETURN_IMMEDIATELY:
- print('WAIT 12345', timeout)
- m.event.wait(timeout)
-
- def cancel(self, timeout=None):
- for f in self:
- f.cancel()
- self.wait(timeout=timeout, run_until=ALL_COMPLETED)
- if any(not f.done() for f in self):
- raise TimeoutException()
-
- def has_running_futures(self):
- return bool(self.running_futures())
-
- def has_cancelled_futures(self):
- return bool(self.cancelled_futures())
-
- def has_done_futures(self):
- return bool(self.done_futures())
-
- def has_successful_futures(self):
- return bool(self.successful_futures())
-
- def has_exception_futures(self):
- return bool(self.exception_futures())
-
- def running_futures(self):
- return [f for f in self if not f.done() and not f.cancelled()]
-
- def cancelled_futures(self):
- return [f for f in self if f.cancelled()]
-
- def done_futures(self):
- return [f for f in self if f.done()]
-
- def successful_futures(self):
- return [f for f in self
- if f.done() and not f.cancelled() and f.exception() is None]
-
- def exception_futures(self):
- return [f for f in self if f.done() and f.exception() is not None]
-
- def __getitem__(self, i):
- return self._futures[i]
-
- def __len__(self):
- return len(self._futures)
-
- def __iter__(self):
- return iter(self._futures)
-
- def __contains__(self, f):
- return f in self._futures
-
- def __repr__(self):
- return ('<FutureList #futures=%d '
- '[#success=%d #exception=%d #cancelled=%d]>' % (
- len(self),
- len(self.successful_futures()),
- len(self.exception_futures()),
- len(self.cancelled_futures())))
-
class _WorkItem(object):
def __init__(self, call, future, completion_tracker):
self.call = call
@@ -267,23 +17,25 @@ class _WorkItem(object):
self.completion_tracker.add_cancelled()
return
- self.future._state = _RUNNING
+ with self.future._condition:
+ self.future._state = RUNNING
+
try:
r = self.call()
except BaseException as e:
with self.future._condition:
self.future._exception = e
- self.future._state = _FINISHED
+ self.future._state = FINISHED
self.future._condition.notify_all()
self.completion_tracker.add_exception()
else:
with self.future._condition:
self.future._result = r
- self.future._state = _FINISHED
+ self.future._state = FINISHED
self.future._condition.notify_all()
self.completion_tracker.add_result()
-class ThreadPoolExecutor(object):
+class ThreadPoolExecutor(Executor):
def __init__(self, max_threads):
self._max_threads = max_threads
self._work_queue = queue.Queue()
@@ -308,7 +60,6 @@ class ThreadPoolExecutor(object):
def _adjust_thread_count(self):
for _ in range(len(self._threads),
min(self._max_threads, self._work_queue.qsize())):
- print('Creating a thread')
t = threading.Thread(target=self._worker)
t.daemon = True
t.start()
@@ -320,27 +71,18 @@ class ThreadPoolExecutor(object):
raise RuntimeError()
futures = []
- event_sink = _ThreadEventSink()
+ event_sink = ThreadEventSink()
for call in calls:
f = Future()
w = _WorkItem(call, f, event_sink)
self._work_queue.put(w)
futures.append(f)
- print('futures:', futures)
self._adjust_thread_count()
fl = FutureList(futures, event_sink)
fl.wait(timeout=timeout, run_until=run_until)
return fl
- def runXXX(self, calls, timeout=None):
- fs = self.run(calls, timeout, run_util=FIRST_EXCEPTION)
-
- if fs.has_exception_futures():
- raise fs.exception_futures()[0].exception()
- else:
- return [f.result() for f in fs]
-
def shutdown(self):
with self._lock:
self._shutdown = True
diff --git a/primes.py b/primes.py
new file mode 100644
index 0000000..4397c67
--- /dev/null
+++ b/primes.py
@@ -0,0 +1,46 @@
+import futures
+import math
+import time
+
+PRIMES = [
+ 112272535095293,
+ 112582705942171,
+ 112272535095293,
+ 115280095190773,
+ 115797848077099]
+
+def is_prime(n):
+ n = abs(n)
+ i = 2
+ while i <= math.sqrt(n):
+ if n % i == 0:
+ return False
+ i += 1
+ return True
+
+def sequential():
+ return list(map(is_prime, PRIMES))
+
+def with_process_pool_executor():
+ executor = futures.ProcessPoolExecutor()
+ try:
+ return list(executor.map(is_prime, PRIMES))
+ finally:
+ executor.shutdown()
+
+def with_thread_pool_executor():
+ executor = futures.ThreadPoolExecutor(10)
+ try:
+ return list(executor.map(is_prime, PRIMES))
+ finally:
+ executor.shutdown()
+
+def main():
+ for name, fn in [('sequential', sequential),
+ ('processes', with_process_pool_executor),
+ ('threads', with_thread_pool_executor)]:
+ start = time.time()
+ fn()
+ print('%s: %.2f seconds' % (name.ljust(10), time.time() - start))
+
+main() \ No newline at end of file
diff --git a/test_futures.py b/test_futures.py
index c0f0539..91ac955 100644
--- a/test_futures.py
+++ b/test_futures.py
@@ -4,8 +4,9 @@ from test.support import verbose
import unittest
import threading
import time
+import multiprocessing
-import futures.thread as threaded_futures
+import futures
class Call(object):
def __init__(self, manual_finish=False):
@@ -27,20 +28,16 @@ class Call(object):
def __call__(self):
if self._called_event.is_set(): print('called twice')
- print('Doing call...')
self._called_event.set()
self._can_finished.wait()
- print('About to return...')
return 42
class ExceptionCall(Call):
def __call__(self):
assert not self._called_event.is_set(), 'already called'
- print('Doing exception call...')
self._called_event.set()
self._can_finished.wait()
- print('About to raise...')
raise ZeroDivisionError()
class FutureStub(object):
@@ -60,7 +57,7 @@ class FutureStub(object):
class ShutdownTest(unittest.TestCase):
def test_run_after_shutdown(self):
- self.executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+ self.executor = futures.ThreadPoolExecutor(max_threads=1)
call1 = Call()
self.executor.shutdown()
@@ -69,14 +66,14 @@ class ShutdownTest(unittest.TestCase):
[call1])
def test_threads_terminate(self):
- self.executor = threaded_futures.ThreadPoolExecutor(max_threads=5)
+ self.executor = futures.ThreadPoolExecutor(max_threads=5)
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
call3 = Call(manual_finish=True)
self.executor.run([call1, call2, call3],
- run_until=threaded_futures.RETURN_IMMEDIATELY)
+ run_until=futures.RETURN_IMMEDIATELY)
call1.wait_on_called()
call2.wait_on_called()
@@ -92,40 +89,36 @@ class ShutdownTest(unittest.TestCase):
t.join()
-class ConcurrentWaitsTest(unittest.TestCase):
+class WaitsTest(unittest.TestCase):
def test(self):
def aaa():
- fs.wait(run_until=threaded_futures.ALL_COMPLETED)
+ fs.wait(run_until=futures.ALL_COMPLETED)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
self.assertTrue(f3.done())
self.assertTrue(f4.done())
def bbb():
- fs.wait(run_until=threaded_futures.FIRST_COMPLETED)
+ fs.wait(run_until=futures.FIRST_COMPLETED)
self.assertTrue(f1.done())
self.assertFalse(f2.done())
self.assertFalse(f3.done())
self.assertFalse(f4.done())
def ccc():
- fs.wait(run_until=threaded_futures.FIRST_EXCEPTION)
+ fs.wait(run_until=futures.FIRST_EXCEPTION)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
- print('fs:', fs)
- print(f1, f2, f3, f4)
self.assertFalse(f3.done())
self.assertFalse(f4.done())
- executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call(manual_finish=True)
call2 = ExceptionCall(manual_finish=True)
call3 = Call(manual_finish=True)
call4 = Call()
- fs = executor.run([call1, call2, call3, call4],
- run_until=threaded_futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run([call1, call2, call3, call4],
+ run_until=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
threads = []
@@ -144,15 +137,18 @@ class ConcurrentWaitsTest(unittest.TestCase):
call4.set_can()
for t in threads:
- print('join')
t.join()
- print('shutdown')
- executor.shutdown()
- print('done shutdown')
+ self.executor.shutdown()
+
+class ThreadPoolWaitTests(WaitsTest):
+ executor = futures.ThreadPoolExecutor(max_threads=1)
+
+class ProcessPoolWaitTests(WaitsTest):
+ executor = futures.ProcessPoolExecutor(max_processes=1)
class CancelTests(unittest.TestCase):
def test_cancel_states(self):
- executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+ executor = futures.ThreadPoolExecutor(max_threads=1)
call1 = Call(manual_finish=True)
call2 = Call()
@@ -160,7 +156,7 @@ class CancelTests(unittest.TestCase):
call4 = Call()
fs = executor.run([call1, call2, call3, call4],
- run_until=threaded_futures.RETURN_IMMEDIATELY)
+ run_until=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
call1.wait_on_called()
@@ -177,13 +173,13 @@ class CancelTests(unittest.TestCase):
self.assertEqual(f4.done(), True)
call1.set_can()
- fs.wait(run_until=threaded_futures.ALL_COMPLETED)
+ fs.wait(run_until=futures.ALL_COMPLETED)
self.assertEqual(f1.result(), 42)
- self.assertRaises(threaded_futures.CancelledException, f2.result)
- self.assertRaises(threaded_futures.CancelledException, f2.exception)
+ self.assertRaises(futures.CancelledException, f2.result)
+ self.assertRaises(futures.CancelledException, f2.exception)
self.assertEqual(f3.result(), 42)
- self.assertRaises(threaded_futures.CancelledException, f4.result)
- self.assertRaises(threaded_futures.CancelledException, f4.exception)
+ self.assertRaises(futures.CancelledException, f4.result)
+ self.assertRaises(futures.CancelledException, f4.exception)
self.assertEqual(call2.called(), False)
self.assertEqual(call4.called(), False)
@@ -191,32 +187,28 @@ class CancelTests(unittest.TestCase):
def test_wait_for_individual_cancel(self):
def end_call():
- print ('Here1')
time.sleep(1)
- print ('Here2')
f2.cancel()
- print ('Here3')
call1.set_can()
- print ('Here4')
- executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+ executor = futures.ThreadPoolExecutor(max_threads=1)
call1 = Call(manual_finish=True)
call2 = Call()
- fs = executor.run([call1, call2], run_until=threaded_futures.RETURN_IMMEDIATELY)
+ fs = executor.run([call1, call2], run_until=futures.RETURN_IMMEDIATELY)
f1, f2 = fs
call1.wait_on_called()
t = threading.Thread(target=end_call)
t.start()
- self.assertRaises(threaded_futures.CancelledException, f2.result)
- self.assertRaises(threaded_futures.CancelledException, f2.exception)
+ self.assertRaises(futures.CancelledException, f2.result)
+ self.assertRaises(futures.CancelledException, f2.exception)
t.join()
executor.shutdown()
def test_cancel_all(self):
- executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+ executor = futures.ThreadPoolExecutor(max_threads=1)
call1 = Call(manual_finish=True)
call2 = Call()
@@ -224,13 +216,11 @@ class CancelTests(unittest.TestCase):
call4 = Call()
fs = executor.run([call1, call2, call3, call4],
- run_until=threaded_futures.RETURN_IMMEDIATELY)
+ run_until=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
call1.wait_on_called()
- print('HERE!!!')
- self.assertRaises(threaded_futures.TimeoutException, fs.cancel, timeout=0)
- print('HERE 2!!!')
+ self.assertRaises(futures.TimeoutException, fs.cancel, timeout=0)
call1.set_can()
fs.cancel()
@@ -241,12 +231,12 @@ class CancelTests(unittest.TestCase):
executor.shutdown()
def test_cancel_repr(self):
- executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+ executor = futures.ThreadPoolExecutor(max_threads=1)
call1 = Call(manual_finish=True)
call2 = Call()
- fs = executor.run([call1, call2], run_until=threaded_futures.RETURN_IMMEDIATELY)
+ fs = executor.run([call1, call2], run_until=futures.RETURN_IMMEDIATELY)
f1, f2 = fs
call1.wait_on_called()
@@ -263,7 +253,7 @@ class FutureListTests(unittest.TestCase):
f4 = FutureStub(cancelled=True, done=True)
fs = [f1, f2, f3, f4]
- f = threaded_futures.FutureList(fs, None)
+ f = futures.FutureList(fs, None)
self.assertEqual(f.running_futures(), [f1])
self.assertEqual(f.cancelled_futures(), [f4])
@@ -276,18 +266,18 @@ class FutureListTests(unittest.TestCase):
f2 = FutureStub(cancelled=False, done=True)
self.assertTrue(
- threaded_futures.FutureList([f1], None).has_running_futures())
+ futures.FutureList([f1], None).has_running_futures())
self.assertFalse(
- threaded_futures.FutureList([f2], None).has_running_futures())
+ futures.FutureList([f2], None).has_running_futures())
def test_has_cancelled_futures(self):
f1 = FutureStub(cancelled=True, done=True)
f2 = FutureStub(cancelled=False, done=True)
self.assertTrue(
- threaded_futures.FutureList([f1], None).has_cancelled_futures())
+ futures.FutureList([f1], None).has_cancelled_futures())
self.assertFalse(
- threaded_futures.FutureList([f2], None).has_cancelled_futures())
+ futures.FutureList([f2], None).has_cancelled_futures())
def test_has_done_futures(self):
f1 = FutureStub(cancelled=True, done=True)
@@ -295,11 +285,11 @@ class FutureListTests(unittest.TestCase):
f3 = FutureStub(cancelled=False, done=False)
self.assertTrue(
- threaded_futures.FutureList([f1], None).has_done_futures())
+ futures.FutureList([f1], None).has_done_futures())
self.assertTrue(
- threaded_futures.FutureList([f2], None).has_done_futures())
+ futures.FutureList([f2], None).has_done_futures())
self.assertFalse(
- threaded_futures.FutureList([f3], None).has_done_futures())
+ futures.FutureList([f3], None).has_done_futures())
def test_has_successful_futures(self):
f1 = FutureStub(cancelled=False, done=True)
@@ -308,13 +298,13 @@ class FutureListTests(unittest.TestCase):
f4 = FutureStub(cancelled=True, done=True)
self.assertTrue(
- threaded_futures.FutureList([f1], None).has_successful_futures())
+ futures.FutureList([f1], None).has_successful_futures())
self.assertFalse(
- threaded_futures.FutureList([f2], None).has_successful_futures())
+ futures.FutureList([f2], None).has_successful_futures())
self.assertFalse(
- threaded_futures.FutureList([f3], None).has_successful_futures())
+ futures.FutureList([f3], None).has_successful_futures())
self.assertFalse(
- threaded_futures.FutureList([f4], None).has_successful_futures())
+ futures.FutureList([f4], None).has_successful_futures())
def test_has_exception_futures(self):
f1 = FutureStub(cancelled=False, done=True)
@@ -323,13 +313,13 @@ class FutureListTests(unittest.TestCase):
f4 = FutureStub(cancelled=True, done=True)
self.assertFalse(
- threaded_futures.FutureList([f1], None).has_exception_futures())
+ futures.FutureList([f1], None).has_exception_futures())
self.assertTrue(
- threaded_futures.FutureList([f2], None).has_exception_futures())
+ futures.FutureList([f2], None).has_exception_futures())
self.assertFalse(
- threaded_futures.FutureList([f3], None).has_exception_futures())
+ futures.FutureList([f3], None).has_exception_futures())
self.assertFalse(
- threaded_futures.FutureList([f4], None).has_exception_futures())
+ futures.FutureList([f4], None).has_exception_futures())
def test_get_item(self):
f1 = FutureStub(cancelled=False, done=False)
@@ -337,7 +327,7 @@ class FutureListTests(unittest.TestCase):
f3 = FutureStub(cancelled=False, done=True)
fs = [f1, f2, f3]
- f = threaded_futures.FutureList(fs, None)
+ f = futures.FutureList(fs, None)
self.assertEqual(f[0], f1)
self.assertEqual(f[1], f2)
self.assertEqual(f[2], f3)
@@ -348,7 +338,7 @@ class FutureListTests(unittest.TestCase):
f2 = FutureStub(cancelled=False, done=True)
f3 = FutureStub(cancelled=False, done=True)
- f = threaded_futures.FutureList([f1, f2, f3], None)
+ f = futures.FutureList([f1, f2, f3], None)
self.assertEqual(len(f), 3)
def test_iter(self):
@@ -357,7 +347,7 @@ class FutureListTests(unittest.TestCase):
f3 = FutureStub(cancelled=False, done=True)
fs = [f1, f2, f3]
- f = threaded_futures.FutureList(fs, None)
+ f = futures.FutureList(fs, None)
self.assertEqual(list(iter(f)), fs)
def test_contains(self):
@@ -365,7 +355,7 @@ class FutureListTests(unittest.TestCase):
f2 = FutureStub(cancelled=False, done=True)
f3 = FutureStub(cancelled=False, done=True)
- f = threaded_futures.FutureList([f1, f2], None)
+ f = futures.FutureList([f1, f2], None)
self.assertTrue(f1 in f)
self.assertTrue(f2 in f)
self.assertFalse(f3 in f)
@@ -376,7 +366,7 @@ class FutureListTests(unittest.TestCase):
exception = FutureStub(cancelled=False, done=True, exception=IOError())
cancelled = FutureStub(cancelled=True, done=True)
- f = threaded_futures.FutureList(
+ f = futures.FutureList(
[running] * 4 + [result] * 3 + [exception] * 2 + [cancelled],
None)
@@ -385,7 +375,8 @@ class FutureListTests(unittest.TestCase):
'[#success=3 #exception=2 #cancelled=1]>')
def test_main():
test.support.run_unittest(CancelTests,
- ConcurrentWaitsTest,
+# ProcessPoolWaitTests,
+ ThreadPoolWaitTests,
FutureListTests,
ShutdownTest)