diff options
author | brian quinlan <brian.quinlan@gmail.com> | 2009-05-23 23:39:33 +0000 |
---|---|---|
committer | brian quinlan <brian.quinlan@gmail.com> | 2009-05-23 23:39:33 +0000 |
commit | efc958fb0e68add5a512a633aea88276e91f5eab (patch) | |
tree | df07d6fd3983185459e697c817a74748c165a956 | |
parent | 8733734ee058cc9bc64130cb1c8be1b90e5ee2eb (diff) | |
download | futures-efc958fb0e68add5a512a633aea88276e91f5eab.tar.gz |
'Working' 2.x implementation
-rw-r--r-- | python2/crawl.py | 8 | ||||
-rw-r--r-- | python2/futures/__init__.py | 8 | ||||
-rw-r--r-- | python2/futures/_base.py | 73 | ||||
-rw-r--r-- | python2/futures/process.py | 35 | ||||
-rw-r--r-- | python2/futures/thread.py | 34 | ||||
-rw-r--r-- | python2/primes.py | 17 | ||||
-rw-r--r-- | python2/test_futures.py | 32 | ||||
-rw-r--r-- | python3/futures/process.py | 2 | ||||
-rw-r--r-- | python3/futures/thread.py | 1 | ||||
-rw-r--r-- | python3/test_futures.py | 2 |
10 files changed, 139 insertions, 73 deletions
diff --git a/python2/crawl.py b/python2/crawl.py index 10e35c3..a597c76 100644 --- a/python2/crawl.py +++ b/python2/crawl.py @@ -3,7 +3,7 @@ import functools import futures.thread import time import timeit -import urllib.request +import urllib2 URLS = ['http://www.google.com/', 'http://www.apple.com/', @@ -14,7 +14,7 @@ URLS = ['http://www.google.com/', 'http://www.sweetapp.com/'] * 5 def load_url(url, timeout): - return urllib.request.urlopen(url, timeout=timeout).read() + return urllib2.urlopen(url, timeout=timeout).read() def download_urls_sequential(urls, timeout=60): url_to_content = {} @@ -49,9 +49,9 @@ def main(): functools.partial(download_urls_with_executor, URLS, futures.ThreadPoolExecutor(10)))]: - print('%s: ' % name.ljust(12), end='') + print '%s: ' % name.ljust(12), start = time.time() fn() - print('%.2f seconds' % (time.time() - start)) + print '%.2f seconds' % (time.time() - start) main() diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py index 5f599ad..22f10db 100644 --- a/python2/futures/__init__.py +++ b/python2/futures/__init__.py @@ -3,4 +3,10 @@ from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, CancelledError, TimeoutError, Future, FutureList) from futures.thread import ThreadPoolExecutor -from futures.process import ProcessPoolExecutor + +try: + import multiprocessing +except ImportError: + pass +else: + from futures.process import ProcessPoolExecutor diff --git a/python2/futures/_base.py b/python2/futures/_base.py index 19cabe8..95c4004 100644 --- a/python2/futures/_base.py +++ b/python2/futures/_base.py @@ -37,20 +37,34 @@ LOGGER.addHandler(_handler) del _handler def set_future_exception(future, event_sink, exception): - with future._condition: + future._condition.acquire() + try: future._exception = exception - with event_sink._condition: + event_sink._condition.acquire() + try: future._state = FINISHED event_sink.add_exception() - future._condition.notify_all() + finally: + event_sink._condition.release() + + future._condition.notifyAll() + finally: + future._condition.release() def set_future_result(future, event_sink, result): - with future._condition: + future._condition.acquire() + try: future._result = result - with event_sink._condition: + event_sink._condition.acquire() + try: future._state = FINISHED event_sink.add_result() - future._condition.notify_all() + finally: + event_sink._condition.release() + + future._condition.notifyAll() + finally: + future._condition.release() class Error(Exception): pass @@ -73,7 +87,8 @@ class Future(object): self._index = index def __repr__(self): - with self._condition: + self._condition.acquire() + try: if self._state == FINISHED: if self._exception: return '<Future state=%s raised %s>' % ( @@ -84,6 +99,8 @@ class Future(object): _STATE_TO_DESCRIPTION_MAP[self._state], self._result.__class__.__name__) return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] + finally: + self._condition.release() @property def index(self): @@ -96,7 +113,8 @@ class Future(object): Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed. """ - with self._condition: + self._condition.acquire() + try: if self._state in [RUNNING, FINISHED]: return False @@ -104,20 +122,31 @@ class Future(object): self._state = CANCELLED self._condition.notify_all() return True + finally: + self._condition.release() def cancelled(self): """Return True if the future has cancelled.""" - with self._condition: + self._condition.acquire() + try: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + finally: + self._condition.release() def running(self): - with self._condition: + self._condition.acquire() + try: return self._state == RUNNING + finally: + self._condition.release() def done(self): """Return True of the future was cancelled or finished executing.""" - with self._condition: + self._condition.acquire() + try: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + finally: + self._condition.release() def __get_result(self): if self._exception: @@ -141,7 +170,8 @@ class Future(object): timeout. Exception: If the call raised then that exception will be raised. """ - with self._condition: + self._condition.acquire() + try: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() elif self._state == FINISHED: @@ -155,6 +185,8 @@ class Future(object): return self.__get_result() else: raise TimeoutError() + finally: + self._condition.release() def exception(self, timeout=None): """Return the exception raised by the call that the future represents. @@ -174,7 +206,8 @@ class Future(object): timeout. """ - with self._condition: + self._condition.acquire() + try: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() elif self._state == FINISHED: @@ -188,6 +221,8 @@ class Future(object): return self._exception else: raise TimeoutError() + finally: + self._condition.release() class _FirstCompletedWaitTracker(object): def __init__(self): @@ -278,7 +313,8 @@ class FutureList(object): if return_when == RETURN_IMMEDIATELY: return - with self._event_sink._condition: + self._event_sink._condition.acquire() + try: # Make a quick exit if every future is already done. This check is # necessary because, if every future is in the # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will @@ -306,6 +342,8 @@ class FutureList(object): pending_count, stop_on_exception=False) self._event_sink.add(completed_tracker) + finally: + self._event_sink._condition.release() try: completed_tracker.event.wait(timeout) @@ -376,7 +414,7 @@ class FutureList(object): return future in self._futures def __repr__(self): - states = {state: 0 for state in FUTURE_STATES} + states = dict([(state, 0) for state in FUTURE_STATES]) for f in self: states[f._state] += 1 @@ -449,7 +487,7 @@ class Executor(object): except TimeoutError: pass - def map(self, func, *iterables, timeout=None): + def map(self, func, *iterables, **kwargs): """Returns a iterator equivalent to map(fn, iter). Args: @@ -467,8 +505,9 @@ class Executor(object): before the given timeout. Exception: If fn(*args) raises for any values. """ + timeout = kwargs.get('timeout') or None calls = [functools.partial(func, *args) for args in zip(*iterables)] - return self.run_to_results(calls, timeout) + return self.run_to_results(calls, timeout=timeout) def shutdown(self): """Clean-up. No other methods can be called afterwards.""" diff --git a/python2/futures/process.py b/python2/futures/process.py index ad6dc6d..044a80a 100644 --- a/python2/futures/process.py +++ b/python2/futures/process.py @@ -6,7 +6,7 @@ from futures._base import (PENDING, RUNNING, CANCELLED, set_future_exception, set_future_result, Executor, Future, FutureList, ThreadEventSink) -import queue +import Queue import multiprocessing import threading @@ -31,13 +31,13 @@ def _process_worker(call_queue, result_queue, shutdown): while True: try: call_item = call_queue.get(block=True, timeout=0.1) - except queue.Empty: + except Queue.Empty: if shutdown.is_set(): return else: try: r = call_item.call() - except BaseException as e: + except BaseException, e: result_queue.put(_ResultItem(call_item.work_id, exception=e)) else: @@ -55,7 +55,7 @@ class ProcessPoolExecutor(Executor): # responsive. self._call_queue = multiprocessing.Queue(self._max_processes + 1) self._result_queue = multiprocessing.Queue() - self._work_ids = queue.Queue() + self._work_ids = Queue.Queue() self._queue_management_thread = None self._processes = set() @@ -70,19 +70,22 @@ class ProcessPoolExecutor(Executor): while True: try: work_id = self._work_ids.get(block=False) - except queue.Empty: + except Queue.Empty: return else: work_item = self._pending_work_items[work_id] if work_item.future.cancelled(): - with work_item.future._condition: - work_item.future._condition.notify_all() + work_item.future._condition.acquire() + work_item.future._condition.notify_all() + work_item.future._condition.release() + work_item.completion_tracker.add_cancelled() continue else: - with work_item.future._condition: - work_item.future._state = RUNNING + work_item.future._condition.acquire() + work_item.future._state = RUNNING + work_item.future._condition.release() self._call_queue.put(_CallItem(work_id, work_item.call), block=True) @@ -95,7 +98,7 @@ class ProcessPoolExecutor(Executor): try: result_item = self._result_queue.get(block=True, timeout=0.1) - except queue.Empty: + except Queue.Empty: if self._shutdown_thread and not self._pending_work_items: self._shutdown_process_event.set() return @@ -116,7 +119,6 @@ class ProcessPoolExecutor(Executor): if self._queue_management_thread is None: self._queue_management_thread = threading.Thread( target=self._result) - self._queue_management_thread.daemon = True self._queue_management_thread.start() for _ in range(len(self._processes), self._max_processes): @@ -125,12 +127,12 @@ class ProcessPoolExecutor(Executor): args=(self._call_queue, self._result_queue, self._shutdown_process_event)) - p.daemon = True p.start() self._processes.add(p) def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - with self._shutdown_lock: + self._shutdown_lock.acquire() + try: if self._shutdown_thread: raise RuntimeError('cannot run new futures after shutdown') @@ -149,7 +151,12 @@ class ProcessPoolExecutor(Executor): fl = FutureList(futures, event_sink) fl.wait(timeout=timeout, return_when=return_when) return fl + finally: + self._shutdown_lock.release() def shutdown(self): - with self._shutdown_lock: + self._shutdown_lock.acquire() + try: self._shutdown_thread = True + finally: + self._shutdown_lock.release() diff --git a/python2/futures/thread.py b/python2/futures/thread.py index 9e3275f..0ffe798 100644 --- a/python2/futures/thread.py +++ b/python2/futures/thread.py @@ -6,7 +6,7 @@ from futures._base import (PENDING, RUNNING, CANCELLED, LOGGER, set_future_exception, set_future_result, Executor, Future, FutureList, ThreadEventSink) -import queue +import Queue import threading class _WorkItem(object): @@ -16,23 +16,29 @@ class _WorkItem(object): self.completion_tracker = completion_tracker def run(self): - with self.future._condition: + self.future._condition.acquire() + try: if self.future._state == PENDING: self.future._state = RUNNING elif self.future._state == CANCELLED: - with self.completion_tracker._condition: + self.completion_tracker._condition.acquire() + try: self.future._state = CANCELLED_AND_NOTIFIED self.completion_tracker.add_cancelled() - return + return + finally: + self.completion_tracker._condition.release() else: LOGGER.critical('Future %s in unexpected state: %d', id(self.future), self.future._state) return + finally: + self.future._condition.release() try: result = self.call() - except BaseException as e: + except BaseException, e: set_future_exception(self.future, self.completion_tracker, e) else: set_future_result(self.future, self.completion_tracker, result) @@ -40,34 +46,35 @@ class _WorkItem(object): class ThreadPoolExecutor(Executor): def __init__(self, max_threads): self._max_threads = max_threads - self._work_queue = queue.Queue() + self._work_queue = Queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() def _worker(self): + empty = Queue.Empty try: while True: try: work_item = self._work_queue.get(block=True, timeout=0.1) - except queue.Empty: + except empty: if self._shutdown: return else: work_item.run() - except BaseException as e: + except BaseException, e: LOGGER.critical('Exception in worker', exc_info=True) def _adjust_thread_count(self): for _ in range(len(self._threads), min(self._max_threads, self._work_queue.qsize())): t = threading.Thread(target=self._worker) - t.daemon = True t.start() self._threads.add(t) def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - with self._shutdown_lock: + self._shutdown_lock.acquire() + try: if self._shutdown: raise RuntimeError('cannot run new futures after shutdown') @@ -83,7 +90,12 @@ class ThreadPoolExecutor(Executor): fl = FutureList(futures, event_sink) fl.wait(timeout=timeout, return_when=return_when) return fl + finally: + self._shutdown_lock.release() def shutdown(self): - with self._shutdown_lock: + self._shutdown_lock.acquire() + try: self._shutdown = True + finally: + self._shutdown_lock.release() diff --git a/python2/primes.py b/python2/primes.py index 7e83ea0..ac684d6 100644 --- a/python2/primes.py +++ b/python2/primes.py @@ -23,22 +23,29 @@ def sequential(): return list(map(is_prime, PRIMES)) def with_process_pool_executor(): - with futures.ProcessPoolExecutor(10) as executor: + executor = futures.ProcessPoolExecutor(10) + try: return list(executor.map(is_prime, PRIMES)) + finally: + executor.shutdown() def with_thread_pool_executor(): - with futures.ThreadPoolExecutor(10) as executor: + executor = futures.ThreadPoolExecutor(10) + try: return list(executor.map(is_prime, PRIMES)) + finally: + executor.shutdown() def main(): for name, fn in [('sequential', sequential), ('processes', with_process_pool_executor), ('threads', with_thread_pool_executor)]: - print('%s: ' % name.ljust(12), end='') + print '%s: ' % name.ljust(12), + start = time.time() if fn() != [True] * len(PRIMES): - print('failed') + print 'failed' else: - print('%.2f seconds' % (time.time() - start)) + print '%.2f seconds' % (time.time() - start) main()
\ No newline at end of file diff --git a/python2/test_futures.py b/python2/test_futures.py index 7a4425a..225b440 100644 --- a/python2/test_futures.py +++ b/python2/test_futures.py @@ -1,10 +1,8 @@ -import test.support -from test.support import verbose - import unittest import threading import time import multiprocessing +from test import test_support import futures import futures._base @@ -168,16 +166,15 @@ class WaitsTest(unittest.TestCase): def wait_for_FIRST_COMPLETED(): fs.wait(return_when=futures.FIRST_COMPLETED) self.assertTrue(f1.done()) - self.assertFalse(f2.done()) # XXX + self.assertFalse(f2.done()) self.assertFalse(f3.done()) self.assertFalse(f4.done()) first_completed.release() - def wait_for_FIRST_EXCEPTION(): fs.wait(return_when=futures.FIRST_EXCEPTION) self.assertTrue(f1.done()) self.assertTrue(f2.done()) - self.assertFalse(f3.done()) # XXX + self.assertFalse(f3.done()) self.assertFalse(f4.done()) first_exception.release() @@ -205,7 +202,7 @@ class WaitsTest(unittest.TestCase): threads.append(t) time.sleep(1) # give threads enough time to execute wait - + call1.set_can() first_completed.acquire() call2.set_can() @@ -412,6 +409,7 @@ class ExecutorTest(unittest.TestCase): self.assertFalse(f5.done()) self.assertEqual(f5.index, 4) finally: + call3.set_can() # Let the call finish executing. call1.close() call2.close() call3.close() @@ -438,9 +436,9 @@ class ExecutorTest(unittest.TestCase): try: i = self.executor.run_to_results([call1, call2, call3]) - self.assertEqual(i.__next__(), 1) - self.assertEqual(i.__next__(), 2) - self.assertRaises(ZeroDivisionError, i.__next__) + self.assertEqual(i.next(), 1) + self.assertEqual(i.next(), 2) + self.assertRaises(ZeroDivisionError, i.next) finally: call1.close() call2.close() @@ -453,9 +451,9 @@ class ExecutorTest(unittest.TestCase): try: i = self.executor.run_to_results([call1, call2, call3], timeout=1) - self.assertEqual(i.__next__(), 1) - self.assertEqual(i.__next__(), 2) - self.assertRaises(futures.TimeoutError, i.__next__) + self.assertEqual(i.next(), 1) + self.assertEqual(i.next(), 2) + self.assertRaises(futures.TimeoutError, i.next) call3.set_can() finally: call1.close() @@ -469,9 +467,9 @@ class ExecutorTest(unittest.TestCase): def test_map_exception(self): i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) - self.assertEqual(i.__next__(), (0, 1)) - self.assertEqual(i.__next__(), (0, 1)) - self.assertRaises(ZeroDivisionError, i.__next__) + self.assertEqual(i.next(), (0, 1)) + self.assertEqual(i.next(), (0, 1)) + self.assertRaises(ZeroDivisionError, i.next) class ThreadPoolExecutorTest(ExecutorTest): def setUp(self): @@ -793,7 +791,7 @@ class FutureListTests(unittest.TestCase): '[#pending=4 #cancelled=3 #running=2 #finished=6]>') def test_main(): - test.support.run_unittest(ProcessPoolCancelTests, + test_support.run_unittest(ProcessPoolCancelTests, ThreadPoolCancelTests, ProcessPoolExecutorTest, ThreadPoolExecutorTest, diff --git a/python3/futures/process.py b/python3/futures/process.py index ad6dc6d..5c1eb4b 100644 --- a/python3/futures/process.py +++ b/python3/futures/process.py @@ -116,7 +116,6 @@ class ProcessPoolExecutor(Executor): if self._queue_management_thread is None: self._queue_management_thread = threading.Thread( target=self._result) - self._queue_management_thread.daemon = True self._queue_management_thread.start() for _ in range(len(self._processes), self._max_processes): @@ -125,7 +124,6 @@ class ProcessPoolExecutor(Executor): args=(self._call_queue, self._result_queue, self._shutdown_process_event)) - p.daemon = True p.start() self._processes.add(p) diff --git a/python3/futures/thread.py b/python3/futures/thread.py index 9e3275f..18b8c07 100644 --- a/python3/futures/thread.py +++ b/python3/futures/thread.py @@ -62,7 +62,6 @@ class ThreadPoolExecutor(Executor): for _ in range(len(self._threads), min(self._max_threads, self._work_queue.qsize())): t = threading.Thread(target=self._worker) - t.daemon = True t.start() self._threads.add(t) diff --git a/python3/test_futures.py b/python3/test_futures.py index 7a4425a..102f748 100644 --- a/python3/test_futures.py +++ b/python3/test_futures.py @@ -1,5 +1,4 @@ import test.support -from test.support import verbose import unittest import threading @@ -412,6 +411,7 @@ class ExecutorTest(unittest.TestCase): self.assertFalse(f5.done()) self.assertEqual(f5.index, 4) finally: + call3.set_can() # Let the call finish executing. call1.close() call2.close() call3.close() |