summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrian.quinlan <devnull@localhost>2009-05-03 17:33:27 +0000
committerbrian.quinlan <devnull@localhost>2009-05-03 17:33:27 +0000
commit3b24ddaa96d138bac457f5d394c3a71947ad2d15 (patch)
tree23f096fcb085112826a416db4c154984db460e3f
parentce2ea56f9b0267a31aae79cacc939006ca53ea0b (diff)
downloadfutures-3b24ddaa96d138bac457f5d394c3a71947ad2d15.tar.gz
Initial checkin.
-rw-r--r--PEP.txt25
-rw-r--r--copy.py65
-rw-r--r--crawl.py38
-rw-r--r--futures/__init__.py0
-rw-r--r--futures/process.py94
-rw-r--r--futures/thread.py346
-rw-r--r--test_futures.py393
7 files changed, 961 insertions, 0 deletions
diff --git a/PEP.txt b/PEP.txt
new file mode 100644
index 0000000..3395c7f
--- /dev/null
+++ b/PEP.txt
@@ -0,0 +1,25 @@
+"""
+Abstract
+
+ Python currently has powerful primitives to construct multi-threaded
+ applications but parallelizing simple functions requires a lot of
+ setup work i.e. explicitly launching threads, constructing a
+ work/results queue, and waiting for completion or some other
+ termination condition (e.g. exception, success). It is also hard to
+ manage the global . This PEP proposes the addition
+
+
+ Python currently distinguishes between two kinds of integers
+ (ints): regular or short ints, limited by the size of a C long
+ (typically 32 or 64 bits), and long ints, which are limited only
+ by available memory. When operations on short ints yield results
+ that don't fit in a C long, they raise an error. There are some
+ other distinctions too. This PEP proposes to do away with most of
+ the differences in semantics, unifying the two types from the
+ perspective of the Python user.
+
+
+Abstract
+
+
+""" \ No newline at end of file
diff --git a/copy.py b/copy.py
new file mode 100644
index 0000000..e06cf74
--- /dev/null
+++ b/copy.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+
+import functools
+import futures.thread as futures
+import shutil
+import os
+import os.path
+
+def copy_many(destination, sources_files):
+ for source_file in sources_files:
+ copied_files = []
+ try:
+ shutil.copy2(source_file, destination)
+ except IOError, e:
+ for delete_file in copied_files:
+ try:
+ os.remove(delete_file)
+ except:
+ pass
+ raise
+ else:
+ copied_files.append(source_files)
+
+p = futures.ThreadPoolExecutor(max_threads=15)
+
+def copy_many(destination, sources_files):
+ copies = p.run(
+ (functools.partial(shutil.copy2, sources_file, destination)
+ for sources_file in sources_files),
+ run_until=futures.FIRST_EXCEPTION)
+
+ if copies.has_exception_futures():
+ copies.cancel()
+ for f in map(copies:
+ if not f.exception():
+
+ functools.partial(os.path.)
+ raise copies.get_exception_futures()[0].exception()
+
+"""
+def copy_many(sources_and_destinations):
+ copies = p.run(
+ (functools.partial(shutil.copytree, source, destination)
+ for source, destination in sources_and_destinations),
+ run_until=futures.FIRST_EXCEPTION)
+
+ print('copies:', copies)
+ if copies.has_exception_futures():
+ print('HAS EXCEPTIONS')
+ copies.cancel(exit_running=True, wait_unit_finished=True)
+ p.run(
+ (functools.partial(shutil.rmtree, destination, ignore_errors=True)
+ for _, destination in sources_and_destinations),
+ run_until=futures.ALL_COMPLETED)
+ raise copies.get_exception_futures()[0].exception()
+ print('copies:', copies)
+
+
+copy_many([('source1', 'destination/source1'),
+ ('source2', 'destination/source2'),
+ ('source3', 'destination/source3')])
+"""
+
+copy_many2('destination', ['source1', 'source2', 'source3', 'source4', 'source5',
+ 'source6', 'source7', 'source8', 'source9', 'source11']) \ No newline at end of file
diff --git a/crawl.py b/crawl.py
new file mode 100644
index 0000000..57f9e8e
--- /dev/null
+++ b/crawl.py
@@ -0,0 +1,38 @@
+import datetime
+import functools
+import futures.thread
+import time
+import timeit
+import urllib.request
+
+URLS = ['http://www.google.com/',
+ 'http://www.apple.com/',
+ 'http://www.ibm.com',
+ 'http://www.thisurlprobablydoesnotexist.com',
+ 'http://www.slashdot.org/',
+ 'http://www.python.org/',
+ 'http://www.sweetapp.com/']
+
+def load_url(url, timeout):
+ return urllib.request.urlopen(url, timeout=timeout).read()
+
+def download_urls(urls, timeout=60):
+ url_to_content = {}
+ for url in urls:
+ try:
+ url_to_content[url] = load_url(url, timeout=timeout)
+ except:
+ pass
+ return url_to_content
+
+executor = futures.thread.ThreadPoolExecutor(max_threads=100)
+def download_urls_with_futures(urls, timeout=60):
+ url_to_content = {}
+ fs = executor.run(
+ (functools.partial(load_url, url, timeout) for url in urls),
+ timeout=timeout)
+ for url, future in zip(urls, fs.result_futures()):
+ url_to_content[url] = future.result()
+ return url_to_content
+
+print(download_urls(URLS))
diff --git a/futures/__init__.py b/futures/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/futures/__init__.py
diff --git a/futures/process.py b/futures/process.py
new file mode 100644
index 0000000..945303f
--- /dev/null
+++ b/futures/process.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+
+class _WorkItem(object):
+ def __init__(self, call, future, completion_tracker):
+ self.call = call
+ self.future = future
+ self.completion_tracker = completion_tracker
+
+ def run(self):
+ if self.future.cancelled():
+ with self.future._condition:
+ self.future._condition.notify_all()
+ self.completion_tracker.add_cancelled()
+ return
+
+ self.future._state = _RUNNING
+ try:
+ r = self.call()
+ except BaseException as e:
+ with self.future._condition:
+ self.future._exception = e
+ self.future._state = _FINISHED
+ self.future._condition.notify_all()
+ self.completion_tracker.add_exception()
+ else:
+ with self.future._condition:
+ self.future._result = r
+ self.future._state = _FINISHED
+ self.future._condition.notify_all()
+ self.completion_tracker.add_result()
+
+class XXX:
+ def wait(self, timeout=None, run_until=ALL_COMPLETED):
+
+ pass
+
+class ProcessPoolExecutor(object):
+ def __init__(self, max_processes):
+ self._max_processes = max_processes
+ self._work_queue = multiprocessing.Queue()
+ self._processes = set()
+ self._shutdown = False
+ self._lock = threading.Lock()
+ self._queue_count = 0
+ self._pending_futures = {}
+
+ def _(self):
+ while True:
+ try:
+ result_item = self._result_queue.get(block=True,
+ timeout=0.1)
+ except multiprocessing.TimeoutError:
+ if self._shutdown:
+ return
+ else:
+ completion_tracker, future = self._pending_futures[
+ result_item.index]
+
+ if result_item.exception:
+ with future._condition:
+ future._exception = result_item.exception
+ future._state = _FINISHED
+ future._condition.notify_all()
+ completion_tracker.add_exception()
+ else:
+ with future._condition:
+ future._result = result_item.result
+ future._state = _FINISHED
+ future._condition.notify_all()
+ completion_tracker.add_result()
+
+
+
+ def _adjust_process_count(self):
+
+ def run(self, calls, timeout=None, run_until=ALL_COMPLETED):
+ with self._lock:
+ if self._shutdown:
+ raise RuntimeError()
+
+ futures = []
+ event_sink = _ThreadEventSink()
+ for call in calls:
+ f = Future()
+ w = _WorkItem(call, f, event_sink)
+ self._work_queue.put(w)
+ futures.append(f)
+ self._queue_count += 1
+
+ print('futures:', futures)
+ self._adjust_process_count()
+ fl = FutureList(futures, event_sink)
+ fl.wait(timeout=timeout, run_until=run_until)
+ return fl
diff --git a/futures/thread.py b/futures/thread.py
new file mode 100644
index 0000000..a4c779d
--- /dev/null
+++ b/futures/thread.py
@@ -0,0 +1,346 @@
+#!/usr/bin/env python
+
+import queue
+import threading
+
+FIRST_COMPLETED = 0
+FIRST_EXCEPTION = 1
+ALL_COMPLETED = 2
+RETURN_IMMEDIATELY = 3
+
+_PENDING = 0
+_RUNNING = 1
+_CANCELLED = 2
+_FINISHED = 3
+
+_STATE_TO_DESCRIPTION_MAP = {
+ _PENDING: "pending",
+ _RUNNING: "running",
+ _CANCELLED: "cancelled",
+ _FINISHED: "finished"
+}
+
+class CancelledException(Exception):
+ pass
+
+class TimeoutException(Exception):
+ pass
+
+class Future(object):
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._state = _PENDING
+ self._result = None
+ self._exception = None
+
+ def __repr__(self):
+ with self._condition:
+ if self._state == _FINISHED:
+ if self._exception:
+ return '<Future state=%s raised %s>' % (
+ _STATE_TO_DESCRIPTION_MAP[self._state],
+ self._exception.__class__.__name__)
+ else:
+ return '<Future state=%s returned %s>' % (
+ _STATE_TO_DESCRIPTION_MAP[self._state],
+ self._result.__class__.__name__)
+ return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
+
+ def cancel(self):
+ with self._condition:
+ if self._state in [_RUNNING, _FINISHED]:
+ return False
+
+ self._state = _CANCELLED
+ return True
+
+ def cancelled(self):
+ with self._condition:
+ return self._state == _CANCELLED
+
+ def done(self):
+ with self._condition:
+ return self._state in [_CANCELLED, _FINISHED]
+
+ def __get_result(self):
+ if self._exception:
+ raise self._exception
+ else:
+ return self._result
+
+ def result(self, timeout=None):
+ with self._condition:
+ if self._state == _CANCELLED:
+ raise CancelledException()
+ elif self._state == _FINISHED:
+ return self.__get_result()
+
+ print('Waiting...')
+ self._condition.wait(timeout)
+ print('Post Waiting...')
+
+ if self._state == _CANCELLED:
+ raise CancelledException()
+ elif self._state == _FINISHED:
+ return self.__get_result()
+ else:
+ raise TimeoutException()
+
+ def exception(self, timeout=None):
+ with self._condition:
+ if self._state == _CANCELLED:
+ raise CancelledException()
+ elif self._state == _FINISHED:
+ return self._exception
+
+ self._condition.wait(timeout)
+
+ if self._state == _CANCELLED:
+ raise CancelledException()
+ elif self._state == _FINISHED:
+ return self._exception
+ else:
+ raise TimeoutException()
+
+class _NullWaitTracker(object):
+ def add_result(self):
+ pass
+
+ def add_exception(self):
+ pass
+
+ def add_cancelled(self):
+ pass
+
+class _FirstCompletedWaitTracker(object):
+ def __init__(self):
+ self.event = threading.Event()
+
+ def add_result(self):
+ self.event.set()
+
+ def add_exception(self):
+ self.event.set()
+
+ def add_cancelled(self):
+ self.event.set()
+
+class _AllCompletedWaitTracker(object):
+ def __init__(self, pending_calls, stop_on_exception):
+ self.event = threading.Event()
+ self.pending_calls = pending_calls
+ self.stop_on_exception = stop_on_exception
+
+ def add_result(self):
+ self.pending_calls -= 1
+ if not self.pending_calls:
+ self.event.set()
+
+ def add_exception(self):
+ self.add_result()
+ if self.stop_on_exception:
+ self.event.set()
+
+ def add_cancelled(self):
+ self.add_result()
+
+class _ThreadEventSink(object):
+ def __init__(self):
+ self._condition = threading.Lock()
+ self._waiters = []
+
+ def add(self, e):
+ self._waiters.append(e)
+
+ def add_result(self):
+ with self._condition:
+ for waiter in self._waiters:
+ waiter.add_result()
+
+ def add_exception(self):
+ with self._condition:
+ for waiter in self._waiters:
+ waiter.add_exception()
+
+ def add_cancelled(self):
+ with self._condition:
+ for waiter in self._waiters:
+ waiter.add_cancelled()
+
+class FutureList(object):
+ def __init__(self, futures, event_sink):
+ self._futures = futures
+ self._event_sink = event_sink
+
+ def wait(self, timeout=None, run_until=ALL_COMPLETED):
+ with self._event_sink._condition:
+ print('WAIT 123')
+ if all(f.done() for f in self):
+ return
+ print('WAIT 1234')
+
+ if run_until == FIRST_COMPLETED:
+ m = _FirstCompletedWaitTracker()
+ elif run_until == FIRST_EXCEPTION:
+ m = _AllCompletedWaitTracker(len(self), stop_on_exception=True)
+ elif run_until == ALL_COMPLETED:
+ m = _AllCompletedWaitTracker(len(self), stop_on_exception=False)
+ elif run_until == RETURN_IMMEDIATELY:
+ m = _NullWaitTracker()
+ else:
+ raise ValueError()
+
+ self._event_sink.add(m)
+
+ if run_until != RETURN_IMMEDIATELY:
+ print('WAIT 12345', timeout)
+ m.event.wait(timeout)
+
+ def cancel(self, timeout=None):
+ for f in self:
+ f.cancel()
+ self.wait(timeout=timeout, run_until=ALL_COMPLETED)
+ if any(not f.done() for f in self):
+ raise TimeoutException()
+
+ def has_running_futures(self):
+ return bool(self.running_futures())
+
+ def has_cancelled_futures(self):
+ return bool(self.cancelled_futures())
+
+ def has_done_futures(self):
+ return bool(self.done_futures())
+
+ def has_successful_futures(self):
+ return bool(self.successful_futures())
+
+ def has_exception_futures(self):
+ return bool(self.exception_futures())
+
+ def running_futures(self):
+ return [f for f in self if not f.done() and not f.cancelled()]
+
+ def cancelled_futures(self):
+ return [f for f in self if f.cancelled()]
+
+ def done_futures(self):
+ return [f for f in self if f.done()]
+
+ def successful_futures(self):
+ return [f for f in self
+ if f.done() and not f.cancelled() and f.exception() is None]
+
+ def exception_futures(self):
+ return [f for f in self if f.done() and f.exception() is not None]
+
+ def __getitem__(self, i):
+ return self._futures[i]
+
+ def __len__(self):
+ return len(self._futures)
+
+ def __iter__(self):
+ return iter(self._futures)
+
+ def __contains__(self, f):
+ return f in self._futures
+
+ def __repr__(self):
+ return ('<FutureList #futures=%d '
+ '[#success=%d #exception=%d #cancelled=%d]>' % (
+ len(self),
+ len(self.successful_futures()),
+ len(self.exception_futures()),
+ len(self.cancelled_futures())))
+
+class _WorkItem(object):
+ def __init__(self, call, future, completion_tracker):
+ self.call = call
+ self.future = future
+ self.completion_tracker = completion_tracker
+
+ def run(self):
+ if self.future.cancelled():
+ with self.future._condition:
+ self.future._condition.notify_all()
+ self.completion_tracker.add_cancelled()
+ return
+
+ self.future._state = _RUNNING
+ try:
+ r = self.call()
+ except BaseException as e:
+ with self.future._condition:
+ self.future._exception = e
+ self.future._state = _FINISHED
+ self.future._condition.notify_all()
+ self.completion_tracker.add_exception()
+ else:
+ with self.future._condition:
+ self.future._result = r
+ self.future._state = _FINISHED
+ self.future._condition.notify_all()
+ self.completion_tracker.add_result()
+
+class ThreadPoolExecutor(object):
+ def __init__(self, max_threads):
+ self._max_threads = max_threads
+ self._work_queue = queue.Queue()
+ self._threads = set()
+ self._shutdown = False
+ self._lock = threading.Lock()
+
+ def _worker(self):
+ try:
+ while True:
+ try:
+ work_item = self._work_queue.get(block=True,
+ timeout=0.1)
+ except queue.Empty:
+ if self._shutdown:
+ return
+ else:
+ work_item.run()
+ except BaseException as e:
+ print('Out e:', e)
+
+ def _adjust_thread_count(self):
+ for _ in range(len(self._threads),
+ min(self._max_threads, self._work_queue.qsize())):
+ print('Creating a thread')
+ t = threading.Thread(target=self._worker)
+ t.daemon = True
+ t.start()
+ self._threads.add(t)
+
+ def run(self, calls, timeout=None, run_until=ALL_COMPLETED):
+ with self._lock:
+ if self._shutdown:
+ raise RuntimeError()
+
+ futures = []
+ event_sink = _ThreadEventSink()
+ for call in calls:
+ f = Future()
+ w = _WorkItem(call, f, event_sink)
+ self._work_queue.put(w)
+ futures.append(f)
+
+ print('futures:', futures)
+ self._adjust_thread_count()
+ fl = FutureList(futures, event_sink)
+ fl.wait(timeout=timeout, run_until=run_until)
+ return fl
+
+ def runXXX(self, calls, timeout=None):
+ fs = self.run(calls, timeout, run_util=FIRST_EXCEPTION)
+
+ if fs.has_exception_futures():
+ raise fs.exception_futures()[0].exception()
+ else:
+ return [f.result() for f in fs]
+
+ def shutdown(self):
+ with self._lock:
+ self._shutdown = True
diff --git a/test_futures.py b/test_futures.py
new file mode 100644
index 0000000..c0f0539
--- /dev/null
+++ b/test_futures.py
@@ -0,0 +1,393 @@
+import test.support
+from test.support import verbose
+
+import unittest
+import threading
+import time
+
+import futures.thread as threaded_futures
+
+class Call(object):
+ def __init__(self, manual_finish=False):
+ self._called_event = threading.Event()
+
+ self._can_finished = threading.Event()
+ if not manual_finish:
+ self._can_finished.set()
+
+ def wait_on_called(self):
+ self._called_event.wait()
+
+ def set_can(self):
+ self._can_finished.set()
+
+ def called(self):
+ return self._called_event.is_set()
+
+ def __call__(self):
+ if self._called_event.is_set(): print('called twice')
+
+ print('Doing call...')
+ self._called_event.set()
+ self._can_finished.wait()
+ print('About to return...')
+ return 42
+
+class ExceptionCall(Call):
+ def __call__(self):
+ assert not self._called_event.is_set(), 'already called'
+
+ print('Doing exception call...')
+ self._called_event.set()
+ self._can_finished.wait()
+ print('About to raise...')
+ raise ZeroDivisionError()
+
+class FutureStub(object):
+ def __init__(self, cancelled, done, exception=None):
+ self._cancelled = cancelled
+ self._done = done
+ self._exception = exception
+
+ def cancelled(self):
+ return self._cancelled
+
+ def done(self):
+ return self._done
+
+ def exception(self):
+ return self._exception
+
+class ShutdownTest(unittest.TestCase):
+ def test_run_after_shutdown(self):
+ self.executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+
+ call1 = Call()
+ self.executor.shutdown()
+ self.assertRaises(RuntimeError,
+ self.executor.run,
+ [call1])
+
+ def test_threads_terminate(self):
+ self.executor = threaded_futures.ThreadPoolExecutor(max_threads=5)
+
+ call1 = Call(manual_finish=True)
+ call2 = Call(manual_finish=True)
+ call3 = Call(manual_finish=True)
+
+ self.executor.run([call1, call2, call3],
+ run_until=threaded_futures.RETURN_IMMEDIATELY)
+
+ call1.wait_on_called()
+ call2.wait_on_called()
+ call3.wait_on_called()
+
+ call1.set_can()
+ call2.set_can()
+ call3.set_can()
+
+ self.assertEqual(len(self.executor._threads), 3)
+ self.executor.shutdown()
+ for t in self.executor._threads:
+ t.join()
+
+
+class ConcurrentWaitsTest(unittest.TestCase):
+ def test(self):
+ def aaa():
+ fs.wait(run_until=threaded_futures.ALL_COMPLETED)
+ self.assertTrue(f1.done())
+ self.assertTrue(f2.done())
+ self.assertTrue(f3.done())
+ self.assertTrue(f4.done())
+
+ def bbb():
+ fs.wait(run_until=threaded_futures.FIRST_COMPLETED)
+ self.assertTrue(f1.done())
+ self.assertFalse(f2.done())
+ self.assertFalse(f3.done())
+ self.assertFalse(f4.done())
+
+ def ccc():
+ fs.wait(run_until=threaded_futures.FIRST_EXCEPTION)
+ self.assertTrue(f1.done())
+ self.assertTrue(f2.done())
+ print('fs:', fs)
+ print(f1, f2, f3, f4)
+ self.assertFalse(f3.done())
+ self.assertFalse(f4.done())
+
+ executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+
+ call1 = Call(manual_finish=True)
+ call2 = ExceptionCall(manual_finish=True)
+ call3 = Call(manual_finish=True)
+ call4 = Call()
+
+ fs = executor.run([call1, call2, call3, call4],
+ run_until=threaded_futures.RETURN_IMMEDIATELY)
+ f1, f2, f3, f4 = fs
+
+ threads = []
+ for call in [aaa, bbb, ccc] * 3:
+ t = threading.Thread(target=call)
+ t.start()
+ threads.append(t)
+
+ time.sleep(1)
+ call1.set_can()
+ time.sleep(1)
+ call2.set_can()
+ time.sleep(1)
+ call3.set_can()
+ time.sleep(1)
+ call4.set_can()
+
+ for t in threads:
+ print('join')
+ t.join()
+ print('shutdown')
+ executor.shutdown()
+ print('done shutdown')
+
+class CancelTests(unittest.TestCase):
+ def test_cancel_states(self):
+ executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+
+ call1 = Call(manual_finish=True)
+ call2 = Call()
+ call3 = Call()
+ call4 = Call()
+
+ fs = executor.run([call1, call2, call3, call4],
+ run_until=threaded_futures.RETURN_IMMEDIATELY)
+ f1, f2, f3, f4 = fs
+
+ call1.wait_on_called()
+ self.assertEqual(f1.cancel(), False)
+ self.assertEqual(f2.cancel(), True)
+ self.assertEqual(f4.cancel(), True)
+ self.assertEqual(f1.cancelled(), False)
+ self.assertEqual(f2.cancelled(), True)
+ self.assertEqual(f3.cancelled(), False)
+ self.assertEqual(f4.cancelled(), True)
+ self.assertEqual(f1.done(), False)
+ self.assertEqual(f2.done(), True)
+ self.assertEqual(f3.done(), False)
+ self.assertEqual(f4.done(), True)
+
+ call1.set_can()
+ fs.wait(run_until=threaded_futures.ALL_COMPLETED)
+ self.assertEqual(f1.result(), 42)
+ self.assertRaises(threaded_futures.CancelledException, f2.result)
+ self.assertRaises(threaded_futures.CancelledException, f2.exception)
+ self.assertEqual(f3.result(), 42)
+ self.assertRaises(threaded_futures.CancelledException, f4.result)
+ self.assertRaises(threaded_futures.CancelledException, f4.exception)
+
+ self.assertEqual(call2.called(), False)
+ self.assertEqual(call4.called(), False)
+ executor.shutdown()
+
+ def test_wait_for_individual_cancel(self):
+ def end_call():
+ print ('Here1')
+ time.sleep(1)
+ print ('Here2')
+ f2.cancel()
+ print ('Here3')
+ call1.set_can()
+ print ('Here4')
+
+ executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+
+ call1 = Call(manual_finish=True)
+ call2 = Call()
+
+ fs = executor.run([call1, call2], run_until=threaded_futures.RETURN_IMMEDIATELY)
+ f1, f2 = fs
+
+ call1.wait_on_called()
+ t = threading.Thread(target=end_call)
+ t.start()
+ self.assertRaises(threaded_futures.CancelledException, f2.result)
+ self.assertRaises(threaded_futures.CancelledException, f2.exception)
+ t.join()
+ executor.shutdown()
+
+ def test_cancel_all(self):
+ executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+
+ call1 = Call(manual_finish=True)
+ call2 = Call()
+ call3 = Call()
+ call4 = Call()
+
+ fs = executor.run([call1, call2, call3, call4],
+ run_until=threaded_futures.RETURN_IMMEDIATELY)
+ f1, f2, f3, f4 = fs
+
+ call1.wait_on_called()
+ print('HERE!!!')
+ self.assertRaises(threaded_futures.TimeoutException, fs.cancel, timeout=0)
+ print('HERE 2!!!')
+ call1.set_can()
+ fs.cancel()
+
+ self.assertFalse(f1.cancelled())
+ self.assertTrue(f2.cancelled())
+ self.assertTrue(f3.cancelled())
+ self.assertTrue(f4.cancelled())
+ executor.shutdown()
+
+ def test_cancel_repr(self):
+ executor = threaded_futures.ThreadPoolExecutor(max_threads=1)
+
+ call1 = Call(manual_finish=True)
+ call2 = Call()
+
+ fs = executor.run([call1, call2], run_until=threaded_futures.RETURN_IMMEDIATELY)
+ f1, f2 = fs
+
+ call1.wait_on_called()
+ call1.set_can()
+ f2.cancel()
+ self.assertEqual(repr(f2), '<Future state=cancelled>')
+ executor.shutdown()
+
+class FutureListTests(unittest.TestCase):
+ def test_cancel_states(self):
+ f1 = FutureStub(cancelled=False, done=False)
+ f2 = FutureStub(cancelled=False, done=True)
+ f3 = FutureStub(cancelled=False, done=True, exception=IOError())
+ f4 = FutureStub(cancelled=True, done=True)
+
+ fs = [f1, f2, f3, f4]
+ f = threaded_futures.FutureList(fs, None)
+
+ self.assertEqual(f.running_futures(), [f1])
+ self.assertEqual(f.cancelled_futures(), [f4])
+ self.assertEqual(f.done_futures(), [f2, f3, f4])
+ self.assertEqual(f.successful_futures(), [f2])
+ self.assertEqual(f.exception_futures(), [f3])
+
+ def test_has_running_futures(self):
+ f1 = FutureStub(cancelled=False, done=False)
+ f2 = FutureStub(cancelled=False, done=True)
+
+ self.assertTrue(
+ threaded_futures.FutureList([f1], None).has_running_futures())
+ self.assertFalse(
+ threaded_futures.FutureList([f2], None).has_running_futures())
+
+ def test_has_cancelled_futures(self):
+ f1 = FutureStub(cancelled=True, done=True)
+ f2 = FutureStub(cancelled=False, done=True)
+
+ self.assertTrue(
+ threaded_futures.FutureList([f1], None).has_cancelled_futures())
+ self.assertFalse(
+ threaded_futures.FutureList([f2], None).has_cancelled_futures())
+
+ def test_has_done_futures(self):
+ f1 = FutureStub(cancelled=True, done=True)
+ f2 = FutureStub(cancelled=False, done=True)
+ f3 = FutureStub(cancelled=False, done=False)
+
+ self.assertTrue(
+ threaded_futures.FutureList([f1], None).has_done_futures())
+ self.assertTrue(
+ threaded_futures.FutureList([f2], None).has_done_futures())
+ self.assertFalse(
+ threaded_futures.FutureList([f3], None).has_done_futures())
+
+ def test_has_successful_futures(self):
+ f1 = FutureStub(cancelled=False, done=True)
+ f2 = FutureStub(cancelled=False, done=True, exception=IOError())
+ f3 = FutureStub(cancelled=False, done=False)
+ f4 = FutureStub(cancelled=True, done=True)
+
+ self.assertTrue(
+ threaded_futures.FutureList([f1], None).has_successful_futures())
+ self.assertFalse(
+ threaded_futures.FutureList([f2], None).has_successful_futures())
+ self.assertFalse(
+ threaded_futures.FutureList([f3], None).has_successful_futures())
+ self.assertFalse(
+ threaded_futures.FutureList([f4], None).has_successful_futures())
+
+ def test_has_exception_futures(self):
+ f1 = FutureStub(cancelled=False, done=True)
+ f2 = FutureStub(cancelled=False, done=True, exception=IOError())
+ f3 = FutureStub(cancelled=False, done=False)
+ f4 = FutureStub(cancelled=True, done=True)
+
+ self.assertFalse(
+ threaded_futures.FutureList([f1], None).has_exception_futures())
+ self.assertTrue(
+ threaded_futures.FutureList([f2], None).has_exception_futures())
+ self.assertFalse(
+ threaded_futures.FutureList([f3], None).has_exception_futures())
+ self.assertFalse(
+ threaded_futures.FutureList([f4], None).has_exception_futures())
+
+ def test_get_item(self):
+ f1 = FutureStub(cancelled=False, done=False)
+ f2 = FutureStub(cancelled=False, done=True)
+ f3 = FutureStub(cancelled=False, done=True)
+
+ fs = [f1, f2, f3]
+ f = threaded_futures.FutureList(fs, None)
+ self.assertEqual(f[0], f1)
+ self.assertEqual(f[1], f2)
+ self.assertEqual(f[2], f3)
+ self.assertRaises(IndexError, f.__getitem__, 3)
+
+ def test_len(self):
+ f1 = FutureStub(cancelled=False, done=False)
+ f2 = FutureStub(cancelled=False, done=True)
+ f3 = FutureStub(cancelled=False, done=True)
+
+ f = threaded_futures.FutureList([f1, f2, f3], None)
+ self.assertEqual(len(f), 3)
+
+ def test_iter(self):
+ f1 = FutureStub(cancelled=False, done=False)
+ f2 = FutureStub(cancelled=False, done=True)
+ f3 = FutureStub(cancelled=False, done=True)
+
+ fs = [f1, f2, f3]
+ f = threaded_futures.FutureList(fs, None)
+ self.assertEqual(list(iter(f)), fs)
+
+ def test_contains(self):
+ f1 = FutureStub(cancelled=False, done=False)
+ f2 = FutureStub(cancelled=False, done=True)
+ f3 = FutureStub(cancelled=False, done=True)
+
+ f = threaded_futures.FutureList([f1, f2], None)
+ self.assertTrue(f1 in f)
+ self.assertTrue(f2 in f)
+ self.assertFalse(f3 in f)
+
+ def test_repr(self):
+ running = FutureStub(cancelled=False, done=False)
+ result = FutureStub(cancelled=False, done=True)
+ exception = FutureStub(cancelled=False, done=True, exception=IOError())
+ cancelled = FutureStub(cancelled=True, done=True)
+
+ f = threaded_futures.FutureList(
+ [running] * 4 + [result] * 3 + [exception] * 2 + [cancelled],
+ None)
+
+ self.assertEqual(repr(f),
+ '<FutureList #futures=10 '
+ '[#success=3 #exception=2 #cancelled=1]>')
+def test_main():
+ test.support.run_unittest(CancelTests,
+ ConcurrentWaitsTest,
+ FutureListTests,
+ ShutdownTest)
+
+if __name__ == "__main__":
+ test_main() \ No newline at end of file