diff options
author | brian.quinlan <devnull@localhost> | 2009-05-03 17:33:27 +0000 |
---|---|---|
committer | brian.quinlan <devnull@localhost> | 2009-05-03 17:33:27 +0000 |
commit | 3b24ddaa96d138bac457f5d394c3a71947ad2d15 (patch) | |
tree | 23f096fcb085112826a416db4c154984db460e3f | |
parent | ce2ea56f9b0267a31aae79cacc939006ca53ea0b (diff) | |
download | futures-3b24ddaa96d138bac457f5d394c3a71947ad2d15.tar.gz |
Initial checkin.
-rw-r--r-- | PEP.txt | 25 | ||||
-rw-r--r-- | copy.py | 65 | ||||
-rw-r--r-- | crawl.py | 38 | ||||
-rw-r--r-- | futures/__init__.py | 0 | ||||
-rw-r--r-- | futures/process.py | 94 | ||||
-rw-r--r-- | futures/thread.py | 346 | ||||
-rw-r--r-- | test_futures.py | 393 |
7 files changed, 961 insertions, 0 deletions
@@ -0,0 +1,25 @@ +""" +Abstract + + Python currently has powerful primitives to construct multi-threaded + applications but parallelizing simple functions requires a lot of + setup work i.e. explicitly launching threads, constructing a + work/results queue, and waiting for completion or some other + termination condition (e.g. exception, success). It is also hard to + manage the global . This PEP proposes the addition + + + Python currently distinguishes between two kinds of integers + (ints): regular or short ints, limited by the size of a C long + (typically 32 or 64 bits), and long ints, which are limited only + by available memory. When operations on short ints yield results + that don't fit in a C long, they raise an error. There are some + other distinctions too. This PEP proposes to do away with most of + the differences in semantics, unifying the two types from the + perspective of the Python user. + + +Abstract + + +"""
\ No newline at end of file @@ -0,0 +1,65 @@ +#!/usr/bin/env python + +import functools +import futures.thread as futures +import shutil +import os +import os.path + +def copy_many(destination, sources_files): + for source_file in sources_files: + copied_files = [] + try: + shutil.copy2(source_file, destination) + except IOError, e: + for delete_file in copied_files: + try: + os.remove(delete_file) + except: + pass + raise + else: + copied_files.append(source_files) + +p = futures.ThreadPoolExecutor(max_threads=15) + +def copy_many(destination, sources_files): + copies = p.run( + (functools.partial(shutil.copy2, sources_file, destination) + for sources_file in sources_files), + run_until=futures.FIRST_EXCEPTION) + + if copies.has_exception_futures(): + copies.cancel() + for f in map(copies: + if not f.exception(): + + functools.partial(os.path.) + raise copies.get_exception_futures()[0].exception() + +""" +def copy_many(sources_and_destinations): + copies = p.run( + (functools.partial(shutil.copytree, source, destination) + for source, destination in sources_and_destinations), + run_until=futures.FIRST_EXCEPTION) + + print('copies:', copies) + if copies.has_exception_futures(): + print('HAS EXCEPTIONS') + copies.cancel(exit_running=True, wait_unit_finished=True) + p.run( + (functools.partial(shutil.rmtree, destination, ignore_errors=True) + for _, destination in sources_and_destinations), + run_until=futures.ALL_COMPLETED) + raise copies.get_exception_futures()[0].exception() + print('copies:', copies) + + +copy_many([('source1', 'destination/source1'), + ('source2', 'destination/source2'), + ('source3', 'destination/source3')]) +""" + +copy_many2('destination', ['source1', 'source2', 'source3', 'source4', 'source5', + 'source6', 'source7', 'source8', 'source9', 'source11'])
\ No newline at end of file diff --git a/crawl.py b/crawl.py new file mode 100644 index 0000000..57f9e8e --- /dev/null +++ b/crawl.py @@ -0,0 +1,38 @@ +import datetime +import functools +import futures.thread +import time +import timeit +import urllib.request + +URLS = ['http://www.google.com/', + 'http://www.apple.com/', + 'http://www.ibm.com', + 'http://www.thisurlprobablydoesnotexist.com', + 'http://www.slashdot.org/', + 'http://www.python.org/', + 'http://www.sweetapp.com/'] + +def load_url(url, timeout): + return urllib.request.urlopen(url, timeout=timeout).read() + +def download_urls(urls, timeout=60): + url_to_content = {} + for url in urls: + try: + url_to_content[url] = load_url(url, timeout=timeout) + except: + pass + return url_to_content + +executor = futures.thread.ThreadPoolExecutor(max_threads=100) +def download_urls_with_futures(urls, timeout=60): + url_to_content = {} + fs = executor.run( + (functools.partial(load_url, url, timeout) for url in urls), + timeout=timeout) + for url, future in zip(urls, fs.result_futures()): + url_to_content[url] = future.result() + return url_to_content + +print(download_urls(URLS)) diff --git a/futures/__init__.py b/futures/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/futures/__init__.py diff --git a/futures/process.py b/futures/process.py new file mode 100644 index 0000000..945303f --- /dev/null +++ b/futures/process.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python + +class _WorkItem(object): + def __init__(self, call, future, completion_tracker): + self.call = call + self.future = future + self.completion_tracker = completion_tracker + + def run(self): + if self.future.cancelled(): + with self.future._condition: + self.future._condition.notify_all() + self.completion_tracker.add_cancelled() + return + + self.future._state = _RUNNING + try: + r = self.call() + except BaseException as e: + with self.future._condition: + self.future._exception = e + self.future._state = _FINISHED + self.future._condition.notify_all() + self.completion_tracker.add_exception() + else: + with self.future._condition: + self.future._result = r + self.future._state = _FINISHED + self.future._condition.notify_all() + self.completion_tracker.add_result() + +class XXX: + def wait(self, timeout=None, run_until=ALL_COMPLETED): + + pass + +class ProcessPoolExecutor(object): + def __init__(self, max_processes): + self._max_processes = max_processes + self._work_queue = multiprocessing.Queue() + self._processes = set() + self._shutdown = False + self._lock = threading.Lock() + self._queue_count = 0 + self._pending_futures = {} + + def _(self): + while True: + try: + result_item = self._result_queue.get(block=True, + timeout=0.1) + except multiprocessing.TimeoutError: + if self._shutdown: + return + else: + completion_tracker, future = self._pending_futures[ + result_item.index] + + if result_item.exception: + with future._condition: + future._exception = result_item.exception + future._state = _FINISHED + future._condition.notify_all() + completion_tracker.add_exception() + else: + with future._condition: + future._result = result_item.result + future._state = _FINISHED + future._condition.notify_all() + completion_tracker.add_result() + + + + def _adjust_process_count(self): + + def run(self, calls, timeout=None, run_until=ALL_COMPLETED): + with self._lock: + if self._shutdown: + raise RuntimeError() + + futures = [] + event_sink = _ThreadEventSink() + for call in calls: + f = Future() + w = _WorkItem(call, f, event_sink) + self._work_queue.put(w) + futures.append(f) + self._queue_count += 1 + + print('futures:', futures) + self._adjust_process_count() + fl = FutureList(futures, event_sink) + fl.wait(timeout=timeout, run_until=run_until) + return fl diff --git a/futures/thread.py b/futures/thread.py new file mode 100644 index 0000000..a4c779d --- /dev/null +++ b/futures/thread.py @@ -0,0 +1,346 @@ +#!/usr/bin/env python + +import queue +import threading + +FIRST_COMPLETED = 0 +FIRST_EXCEPTION = 1 +ALL_COMPLETED = 2 +RETURN_IMMEDIATELY = 3 + +_PENDING = 0 +_RUNNING = 1 +_CANCELLED = 2 +_FINISHED = 3 + +_STATE_TO_DESCRIPTION_MAP = { + _PENDING: "pending", + _RUNNING: "running", + _CANCELLED: "cancelled", + _FINISHED: "finished" +} + +class CancelledException(Exception): + pass + +class TimeoutException(Exception): + pass + +class Future(object): + def __init__(self): + self._condition = threading.Condition() + self._state = _PENDING + self._result = None + self._exception = None + + def __repr__(self): + with self._condition: + if self._state == _FINISHED: + if self._exception: + return '<Future state=%s raised %s>' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '<Future state=%s returned %s>' % ( + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] + + def cancel(self): + with self._condition: + if self._state in [_RUNNING, _FINISHED]: + return False + + self._state = _CANCELLED + return True + + def cancelled(self): + with self._condition: + return self._state == _CANCELLED + + def done(self): + with self._condition: + return self._state in [_CANCELLED, _FINISHED] + + def __get_result(self): + if self._exception: + raise self._exception + else: + return self._result + + def result(self, timeout=None): + with self._condition: + if self._state == _CANCELLED: + raise CancelledException() + elif self._state == _FINISHED: + return self.__get_result() + + print('Waiting...') + self._condition.wait(timeout) + print('Post Waiting...') + + if self._state == _CANCELLED: + raise CancelledException() + elif self._state == _FINISHED: + return self.__get_result() + else: + raise TimeoutException() + + def exception(self, timeout=None): + with self._condition: + if self._state == _CANCELLED: + raise CancelledException() + elif self._state == _FINISHED: + return self._exception + + self._condition.wait(timeout) + + if self._state == _CANCELLED: + raise CancelledException() + elif self._state == _FINISHED: + return self._exception + else: + raise TimeoutException() + +class _NullWaitTracker(object): + def add_result(self): + pass + + def add_exception(self): + pass + + def add_cancelled(self): + pass + +class _FirstCompletedWaitTracker(object): + def __init__(self): + self.event = threading.Event() + + def add_result(self): + self.event.set() + + def add_exception(self): + self.event.set() + + def add_cancelled(self): + self.event.set() + +class _AllCompletedWaitTracker(object): + def __init__(self, pending_calls, stop_on_exception): + self.event = threading.Event() + self.pending_calls = pending_calls + self.stop_on_exception = stop_on_exception + + def add_result(self): + self.pending_calls -= 1 + if not self.pending_calls: + self.event.set() + + def add_exception(self): + self.add_result() + if self.stop_on_exception: + self.event.set() + + def add_cancelled(self): + self.add_result() + +class _ThreadEventSink(object): + def __init__(self): + self._condition = threading.Lock() + self._waiters = [] + + def add(self, e): + self._waiters.append(e) + + def add_result(self): + with self._condition: + for waiter in self._waiters: + waiter.add_result() + + def add_exception(self): + with self._condition: + for waiter in self._waiters: + waiter.add_exception() + + def add_cancelled(self): + with self._condition: + for waiter in self._waiters: + waiter.add_cancelled() + +class FutureList(object): + def __init__(self, futures, event_sink): + self._futures = futures + self._event_sink = event_sink + + def wait(self, timeout=None, run_until=ALL_COMPLETED): + with self._event_sink._condition: + print('WAIT 123') + if all(f.done() for f in self): + return + print('WAIT 1234') + + if run_until == FIRST_COMPLETED: + m = _FirstCompletedWaitTracker() + elif run_until == FIRST_EXCEPTION: + m = _AllCompletedWaitTracker(len(self), stop_on_exception=True) + elif run_until == ALL_COMPLETED: + m = _AllCompletedWaitTracker(len(self), stop_on_exception=False) + elif run_until == RETURN_IMMEDIATELY: + m = _NullWaitTracker() + else: + raise ValueError() + + self._event_sink.add(m) + + if run_until != RETURN_IMMEDIATELY: + print('WAIT 12345', timeout) + m.event.wait(timeout) + + def cancel(self, timeout=None): + for f in self: + f.cancel() + self.wait(timeout=timeout, run_until=ALL_COMPLETED) + if any(not f.done() for f in self): + raise TimeoutException() + + def has_running_futures(self): + return bool(self.running_futures()) + + def has_cancelled_futures(self): + return bool(self.cancelled_futures()) + + def has_done_futures(self): + return bool(self.done_futures()) + + def has_successful_futures(self): + return bool(self.successful_futures()) + + def has_exception_futures(self): + return bool(self.exception_futures()) + + def running_futures(self): + return [f for f in self if not f.done() and not f.cancelled()] + + def cancelled_futures(self): + return [f for f in self if f.cancelled()] + + def done_futures(self): + return [f for f in self if f.done()] + + def successful_futures(self): + return [f for f in self + if f.done() and not f.cancelled() and f.exception() is None] + + def exception_futures(self): + return [f for f in self if f.done() and f.exception() is not None] + + def __getitem__(self, i): + return self._futures[i] + + def __len__(self): + return len(self._futures) + + def __iter__(self): + return iter(self._futures) + + def __contains__(self, f): + return f in self._futures + + def __repr__(self): + return ('<FutureList #futures=%d ' + '[#success=%d #exception=%d #cancelled=%d]>' % ( + len(self), + len(self.successful_futures()), + len(self.exception_futures()), + len(self.cancelled_futures()))) + +class _WorkItem(object): + def __init__(self, call, future, completion_tracker): + self.call = call + self.future = future + self.completion_tracker = completion_tracker + + def run(self): + if self.future.cancelled(): + with self.future._condition: + self.future._condition.notify_all() + self.completion_tracker.add_cancelled() + return + + self.future._state = _RUNNING + try: + r = self.call() + except BaseException as e: + with self.future._condition: + self.future._exception = e + self.future._state = _FINISHED + self.future._condition.notify_all() + self.completion_tracker.add_exception() + else: + with self.future._condition: + self.future._result = r + self.future._state = _FINISHED + self.future._condition.notify_all() + self.completion_tracker.add_result() + +class ThreadPoolExecutor(object): + def __init__(self, max_threads): + self._max_threads = max_threads + self._work_queue = queue.Queue() + self._threads = set() + self._shutdown = False + self._lock = threading.Lock() + + def _worker(self): + try: + while True: + try: + work_item = self._work_queue.get(block=True, + timeout=0.1) + except queue.Empty: + if self._shutdown: + return + else: + work_item.run() + except BaseException as e: + print('Out e:', e) + + def _adjust_thread_count(self): + for _ in range(len(self._threads), + min(self._max_threads, self._work_queue.qsize())): + print('Creating a thread') + t = threading.Thread(target=self._worker) + t.daemon = True + t.start() + self._threads.add(t) + + def run(self, calls, timeout=None, run_until=ALL_COMPLETED): + with self._lock: + if self._shutdown: + raise RuntimeError() + + futures = [] + event_sink = _ThreadEventSink() + for call in calls: + f = Future() + w = _WorkItem(call, f, event_sink) + self._work_queue.put(w) + futures.append(f) + + print('futures:', futures) + self._adjust_thread_count() + fl = FutureList(futures, event_sink) + fl.wait(timeout=timeout, run_until=run_until) + return fl + + def runXXX(self, calls, timeout=None): + fs = self.run(calls, timeout, run_util=FIRST_EXCEPTION) + + if fs.has_exception_futures(): + raise fs.exception_futures()[0].exception() + else: + return [f.result() for f in fs] + + def shutdown(self): + with self._lock: + self._shutdown = True diff --git a/test_futures.py b/test_futures.py new file mode 100644 index 0000000..c0f0539 --- /dev/null +++ b/test_futures.py @@ -0,0 +1,393 @@ +import test.support +from test.support import verbose + +import unittest +import threading +import time + +import futures.thread as threaded_futures + +class Call(object): + def __init__(self, manual_finish=False): + self._called_event = threading.Event() + + self._can_finished = threading.Event() + if not manual_finish: + self._can_finished.set() + + def wait_on_called(self): + self._called_event.wait() + + def set_can(self): + self._can_finished.set() + + def called(self): + return self._called_event.is_set() + + def __call__(self): + if self._called_event.is_set(): print('called twice') + + print('Doing call...') + self._called_event.set() + self._can_finished.wait() + print('About to return...') + return 42 + +class ExceptionCall(Call): + def __call__(self): + assert not self._called_event.is_set(), 'already called' + + print('Doing exception call...') + self._called_event.set() + self._can_finished.wait() + print('About to raise...') + raise ZeroDivisionError() + +class FutureStub(object): + def __init__(self, cancelled, done, exception=None): + self._cancelled = cancelled + self._done = done + self._exception = exception + + def cancelled(self): + return self._cancelled + + def done(self): + return self._done + + def exception(self): + return self._exception + +class ShutdownTest(unittest.TestCase): + def test_run_after_shutdown(self): + self.executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + + call1 = Call() + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.run, + [call1]) + + def test_threads_terminate(self): + self.executor = threaded_futures.ThreadPoolExecutor(max_threads=5) + + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + call3 = Call(manual_finish=True) + + self.executor.run([call1, call2, call3], + run_until=threaded_futures.RETURN_IMMEDIATELY) + + call1.wait_on_called() + call2.wait_on_called() + call3.wait_on_called() + + call1.set_can() + call2.set_can() + call3.set_can() + + self.assertEqual(len(self.executor._threads), 3) + self.executor.shutdown() + for t in self.executor._threads: + t.join() + + +class ConcurrentWaitsTest(unittest.TestCase): + def test(self): + def aaa(): + fs.wait(run_until=threaded_futures.ALL_COMPLETED) + self.assertTrue(f1.done()) + self.assertTrue(f2.done()) + self.assertTrue(f3.done()) + self.assertTrue(f4.done()) + + def bbb(): + fs.wait(run_until=threaded_futures.FIRST_COMPLETED) + self.assertTrue(f1.done()) + self.assertFalse(f2.done()) + self.assertFalse(f3.done()) + self.assertFalse(f4.done()) + + def ccc(): + fs.wait(run_until=threaded_futures.FIRST_EXCEPTION) + self.assertTrue(f1.done()) + self.assertTrue(f2.done()) + print('fs:', fs) + print(f1, f2, f3, f4) + self.assertFalse(f3.done()) + self.assertFalse(f4.done()) + + executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + + call1 = Call(manual_finish=True) + call2 = ExceptionCall(manual_finish=True) + call3 = Call(manual_finish=True) + call4 = Call() + + fs = executor.run([call1, call2, call3, call4], + run_until=threaded_futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + threads = [] + for call in [aaa, bbb, ccc] * 3: + t = threading.Thread(target=call) + t.start() + threads.append(t) + + time.sleep(1) + call1.set_can() + time.sleep(1) + call2.set_can() + time.sleep(1) + call3.set_can() + time.sleep(1) + call4.set_can() + + for t in threads: + print('join') + t.join() + print('shutdown') + executor.shutdown() + print('done shutdown') + +class CancelTests(unittest.TestCase): + def test_cancel_states(self): + executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + fs = executor.run([call1, call2, call3, call4], + run_until=threaded_futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + self.assertEqual(f1.cancel(), False) + self.assertEqual(f2.cancel(), True) + self.assertEqual(f4.cancel(), True) + self.assertEqual(f1.cancelled(), False) + self.assertEqual(f2.cancelled(), True) + self.assertEqual(f3.cancelled(), False) + self.assertEqual(f4.cancelled(), True) + self.assertEqual(f1.done(), False) + self.assertEqual(f2.done(), True) + self.assertEqual(f3.done(), False) + self.assertEqual(f4.done(), True) + + call1.set_can() + fs.wait(run_until=threaded_futures.ALL_COMPLETED) + self.assertEqual(f1.result(), 42) + self.assertRaises(threaded_futures.CancelledException, f2.result) + self.assertRaises(threaded_futures.CancelledException, f2.exception) + self.assertEqual(f3.result(), 42) + self.assertRaises(threaded_futures.CancelledException, f4.result) + self.assertRaises(threaded_futures.CancelledException, f4.exception) + + self.assertEqual(call2.called(), False) + self.assertEqual(call4.called(), False) + executor.shutdown() + + def test_wait_for_individual_cancel(self): + def end_call(): + print ('Here1') + time.sleep(1) + print ('Here2') + f2.cancel() + print ('Here3') + call1.set_can() + print ('Here4') + + executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + + call1 = Call(manual_finish=True) + call2 = Call() + + fs = executor.run([call1, call2], run_until=threaded_futures.RETURN_IMMEDIATELY) + f1, f2 = fs + + call1.wait_on_called() + t = threading.Thread(target=end_call) + t.start() + self.assertRaises(threaded_futures.CancelledException, f2.result) + self.assertRaises(threaded_futures.CancelledException, f2.exception) + t.join() + executor.shutdown() + + def test_cancel_all(self): + executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + + call1 = Call(manual_finish=True) + call2 = Call() + call3 = Call() + call4 = Call() + + fs = executor.run([call1, call2, call3, call4], + run_until=threaded_futures.RETURN_IMMEDIATELY) + f1, f2, f3, f4 = fs + + call1.wait_on_called() + print('HERE!!!') + self.assertRaises(threaded_futures.TimeoutException, fs.cancel, timeout=0) + print('HERE 2!!!') + call1.set_can() + fs.cancel() + + self.assertFalse(f1.cancelled()) + self.assertTrue(f2.cancelled()) + self.assertTrue(f3.cancelled()) + self.assertTrue(f4.cancelled()) + executor.shutdown() + + def test_cancel_repr(self): + executor = threaded_futures.ThreadPoolExecutor(max_threads=1) + + call1 = Call(manual_finish=True) + call2 = Call() + + fs = executor.run([call1, call2], run_until=threaded_futures.RETURN_IMMEDIATELY) + f1, f2 = fs + + call1.wait_on_called() + call1.set_can() + f2.cancel() + self.assertEqual(repr(f2), '<Future state=cancelled>') + executor.shutdown() + +class FutureListTests(unittest.TestCase): + def test_cancel_states(self): + f1 = FutureStub(cancelled=False, done=False) + f2 = FutureStub(cancelled=False, done=True) + f3 = FutureStub(cancelled=False, done=True, exception=IOError()) + f4 = FutureStub(cancelled=True, done=True) + + fs = [f1, f2, f3, f4] + f = threaded_futures.FutureList(fs, None) + + self.assertEqual(f.running_futures(), [f1]) + self.assertEqual(f.cancelled_futures(), [f4]) + self.assertEqual(f.done_futures(), [f2, f3, f4]) + self.assertEqual(f.successful_futures(), [f2]) + self.assertEqual(f.exception_futures(), [f3]) + + def test_has_running_futures(self): + f1 = FutureStub(cancelled=False, done=False) + f2 = FutureStub(cancelled=False, done=True) + + self.assertTrue( + threaded_futures.FutureList([f1], None).has_running_futures()) + self.assertFalse( + threaded_futures.FutureList([f2], None).has_running_futures()) + + def test_has_cancelled_futures(self): + f1 = FutureStub(cancelled=True, done=True) + f2 = FutureStub(cancelled=False, done=True) + + self.assertTrue( + threaded_futures.FutureList([f1], None).has_cancelled_futures()) + self.assertFalse( + threaded_futures.FutureList([f2], None).has_cancelled_futures()) + + def test_has_done_futures(self): + f1 = FutureStub(cancelled=True, done=True) + f2 = FutureStub(cancelled=False, done=True) + f3 = FutureStub(cancelled=False, done=False) + + self.assertTrue( + threaded_futures.FutureList([f1], None).has_done_futures()) + self.assertTrue( + threaded_futures.FutureList([f2], None).has_done_futures()) + self.assertFalse( + threaded_futures.FutureList([f3], None).has_done_futures()) + + def test_has_successful_futures(self): + f1 = FutureStub(cancelled=False, done=True) + f2 = FutureStub(cancelled=False, done=True, exception=IOError()) + f3 = FutureStub(cancelled=False, done=False) + f4 = FutureStub(cancelled=True, done=True) + + self.assertTrue( + threaded_futures.FutureList([f1], None).has_successful_futures()) + self.assertFalse( + threaded_futures.FutureList([f2], None).has_successful_futures()) + self.assertFalse( + threaded_futures.FutureList([f3], None).has_successful_futures()) + self.assertFalse( + threaded_futures.FutureList([f4], None).has_successful_futures()) + + def test_has_exception_futures(self): + f1 = FutureStub(cancelled=False, done=True) + f2 = FutureStub(cancelled=False, done=True, exception=IOError()) + f3 = FutureStub(cancelled=False, done=False) + f4 = FutureStub(cancelled=True, done=True) + + self.assertFalse( + threaded_futures.FutureList([f1], None).has_exception_futures()) + self.assertTrue( + threaded_futures.FutureList([f2], None).has_exception_futures()) + self.assertFalse( + threaded_futures.FutureList([f3], None).has_exception_futures()) + self.assertFalse( + threaded_futures.FutureList([f4], None).has_exception_futures()) + + def test_get_item(self): + f1 = FutureStub(cancelled=False, done=False) + f2 = FutureStub(cancelled=False, done=True) + f3 = FutureStub(cancelled=False, done=True) + + fs = [f1, f2, f3] + f = threaded_futures.FutureList(fs, None) + self.assertEqual(f[0], f1) + self.assertEqual(f[1], f2) + self.assertEqual(f[2], f3) + self.assertRaises(IndexError, f.__getitem__, 3) + + def test_len(self): + f1 = FutureStub(cancelled=False, done=False) + f2 = FutureStub(cancelled=False, done=True) + f3 = FutureStub(cancelled=False, done=True) + + f = threaded_futures.FutureList([f1, f2, f3], None) + self.assertEqual(len(f), 3) + + def test_iter(self): + f1 = FutureStub(cancelled=False, done=False) + f2 = FutureStub(cancelled=False, done=True) + f3 = FutureStub(cancelled=False, done=True) + + fs = [f1, f2, f3] + f = threaded_futures.FutureList(fs, None) + self.assertEqual(list(iter(f)), fs) + + def test_contains(self): + f1 = FutureStub(cancelled=False, done=False) + f2 = FutureStub(cancelled=False, done=True) + f3 = FutureStub(cancelled=False, done=True) + + f = threaded_futures.FutureList([f1, f2], None) + self.assertTrue(f1 in f) + self.assertTrue(f2 in f) + self.assertFalse(f3 in f) + + def test_repr(self): + running = FutureStub(cancelled=False, done=False) + result = FutureStub(cancelled=False, done=True) + exception = FutureStub(cancelled=False, done=True, exception=IOError()) + cancelled = FutureStub(cancelled=True, done=True) + + f = threaded_futures.FutureList( + [running] * 4 + [result] * 3 + [exception] * 2 + [cancelled], + None) + + self.assertEqual(repr(f), + '<FutureList #futures=10 ' + '[#success=3 #exception=2 #cancelled=1]>') +def test_main(): + test.support.run_unittest(CancelTests, + ConcurrentWaitsTest, + FutureListTests, + ShutdownTest) + +if __name__ == "__main__": + test_main()
\ No newline at end of file |