summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrian.quinlan <devnull@localhost>2009-05-10 11:37:27 +0000
committerbrian.quinlan <devnull@localhost>2009-05-10 11:37:27 +0000
commitb32b0bfba4f6c8cd4f0b71cf7cfbad979c6c2695 (patch)
tree65c2c23588edbb0fd4e0fede7eaaba578600cbe1
parent0a24721b25c89aed65030b940b19ac5fc2c27aaa (diff)
downloadfutures-b32b0bfba4f6c8cd4f0b71cf7cfbad979c6c2695.tar.gz
Same names change and doc additions.
-rw-r--r--crawl.py4
-rw-r--r--futures/_base.py156
-rw-r--r--futures/process.py27
-rw-r--r--futures/thread.py15
-rw-r--r--primes.py19
-rw-r--r--test_futures.py159
6 files changed, 269 insertions, 111 deletions
diff --git a/crawl.py b/crawl.py
index 80d56e0..bc01946 100644
--- a/crawl.py
+++ b/crawl.py
@@ -28,7 +28,7 @@ def download_urls_sequential(urls, timeout=60):
def download_urls_with_executor(urls, executor, timeout=60):
try:
url_to_content = {}
- fs = executor.run(
+ fs = executor.run_to_futures(
(functools.partial(load_url, url, timeout) for url in urls),
timeout=timeout)
for url, future in zip(urls, fs.successful_futures()):
@@ -37,8 +37,6 @@ def download_urls_with_executor(urls, executor, timeout=60):
finally:
executor.shutdown()
-import functools
-import time
def main():
for name, fn in [('sequential',
functools.partial(download_urls_sequential, URLS)),
diff --git a/futures/_base.py b/futures/_base.py
index 1debfbb..01dafd3 100644
--- a/futures/_base.py
+++ b/futures/_base.py
@@ -52,14 +52,20 @@ def set_future_result(future, event_sink, result):
event_sink.add_result()
future._condition.notify_all()
-class CancelledError(Exception):
+class Error(Exception):
pass
-class TimeoutError(Exception):
+class CancelledError(Error):
+ pass
+
+class TimeoutError(Error):
pass
class Future(object):
+ """Represents the result of an asynchronous computation."""
+
def __init__(self):
+ """Initializes the Future. Should not be called by clients."""
self._condition = threading.Condition()
self._state = PENDING
self._result = None
@@ -79,6 +85,11 @@ class Future(object):
return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state]
def cancel(self):
+ """Cancel the future if possible.
+
+ Returns True if the future was cancelled, False otherwise. A future
+ cannot be cancelled if it is running or has already completed.
+ """
with self._condition:
if self._state in [RUNNING, FINISHED]:
return False
@@ -89,10 +100,12 @@ class Future(object):
return True
def cancelled(self):
+ """Return True if the future has cancelled."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
def done(self):
+ """Return True of the future was cancelled or finished executing."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
@@ -103,6 +116,21 @@ class Future(object):
return self._result
def result(self, timeout=None):
+ """Return the result of the call that the future represents.
+
+ Args:
+ timeout: The number of seconds to wait for the result if the future
+ isn't done. If None, then there is no limit on the wait time.
+
+ Returns:
+ The result of the call that the future represents.
+
+ Raises:
+ CancelledError: If the future was cancelled.
+ TimeoutError: If the future didn't finish executing before the given
+ timeout.
+ Exception: If the call raised then that exception will be raised.
+ """
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
@@ -119,6 +147,23 @@ class Future(object):
raise TimeoutError()
def exception(self, timeout=None):
+ """Return the exception raised by the call that the future represents.
+
+ Args:
+ timeout: The number of seconds to wait for the exception if the
+ future isn't done. If None, then there is no limit on the wait
+ time.
+
+ Returns:
+ The exception raised by the call that the future represents or None
+ if the call completed without raising.
+
+ Raises:
+ CancelledError: If the future was cancelled.
+ TimeoutError: If the future didn't finish executing before the given
+ timeout.
+ """
+
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
@@ -192,10 +237,34 @@ class ThreadEventSink(object):
class FutureList(object):
def __init__(self, futures, event_sink):
+ """Initializes the FutureList. Should not be called by clients."""
self._futures = futures
self._event_sink = event_sink
def wait(self, timeout=None, return_when=ALL_COMPLETED):
+ """Wait for the futures in the list to complete.
+
+ Args:
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ return_when: Indicates when the method should return. The options
+ are:
+
+ FIRST_COMPLETED - Return when any future finishes or is
+ cancelled.
+ FIRST_EXCEPTION - Return when any future finishes by raising an
+ exception. If no future raises and exception
+ then it is equivalent to ALL_COMPLETED.
+ ALL_COMPLETED - Return when all futures finish or are cancelled.
+ RETURN_IMMEDIATELY - Return without waiting (this is not likely
+ to be a useful option but it is there to
+ be symmetrical with the
+ executor.run_to_futures() method.
+
+ Raises:
+ TimeoutError: If the wait condition wasn't satisfied before the
+ given timeout.
+ """
if return_when == RETURN_IMMEDIATELY:
return
@@ -234,6 +303,16 @@ class FutureList(object):
self._event_sink.remove(completed_tracker)
def cancel(self, timeout=None):
+ """Cancel the futures in the list.
+
+ Args:
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Raises:
+ TimeoutError: If all the futures were not finished before the
+ given timeout.
+ """
for f in self:
f.cancel()
self.wait(timeout=timeout, return_when=ALL_COMPLETED)
@@ -303,28 +382,56 @@ class FutureList(object):
states[FINISHED]))
class Executor(object):
- def run(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ """Return a list of futures representing the given calls.
+
+ Args:
+ calls: A sequence of callables that take no arguments. These will
+ be bound to Futures and returned.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ return_when: Indicates when the method should return. The options
+ are:
+
+ FIRST_COMPLETED - Return when any future finishes or is
+ cancelled.
+ FIRST_EXCEPTION - Return when any future finishes by raising an
+ exception. If no future raises and exception
+ then it is equivalent to ALL_COMPLETED.
+ ALL_COMPLETED - Return when all futures finish or are cancelled.
+ RETURN_IMMEDIATELY - Return without waiting (this is not likely
+ to be a useful option but it is there to
+ be symmetrical with the
+ executor.run_to_futures() method.
+
+ Returns:
+ A FuturesList containing futures for the given calls.
+ """
raise NotImplementedError()
- def runXXX(self, calls, timeout=None):
- """Execute the given calls and
+ def run_to_results(self, calls, timeout=None):
+ """Returns a iterator of the results of the given calls.
- Arguments:
- calls: A sequence of functions that will be called without arguments
- and whose results with be returned.
- timeout: The maximum number of seconds to wait for the complete results.
- None indicates that there is no timeout.
+ Args:
+ calls: A sequence of callables that take no arguments. These will
+ be called and their results returned.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
- Yields:
- The results of the given calls in the order that they are given.
+ Returns:
+ An iterator over the results of the given calls. Equivalent to:
+ (call() for call in calls) but the calls may be evaluated
+ out-of-order.
- Exceptions:
- TimeoutError: if it takes more than timeout
+ Raises:
+ TimeoutError: If all the given calls were not completed before the
+ given timeout.
+ Exception: If any call() raises.
"""
if timeout is not None:
end_time = timeout + time.time()
- fs = self.run(calls, return_when=RETURN_IMMEDIATELY)
+ fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY)
try:
for future in fs:
@@ -339,10 +446,27 @@ class Executor(object):
pass
def map(self, fn, iter, timeout=None):
+ """Returns a iterator equivalent to map(fn, iter).
+
+ Args:
+ fn: A callable taking a single argument.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Returns:
+ An iterator equivalent to: map(fn, iter) but the calls may be
+ evaluated out-of-order.
+
+ Raises:
+ TimeoutError: If the entire result iterator could not be generated
+ before the given timeout.
+ Exception: If fn(x) raises for any x in iter.
+ """
calls = [functools.partial(fn, a) for a in iter]
- return self.runXXX(calls, timeout)
+ return self.run_to_results(calls, timeout)
def shutdown(self):
+ """Clean-up. No other methods can be called afterwards."""
raise NotImplementedError()
def __enter__(self):
diff --git a/futures/process.py b/futures/process.py
index 06a8c25..c4dad74 100644
--- a/futures/process.py
+++ b/futures/process.py
@@ -3,9 +3,8 @@
from futures._base import (PENDING, RUNNING, CANCELLED,
CANCELLED_AND_NOTIFIED, FINISHED,
ALL_COMPLETED,
- LOGGER,
set_future_exception, set_future_result,
- Executor, Future, FutureList,ThreadEventSink)
+ Executor, Future, FutureList, ThreadEventSink)
import queue
import multiprocessing
@@ -54,18 +53,22 @@ class ProcessPoolExecutor(Executor):
max_processes = 16
self._max_processes = max_processes
+ # Make the call queue slightly larger than the number of processes to
+ # prevent the worker processes from starving but to make future.cancel()
+ # responsive.
self._call_queue = multiprocessing.Queue(self._max_processes + 1)
self._result_queue = multiprocessing.Queue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
self._processes = set()
- self._shutdown = False
+
+ # Shutdown is a two-step process.
+ self._shutdown_thread = False
self._shutdown_process_event = multiprocessing.Event()
- self._lock = threading.Lock()
+ self._shutdown_lock = threading.Lock()
self._queue_count = 0
self._pending_work_items = {}
-
def _add_call_item_to_queue(self):
while True:
try:
@@ -96,7 +99,7 @@ class ProcessPoolExecutor(Executor):
result_item = self._result_queue.get(block=True,
timeout=0.1)
except queue.Empty:
- if self._shutdown and not self._pending_work_items:
+ if self._shutdown_thread and not self._pending_work_items:
self._shutdown_process_event.set()
return
else:
@@ -129,10 +132,10 @@ class ProcessPoolExecutor(Executor):
p.start()
self._processes.add(p)
- def run(self, calls, timeout=None, return_when=ALL_COMPLETED):
- with self._lock:
- if self._shutdown:
- raise RuntimeError()
+ def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ with self._shutdown_lock:
+ if self._shutdown_thread:
+ raise RuntimeError('cannot run new futures after shutdown')
futures = []
event_sink = ThreadEventSink()
@@ -151,5 +154,5 @@ class ProcessPoolExecutor(Executor):
return fl
def shutdown(self):
- with self._lock:
- self._shutdown = True
+ with self._shutdown_lock:
+ self._shutdown_thread = True
diff --git a/futures/thread.py b/futures/thread.py
index d1c1409..c1f15ae 100644
--- a/futures/thread.py
+++ b/futures/thread.py
@@ -5,7 +5,7 @@ from futures._base import (PENDING, RUNNING, CANCELLED,
ALL_COMPLETED,
LOGGER,
set_future_exception, set_future_result,
- Executor, Future, FutureList,ThreadEventSink)
+ Executor, Future, FutureList, ThreadEventSink)
import queue
import threading
@@ -43,14 +43,13 @@ class ThreadPoolExecutor(Executor):
self._work_queue = queue.Queue()
self._threads = set()
self._shutdown = False
- self._lock = threading.Lock()
+ self._shutdown_lock = threading.Lock()
def _worker(self):
try:
while True:
try:
- work_item = self._work_queue.get(block=True,
- timeout=0.1)
+ work_item = self._work_queue.get(block=True, timeout=0.1)
except queue.Empty:
if self._shutdown:
return
@@ -67,10 +66,10 @@ class ThreadPoolExecutor(Executor):
t.start()
self._threads.add(t)
- def run(self, calls, timeout=None, return_when=ALL_COMPLETED):
- with self._lock:
+ def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
+ with self._shutdown_lock:
if self._shutdown:
- raise RuntimeError()
+ raise RuntimeError('cannot run new futures after shutdown')
futures = []
event_sink = ThreadEventSink()
@@ -86,5 +85,5 @@ class ThreadPoolExecutor(Executor):
return fl
def shutdown(self):
- with self._lock:
+ with self._shutdown_lock:
self._shutdown = True
diff --git a/primes.py b/primes.py
index aff6697..7e83ea0 100644
--- a/primes.py
+++ b/primes.py
@@ -10,30 +10,25 @@ PRIMES = [
115797848077099]
def is_prime(n):
- n = abs(n)
- i = 2
- while i <= math.sqrt(n):
+ if n % 2 == 0:
+ return False
+
+ sqrt_n = int(math.floor(math.sqrt(n)))
+ for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
- i += 1
return True
def sequential():
return list(map(is_prime, PRIMES))
def with_process_pool_executor():
- executor = futures.ProcessPoolExecutor(10)
- try:
+ with futures.ProcessPoolExecutor(10) as executor:
return list(executor.map(is_prime, PRIMES))
- finally:
- executor.shutdown()
def with_thread_pool_executor():
- executor = futures.ThreadPoolExecutor(10)
- try:
+ with futures.ThreadPoolExecutor(10) as executor:
return list(executor.map(is_prime, PRIMES))
- finally:
- executor.shutdown()
def main():
for name, fn in [('sequential', sequential),
diff --git a/test_futures.py b/test_futures.py
index 0093b0e..64e541b 100644
--- a/test_futures.py
+++ b/test_futures.py
@@ -25,20 +25,34 @@ CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
-
class Call(object):
+ CALL_LOCKS = {}
def __init__(self, manual_finish=False):
- self._called_event = threading.Event()
+ called_event = multiprocessing.Event()
+ can_finish = multiprocessing.Event()
+
+ self._called_event_id = id(called_event)
+ self._can_finish_event_id = id(can_finish)
+
+ self.CALL_LOCKS[self._called_event_id] = called_event
+ self.CALL_LOCKS[self._can_finish_event_id] = can_finish
- self._can_finished = threading.Event()
if not manual_finish:
- self._can_finished.set()
+ self._can_finish.set()
+
+ @property
+ def _can_finish(self):
+ return self.CALL_LOCKS[self._can_finish_event_id]
+
+ @property
+ def _called_event(self):
+ return self.CALL_LOCKS[self._called_event_id]
def wait_on_called(self):
self._called_event.wait()
def set_can(self):
- self._can_finished.set()
+ self._can_finish.set()
def called(self):
return self._called_event.is_set()
@@ -47,36 +61,36 @@ class Call(object):
if self._called_event.is_set(): print('called twice')
self._called_event.set()
- self._can_finished.wait()
+ self._can_finish.wait()
return 42
+ def __del__(self):
+ del self.CALL_LOCKS[self._called_event_id]
+ del self.CALL_LOCKS[self._can_finish_event_id]
+
class ExceptionCall(Call):
def __call__(self):
assert not self._called_event.is_set(), 'already called'
self._called_event.set()
- self._can_finished.wait()
+ self._can_finish.wait()
raise ZeroDivisionError()
class ShutdownTest(unittest.TestCase):
def test_run_after_shutdown(self):
- self.executor = futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call()
self.executor.shutdown()
self.assertRaises(RuntimeError,
- self.executor.run,
+ self.executor.run_to_futures,
[call1])
- def test_threads_terminate(self):
- self.executor = futures.ThreadPoolExecutor(max_threads=5)
-
+ def _start_some_futures(self):
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
call3 = Call(manual_finish=True)
- self.executor.run([call1, call2, call3],
- return_when=futures.RETURN_IMMEDIATELY)
+ self.executor.run_to_futures([call1, call2, call3],
+ return_when=futures.RETURN_IMMEDIATELY)
call1.wait_on_called()
call2.wait_on_called()
@@ -86,11 +100,33 @@ class ShutdownTest(unittest.TestCase):
call2.set_can()
call3.set_can()
+class ThreadPoolShutdownTest(ShutdownTest):
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_threads=5)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+ def test_threads_terminate(self):
+ self._start_some_futures()
self.assertEqual(len(self.executor._threads), 3)
self.executor.shutdown()
for t in self.executor._threads:
t.join()
-
+
+class ProcessPoolShutdownTest(ShutdownTest):
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_processes=5)
+
+ def tearDown(self):
+ self.executor.shutdown()
+
+ def test_processes_terminate(self):
+ self._start_some_futures()
+ self.assertEqual(len(self.executor._processes), 5)
+ self.executor.shutdown()
+ for p in self.executor._processes:
+ p.join()
class WaitsTest(unittest.TestCase):
def test(self):
@@ -105,7 +141,7 @@ class WaitsTest(unittest.TestCase):
def wait_for_FIRST_COMPLETED():
fs.wait(return_when=futures.FIRST_COMPLETED)
self.assertTrue(f1.done())
- self.assertFalse(f2.done())
+ self.assertFalse(f2.done()) # XXX
self.assertFalse(f3.done())
self.assertFalse(f4.done())
first_completed.release()
@@ -114,7 +150,7 @@ class WaitsTest(unittest.TestCase):
fs.wait(return_when=futures.FIRST_EXCEPTION)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
- self.assertFalse(f3.done())
+ self.assertFalse(f3.done()) # XXX
self.assertFalse(f4.done())
first_exception.release()
@@ -127,46 +163,54 @@ class WaitsTest(unittest.TestCase):
call3 = Call(manual_finish=True)
call4 = Call()
- fs = self.executor.run([call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
threads = []
- for call in [wait_for_ALL_COMPLETED,
- wait_for_FIRST_COMPLETED,
- wait_for_FIRST_EXCEPTION]:
- t = threading.Thread(target=call)
+ for wait_test in [wait_for_ALL_COMPLETED,
+ wait_for_FIRST_COMPLETED,
+ wait_for_FIRST_EXCEPTION]:
+ t = threading.Thread(target=wait_test)
t.start()
threads.append(t)
time.sleep(1) # give threads enough time to execute wait
+
call1.set_can()
first_completed.acquire()
call2.set_can()
first_exception.acquire()
call3.set_can()
- call4.set_can()
all_completed.acquire()
self.executor.shutdown()
class ThreadPoolWaitTests(WaitsTest):
- executor = futures.ThreadPoolExecutor(max_threads=1)
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_threads=1)
+
+ def tearDown(self):
+ self.executor.shutdown()
class ProcessPoolWaitTests(WaitsTest):
- executor = futures.ProcessPoolExecutor(max_processes=1)
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_processes=1)
+
+ def tearDown(self):
+ self.executor.shutdown()
class CancelTests(unittest.TestCase):
def test_cancel_states(self):
- executor = futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call(manual_finish=True)
call2 = Call()
call3 = Call()
call4 = Call()
- fs = executor.run([call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
call1.wait_on_called()
@@ -193,7 +237,6 @@ class CancelTests(unittest.TestCase):
self.assertEqual(call2.called(), False)
self.assertEqual(call4.called(), False)
- executor.shutdown()
def test_wait_for_individual_cancel(self):
def end_call():
@@ -201,12 +244,12 @@ class CancelTests(unittest.TestCase):
f2.cancel()
call1.set_can()
- executor = futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call(manual_finish=True)
call2 = Call()
- fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run_to_futures(
+ [call1, call2],
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2 = fs
call1.wait_on_called()
@@ -215,18 +258,16 @@ class CancelTests(unittest.TestCase):
self.assertRaises(futures.CancelledError, f2.result)
self.assertRaises(futures.CancelledError, f2.exception)
t.join()
- executor.shutdown()
def test_wait_with_already_cancelled_futures(self):
- executor = futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call(manual_finish=True)
call2 = Call()
call3 = Call()
call4 = Call()
- fs = executor.run([call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
call1.wait_on_called()
@@ -236,18 +277,16 @@ class CancelTests(unittest.TestCase):
time.sleep(0.1)
fs.wait(return_when=futures.ALL_COMPLETED)
- executor.shutdown()
def test_cancel_all(self):
- executor = futures.ThreadPoolExecutor(max_threads=1)
-
call1 = Call(manual_finish=True)
call2 = Call()
call3 = Call()
call4 = Call()
- fs = executor.run([call1, call2, call3, call4],
- return_when=futures.RETURN_IMMEDIATELY)
+ fs = self.executor.run_to_futures(
+ [call1, call2, call3, call4],
+ return_when=futures.RETURN_IMMEDIATELY)
f1, f2, f3, f4 = fs
call1.wait_on_called()
@@ -259,22 +298,20 @@ class CancelTests(unittest.TestCase):
self.assertTrue(f2.cancelled())
self.assertTrue(f3.cancelled())
self.assertTrue(f4.cancelled())
- executor.shutdown()
- def test_cancel_repr(self):
- executor = futures.ThreadPoolExecutor(max_threads=1)
+class ThreadPoolCancelTests(CancelTests):
+ def setUp(self):
+ self.executor = futures.ThreadPoolExecutor(max_threads=1)
- call1 = Call(manual_finish=True)
- call2 = Call()
+ def tearDown(self):
+ self.executor.shutdown()
- fs = executor.run([call1, call2], return_when=futures.RETURN_IMMEDIATELY)
- f1, f2 = fs
+class ProcessPoolCancelTests(WaitsTest):
+ def setUp(self):
+ self.executor = futures.ProcessPoolExecutor(max_processes=1)
- call1.wait_on_called()
- call1.set_can()
- f2.cancel()
- self.assertEqual(repr(f2), '<Future state=cancelled>')
- executor.shutdown()
+ def tearDown(self):
+ self.executor.shutdown()
class FutureTests(unittest.TestCase):
def test_repr(self):
@@ -552,12 +589,14 @@ class FutureListTests(unittest.TestCase):
'[#pending=4 #cancelled=3 #running=2 #finished=6]>')
def test_main():
- test.support.run_unittest(CancelTests,
-# ProcessPoolWaitTests,
+ test.support.run_unittest(ProcessPoolCancelTests,
+ ThreadPoolCancelTests,
+ ProcessPoolWaitTests,
ThreadPoolWaitTests,
FutureTests,
FutureListTests,
- ShutdownTest)
+ ProcessPoolShutdownTest,
+ ThreadPoolShutdownTest)
if __name__ == "__main__":
test_main() \ No newline at end of file