diff options
author | brian.quinlan <devnull@localhost> | 2009-10-23 06:36:16 +0000 |
---|---|---|
committer | brian.quinlan <devnull@localhost> | 2009-10-23 06:36:16 +0000 |
commit | 5aab5b5c6cfdc3c520fced44218e55b795451712 (patch) | |
tree | 635eebe25e3e59fb61746ed09d1f1cfbb801cb3d /python3 | |
parent | ea6b714ad2398c052b69ead66e3e45c84d6df3c5 (diff) | |
download | futures-5aab5b5c6cfdc3c520fced44218e55b795451712.tar.gz |
Fixes two bugs: the _result function could exit if there is work still in progress and a deadlock can occur on Mac OS X.
Diffstat (limited to 'python3')
-rw-r--r-- | python3/crawl.py | 4 | ||||
-rw-r--r-- | python3/futures/process.py | 25 | ||||
-rw-r--r-- | python3/futures/thread.py | 4 | ||||
-rw-r--r-- | python3/moprocessmoproblems.py | 59 | ||||
-rw-r--r-- | python3/primes.py | 4 | ||||
-rw-r--r-- | python3/test_futures.py | 8 |
6 files changed, 92 insertions, 12 deletions
diff --git a/python3/crawl.py b/python3/crawl.py index 46b8f7f..8d49e18 100644 --- a/python3/crawl.py +++ b/python3/crawl.py @@ -47,6 +47,10 @@ def download_urls_with_executor(urls, executor, timeout=60): def main(): for name, fn in [('sequential', functools.partial(download_urls_sequential, URLS)), + ('processes', + functools.partial(download_urls_with_executor, + URLS, + futures.ProcessPoolExecutor(10))), ('threads', functools.partial(download_urls_with_executor, URLS, diff --git a/python3/futures/process.py b/python3/futures/process.py index 71dd602..75ee83c 100644 --- a/python3/futures/process.py +++ b/python3/futures/process.py @@ -65,6 +65,8 @@ def _add_call_item_to_queue(pending_work_items, work_ids, call_queue): while True: + if call_queue.full(): + return try: work_id = work_ids.get(block=False) except queue.Empty: @@ -76,17 +78,16 @@ def _add_call_item_to_queue(pending_work_items, with work_item.future._condition: work_item.future._condition.notify_all() work_item.completion_tracker.add_cancelled() + del pending_work_items[work_id] continue else: with work_item.future._condition: work_item.future._state = RUNNING - call_queue.put(_CallItem(work_id, work_item.call), block=True) - if call_queue.full(): - return def _result(executor_reference, + processes, pending_work_items, work_ids_queue, call_queue, @@ -101,9 +102,22 @@ def _result(executor_reference, result_item = result_queue.get(block=True, timeout=0.1) except queue.Empty: executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. if _shutdown or executor is None or executor._shutdown_thread: - shutdown_process_event.set() - return + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_process_event.set() + + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + return del executor else: work_item = pending_work_items[result_item.work_id] @@ -147,6 +161,7 @@ class ProcessPoolExecutor(Executor): self._queue_management_thread = threading.Thread( target=_result, args=(weakref.ref(self), + self._processes, self._pending_work_items, self._work_ids, self._call_queue, diff --git a/python3/futures/thread.py b/python3/futures/thread.py index 286d08b..851a548 100644 --- a/python3/futures/thread.py +++ b/python3/futures/thread.py @@ -90,8 +90,8 @@ def _worker(executor_reference, work_queue): except queue.Empty: executor = executor_reference() # Exit if: - # - The interpreter is shutting down. - # - The executor that owns the worker has been collected. + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: return diff --git a/python3/moprocessmoproblems.py b/python3/moprocessmoproblems.py new file mode 100644 index 0000000..18bfc39 --- /dev/null +++ b/python3/moprocessmoproblems.py @@ -0,0 +1,59 @@ +import multiprocessing +import queue + +def _process_worker(q): + while True: + try: + something = q.get(block=True, timeout=0.1) + except queue.Empty: + return + else: + print('Grabbed item from queue:', something) + + +def _make_some_processes(q): + processes = [] + for _ in range(10): + p = multiprocessing.Process(target=_process_worker, args=(q,)) + p.start() + processes.append(p) + return processes + +def _do(i): + print('Run:', i) + q = multiprocessing.Queue() + print('Created queue') + for j in range(30): + q.put(i*30+j) + processes = _make_some_processes(q) + print('Created processes') + + while not q.empty(): + pass + print('Q is empty') + + # Without the two following commented lines, the output on Mac OS 10.5 (the + # output is as expected on Linux) will be: + # Run: 0 + # Created queue + # Grabbed item from queue: 0 + # ... + # Grabbed item from queue: 29 + # Created processes + # Q is empty + # Run: 1 + # Created queue + # Grabbed item from queue: 30 + # ... + # Grabbed item from queue: 59 + # Created processes + # Q is empty + # Run: 2 + # Created queue + # Created processes + # <no further output> +# for p in processes: +# p.join() + +for i in range(100): + _do(i)
\ No newline at end of file diff --git a/python3/primes.py b/python3/primes.py index 7e83ea0..20b5202 100644 --- a/python3/primes.py +++ b/python3/primes.py @@ -7,7 +7,9 @@ PRIMES = [ 112582705942171, 112272535095293, 115280095190773, - 115797848077099] + 115797848077099, + 117450548693743, + 993960000099397] def is_prime(n): if n % 2 == 0: diff --git a/python3/test_futures.py b/python3/test_futures.py index 2711d7e..b7dee65 100644 --- a/python3/test_futures.py +++ b/python3/test_futures.py @@ -816,15 +816,15 @@ class FutureListTests(unittest.TestCase): '[#pending=4 #cancelled=3 #running=2 #finished=6]>') def test_main(): - test.support.run_unittest(#ProcessPoolCancelTests, + test.support.run_unittest(ProcessPoolCancelTests, ThreadPoolCancelTests, - #ProcessPoolExecutorTest, + ProcessPoolExecutorTest, ThreadPoolExecutorTest, - #ProcessPoolWaitTests, + ProcessPoolWaitTests, ThreadPoolWaitTests, FutureTests, FutureListTests, - #ProcessPoolShutdownTest, + ProcessPoolShutdownTest, ThreadPoolShutdownTest) if __name__ == "__main__": |