summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrian quinlan <brian.quinlan@gmail.com>2009-05-23 23:39:33 +0000
committerbrian quinlan <brian.quinlan@gmail.com>2009-05-23 23:39:33 +0000
commitefc958fb0e68add5a512a633aea88276e91f5eab (patch)
treedf07d6fd3983185459e697c817a74748c165a956
parent8733734ee058cc9bc64130cb1c8be1b90e5ee2eb (diff)
downloadfutures-efc958fb0e68add5a512a633aea88276e91f5eab.tar.gz
'Working' 2.x implementation
-rw-r--r--python2/crawl.py8
-rw-r--r--python2/futures/__init__.py8
-rw-r--r--python2/futures/_base.py73
-rw-r--r--python2/futures/process.py35
-rw-r--r--python2/futures/thread.py34
-rw-r--r--python2/primes.py17
-rw-r--r--python2/test_futures.py32
-rw-r--r--python3/futures/process.py2
-rw-r--r--python3/futures/thread.py1
-rw-r--r--python3/test_futures.py2
10 files changed, 139 insertions, 73 deletions
diff --git a/python2/crawl.py b/python2/crawl.py
index 10e35c3..a597c76 100644
--- a/python2/crawl.py
+++ b/python2/crawl.py
@@ -3,7 +3,7 @@ import functools
import futures.thread
import time
import timeit
-import urllib.request
+import urllib2
URLS = ['http://www.google.com/',
'http://www.apple.com/',
@@ -14,7 +14,7 @@ URLS = ['http://www.google.com/',
'http://www.sweetapp.com/'] * 5
def load_url(url, timeout):
- return urllib.request.urlopen(url, timeout=timeout).read()
+ return urllib2.urlopen(url, timeout=timeout).read()
def download_urls_sequential(urls, timeout=60):
url_to_content = {}
@@ -49,9 +49,9 @@ def main():
functools.partial(download_urls_with_executor,
URLS,
futures.ThreadPoolExecutor(10)))]:
- print('%s: ' % name.ljust(12), end='')
+ print '%s: ' % name.ljust(12),
start = time.time()
fn()
- print('%.2f seconds' % (time.time() - start))
+ print '%.2f seconds' % (time.time() - start)
main()
diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py
index 5f599ad..22f10db 100644
--- a/python2/futures/__init__.py
+++ b/python2/futures/__init__.py
@@ -3,4 +3,10 @@ from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION,
CancelledError, TimeoutError,
Future, FutureList)
from futures.thread import ThreadPoolExecutor
-from futures.process import ProcessPoolExecutor
+
+try:
+ import multiprocessing
+except ImportError:
+ pass
+else:
+ from futures.process import ProcessPoolExecutor
diff --git a/python2/futures/_base.py b/python2/futures/_base.py
index 19cabe8..95c4004 100644
--- a/python2/futures/_base.py
+++ b/python2/futures/_base.py
@@ -37,20 +37,34 @@ LOGGER.addHandler(_handler)
del _handler
def set_future_exception(future, event_sink, exception):
- with future._condition:
+ future._condition.acquire()
+ try:
future._exception = exception
- with event_sink._condition:
+ event_sink._condition.acquire()
+ try:
future._state = FINISHED
event_sink.add_exception()
- future._condition.notify_all()
+ finally:
+ event_sink._condition.release()
+
+ future._condition.notifyAll()
+ finally:
+ future._condition.release()
def set_future_result(future, event_sink, result):
- with future._condition:
+ future._condition.acquire()
+ try:
future._result = result
- with event_sink._condition:
+ event_sink._condition.acquire()
+ try:
future._state = FINISHED
event_sink.add_result()
- future._condition.notify_all()
+ finally:
+ event_sink._condition.release()
+
+ future._condition.notifyAll()
+ finally:
+ future._condition.release()
class Error(Exception):
pass
@@ -73,7 +87,8 @@ class Future(object):
self._index = index
def __repr__(self):
- with self._condition:
+ self._condition.acquire()
+ try:
if self._state == FINISHED:
if self._exception:
return '<Future state=%s raised %s>' % (
@@ -84,6 +99,8 @@ class Future(object):
_STATE_TO_DESCRIPTION_MAP[self._state],
self._result.__class__.__name__)
return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
+ finally:
+ self._condition.release()
@property
def index(self):
@@ -96,7 +113,8 @@ class Future(object):
Returns True if the future was cancelled, False otherwise. A future
cannot be cancelled if it is running or has already completed.
"""
- with self._condition:
+ self._condition.acquire()
+ try:
if self._state in [RUNNING, FINISHED]:
return False
@@ -104,20 +122,31 @@ class Future(object):
self._state = CANCELLED
self._condition.notify_all()
return True
+ finally:
+ self._condition.release()
def cancelled(self):
"""Return True if the future has cancelled."""
- with self._condition:
+ self._condition.acquire()
+ try:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
+ finally:
+ self._condition.release()
def running(self):
- with self._condition:
+ self._condition.acquire()
+ try:
return self._state == RUNNING
+ finally:
+ self._condition.release()
def done(self):
"""Return True of the future was cancelled or finished executing."""
- with self._condition:
+ self._condition.acquire()
+ try:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
+ finally:
+ self._condition.release()
def __get_result(self):
if self._exception:
@@ -141,7 +170,8 @@ class Future(object):
timeout.
Exception: If the call raised then that exception will be raised.
"""
- with self._condition:
+ self._condition.acquire()
+ try:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
@@ -155,6 +185,8 @@ class Future(object):
return self.__get_result()
else:
raise TimeoutError()
+ finally:
+ self._condition.release()
def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
@@ -174,7 +206,8 @@ class Future(object):
timeout.
"""
- with self._condition:
+ self._condition.acquire()
+ try:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
@@ -188,6 +221,8 @@ class Future(object):
return self._exception
else:
raise TimeoutError()
+ finally:
+ self._condition.release()
class _FirstCompletedWaitTracker(object):
def __init__(self):
@@ -278,7 +313,8 @@ class FutureList(object):
if return_when == RETURN_IMMEDIATELY:
return
- with self._event_sink._condition:
+ self._event_sink._condition.acquire()
+ try:
# Make a quick exit if every future is already done. This check is
# necessary because, if every future is in the
# CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will
@@ -306,6 +342,8 @@ class FutureList(object):
pending_count, stop_on_exception=False)
self._event_sink.add(completed_tracker)
+ finally:
+ self._event_sink._condition.release()
try:
completed_tracker.event.wait(timeout)
@@ -376,7 +414,7 @@ class FutureList(object):
return future in self._futures
def __repr__(self):
- states = {state: 0 for state in FUTURE_STATES}
+ states = dict([(state, 0) for state in FUTURE_STATES])
for f in self:
states[f._state] += 1
@@ -449,7 +487,7 @@ class Executor(object):
except TimeoutError:
pass
- def map(self, func, *iterables, timeout=None):
+ def map(self, func, *iterables, **kwargs):
"""Returns a iterator equivalent to map(fn, iter).
Args:
@@ -467,8 +505,9 @@ class Executor(object):
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
+ timeout = kwargs.get('timeout') or None
calls = [functools.partial(func, *args) for args in zip(*iterables)]
- return self.run_to_results(calls, timeout)
+ return self.run_to_results(calls, timeout=timeout)
def shutdown(self):
"""Clean-up. No other methods can be called afterwards."""
diff --git a/python2/futures/process.py b/python2/futures/process.py
index ad6dc6d..044a80a 100644
--- a/python2/futures/process.py
+++ b/python2/futures/process.py
@@ -6,7 +6,7 @@ from futures._base import (PENDING, RUNNING, CANCELLED,
set_future_exception, set_future_result,
Executor, Future, FutureList, ThreadEventSink)
-import queue
+import Queue
import multiprocessing
import threading
@@ -31,13 +31,13 @@ def _process_worker(call_queue, result_queue, shutdown):
while True:
try:
call_item = call_queue.get(block=True, timeout=0.1)
- except queue.Empty:
+ except Queue.Empty:
if shutdown.is_set():
return
else:
try:
r = call_item.call()
- except BaseException as e:
+ except BaseException, e:
result_queue.put(_ResultItem(call_item.work_id,
exception=e))
else:
@@ -55,7 +55,7 @@ class ProcessPoolExecutor(Executor):
# responsive.
self._call_queue = multiprocessing.Queue(self._max_processes + 1)
self._result_queue = multiprocessing.Queue()
- self._work_ids = queue.Queue()
+ self._work_ids = Queue.Queue()
self._queue_management_thread = None
self._processes = set()
@@ -70,19 +70,22 @@ class ProcessPoolExecutor(Executor):
while True:
try:
work_id = self._work_ids.get(block=False)
- except queue.Empty:
+ except Queue.Empty:
return
else:
work_item = self._pending_work_items[work_id]
if work_item.future.cancelled():
- with work_item.future._condition:
- work_item.future._condition.notify_all()
+ work_item.future._condition.acquire()
+ work_item.future._condition.notify_all()
+ work_item.future._condition.release()
+
work_item.completion_tracker.add_cancelled()
continue
else:
- with work_item.future._condition:
- work_item.future._state = RUNNING
+ work_item.future._condition.acquire()
+ work_item.future._state = RUNNING
+ work_item.future._condition.release()
self._call_queue.put(_CallItem(work_id, work_item.call),
block=True)
@@ -95,7 +98,7 @@ class ProcessPoolExecutor(Executor):
try:
result_item = self._result_queue.get(block=True,
timeout=0.1)
- except queue.Empty:
+ except Queue.Empty:
if self._shutdown_thread and not self._pending_work_items:
self._shutdown_process_event.set()
return
@@ -116,7 +119,6 @@ class ProcessPoolExecutor(Executor):
if self._queue_management_thread is None:
self._queue_management_thread = threading.Thread(
target=self._result)
- self._queue_management_thread.daemon = True
self._queue_management_thread.start()
for _ in range(len(self._processes), self._max_processes):
@@ -125,12 +127,12 @@ class ProcessPoolExecutor(Executor):
args=(self._call_queue,
self._result_queue,
self._shutdown_process_event))
- p.daemon = True
p.start()
self._processes.add(p)
def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
- with self._shutdown_lock:
+ self._shutdown_lock.acquire()
+ try:
if self._shutdown_thread:
raise RuntimeError('cannot run new futures after shutdown')
@@ -149,7 +151,12 @@ class ProcessPoolExecutor(Executor):
fl = FutureList(futures, event_sink)
fl.wait(timeout=timeout, return_when=return_when)
return fl
+ finally:
+ self._shutdown_lock.release()
def shutdown(self):
- with self._shutdown_lock:
+ self._shutdown_lock.acquire()
+ try:
self._shutdown_thread = True
+ finally:
+ self._shutdown_lock.release()
diff --git a/python2/futures/thread.py b/python2/futures/thread.py
index 9e3275f..0ffe798 100644
--- a/python2/futures/thread.py
+++ b/python2/futures/thread.py
@@ -6,7 +6,7 @@ from futures._base import (PENDING, RUNNING, CANCELLED,
LOGGER,
set_future_exception, set_future_result,
Executor, Future, FutureList, ThreadEventSink)
-import queue
+import Queue
import threading
class _WorkItem(object):
@@ -16,23 +16,29 @@ class _WorkItem(object):
self.completion_tracker = completion_tracker
def run(self):
- with self.future._condition:
+ self.future._condition.acquire()
+ try:
if self.future._state == PENDING:
self.future._state = RUNNING
elif self.future._state == CANCELLED:
- with self.completion_tracker._condition:
+ self.completion_tracker._condition.acquire()
+ try:
self.future._state = CANCELLED_AND_NOTIFIED
self.completion_tracker.add_cancelled()
- return
+ return
+ finally:
+ self.completion_tracker._condition.release()
else:
LOGGER.critical('Future %s in unexpected state: %d',
id(self.future),
self.future._state)
return
+ finally:
+ self.future._condition.release()
try:
result = self.call()
- except BaseException as e:
+ except BaseException, e:
set_future_exception(self.future, self.completion_tracker, e)
else:
set_future_result(self.future, self.completion_tracker, result)
@@ -40,34 +46,35 @@ class _WorkItem(object):
class ThreadPoolExecutor(Executor):
def __init__(self, max_threads):
self._max_threads = max_threads
- self._work_queue = queue.Queue()
+ self._work_queue = Queue.Queue()
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
def _worker(self):
+ empty = Queue.Empty
try:
while True:
try:
work_item = self._work_queue.get(block=True, timeout=0.1)
- except queue.Empty:
+ except empty:
if self._shutdown:
return
else:
work_item.run()
- except BaseException as e:
+ except BaseException, e:
LOGGER.critical('Exception in worker', exc_info=True)
def _adjust_thread_count(self):
for _ in range(len(self._threads),
min(self._max_threads, self._work_queue.qsize())):
t = threading.Thread(target=self._worker)
- t.daemon = True
t.start()
self._threads.add(t)
def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
- with self._shutdown_lock:
+ self._shutdown_lock.acquire()
+ try:
if self._shutdown:
raise RuntimeError('cannot run new futures after shutdown')
@@ -83,7 +90,12 @@ class ThreadPoolExecutor(Executor):
fl = FutureList(futures, event_sink)
fl.wait(timeout=timeout, return_when=return_when)
return fl
+ finally:
+ self._shutdown_lock.release()
def shutdown(self):
- with self._shutdown_lock:
+ self._shutdown_lock.acquire()
+ try:
self._shutdown = True
+ finally:
+ self._shutdown_lock.release()
diff --git a/python2/primes.py b/python2/primes.py
index 7e83ea0..ac684d6 100644
--- a/python2/primes.py
+++ b/python2/primes.py
@@ -23,22 +23,29 @@ def sequential():
return list(map(is_prime, PRIMES))
def with_process_pool_executor():
- with futures.ProcessPoolExecutor(10) as executor:
+ executor = futures.ProcessPoolExecutor(10)
+ try:
return list(executor.map(is_prime, PRIMES))
+ finally:
+ executor.shutdown()
def with_thread_pool_executor():
- with futures.ThreadPoolExecutor(10) as executor:
+ executor = futures.ThreadPoolExecutor(10)
+ try:
return list(executor.map(is_prime, PRIMES))
+ finally:
+ executor.shutdown()
def main():
for name, fn in [('sequential', sequential),
('processes', with_process_pool_executor),
('threads', with_thread_pool_executor)]:
- print('%s: ' % name.ljust(12), end='')
+ print '%s: ' % name.ljust(12),
+
start = time.time()
if fn() != [True] * len(PRIMES):
- print('failed')
+ print 'failed'
else:
- print('%.2f seconds' % (time.time() - start))
+ print '%.2f seconds' % (time.time() - start)
main() \ No newline at end of file
diff --git a/python2/test_futures.py b/python2/test_futures.py
index 7a4425a..225b440 100644
--- a/python2/test_futures.py
+++ b/python2/test_futures.py
@@ -1,10 +1,8 @@
-import test.support
-from test.support import verbose
-
import unittest
import threading
import time
import multiprocessing
+from test import test_support
import futures
import futures._base
@@ -168,16 +166,15 @@ class WaitsTest(unittest.TestCase):
def wait_for_FIRST_COMPLETED():
fs.wait(return_when=futures.FIRST_COMPLETED)
self.assertTrue(f1.done())
- self.assertFalse(f2.done()) # XXX
+ self.assertFalse(f2.done())
self.assertFalse(f3.done())
self.assertFalse(f4.done())
first_completed.release()
-
def wait_for_FIRST_EXCEPTION():
fs.wait(return_when=futures.FIRST_EXCEPTION)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
- self.assertFalse(f3.done()) # XXX
+ self.assertFalse(f3.done())
self.assertFalse(f4.done())
first_exception.release()
@@ -205,7 +202,7 @@ class WaitsTest(unittest.TestCase):
threads.append(t)
time.sleep(1) # give threads enough time to execute wait
-
+
call1.set_can()
first_completed.acquire()
call2.set_can()
@@ -412,6 +409,7 @@ class ExecutorTest(unittest.TestCase):
self.assertFalse(f5.done())
self.assertEqual(f5.index, 4)
finally:
+ call3.set_can() # Let the call finish executing.
call1.close()
call2.close()
call3.close()
@@ -438,9 +436,9 @@ class ExecutorTest(unittest.TestCase):
try:
i = self.executor.run_to_results([call1, call2, call3])
- self.assertEqual(i.__next__(), 1)
- self.assertEqual(i.__next__(), 2)
- self.assertRaises(ZeroDivisionError, i.__next__)
+ self.assertEqual(i.next(), 1)
+ self.assertEqual(i.next(), 2)
+ self.assertRaises(ZeroDivisionError, i.next)
finally:
call1.close()
call2.close()
@@ -453,9 +451,9 @@ class ExecutorTest(unittest.TestCase):
try:
i = self.executor.run_to_results([call1, call2, call3], timeout=1)
- self.assertEqual(i.__next__(), 1)
- self.assertEqual(i.__next__(), 2)
- self.assertRaises(futures.TimeoutError, i.__next__)
+ self.assertEqual(i.next(), 1)
+ self.assertEqual(i.next(), 2)
+ self.assertRaises(futures.TimeoutError, i.next)
call3.set_can()
finally:
call1.close()
@@ -469,9 +467,9 @@ class ExecutorTest(unittest.TestCase):
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
- self.assertEqual(i.__next__(), (0, 1))
- self.assertEqual(i.__next__(), (0, 1))
- self.assertRaises(ZeroDivisionError, i.__next__)
+ self.assertEqual(i.next(), (0, 1))
+ self.assertEqual(i.next(), (0, 1))
+ self.assertRaises(ZeroDivisionError, i.next)
class ThreadPoolExecutorTest(ExecutorTest):
def setUp(self):
@@ -793,7 +791,7 @@ class FutureListTests(unittest.TestCase):
'[#pending=4 #cancelled=3 #running=2 #finished=6]>')
def test_main():
- test.support.run_unittest(ProcessPoolCancelTests,
+ test_support.run_unittest(ProcessPoolCancelTests,
ThreadPoolCancelTests,
ProcessPoolExecutorTest,
ThreadPoolExecutorTest,
diff --git a/python3/futures/process.py b/python3/futures/process.py
index ad6dc6d..5c1eb4b 100644
--- a/python3/futures/process.py
+++ b/python3/futures/process.py
@@ -116,7 +116,6 @@ class ProcessPoolExecutor(Executor):
if self._queue_management_thread is None:
self._queue_management_thread = threading.Thread(
target=self._result)
- self._queue_management_thread.daemon = True
self._queue_management_thread.start()
for _ in range(len(self._processes), self._max_processes):
@@ -125,7 +124,6 @@ class ProcessPoolExecutor(Executor):
args=(self._call_queue,
self._result_queue,
self._shutdown_process_event))
- p.daemon = True
p.start()
self._processes.add(p)
diff --git a/python3/futures/thread.py b/python3/futures/thread.py
index 9e3275f..18b8c07 100644
--- a/python3/futures/thread.py
+++ b/python3/futures/thread.py
@@ -62,7 +62,6 @@ class ThreadPoolExecutor(Executor):
for _ in range(len(self._threads),
min(self._max_threads, self._work_queue.qsize())):
t = threading.Thread(target=self._worker)
- t.daemon = True
t.start()
self._threads.add(t)
diff --git a/python3/test_futures.py b/python3/test_futures.py
index 7a4425a..102f748 100644
--- a/python3/test_futures.py
+++ b/python3/test_futures.py
@@ -1,5 +1,4 @@
import test.support
-from test.support import verbose
import unittest
import threading
@@ -412,6 +411,7 @@ class ExecutorTest(unittest.TestCase):
self.assertFalse(f5.done())
self.assertEqual(f5.index, 4)
finally:
+ call3.set_can() # Let the call finish executing.
call1.close()
call2.close()
call3.close()