author     brian.quinlan <devnull@localhost>  2009-10-23 06:36:16 +0000
committer  brian.quinlan <devnull@localhost>  2009-10-23 06:36:16 +0000
commit     5aab5b5c6cfdc3c520fced44218e55b795451712 (patch)
tree       635eebe25e3e59fb61746ed09d1f1cfbb801cb3d
parent     ea6b714ad2398c052b69ead66e3e45c84d6df3c5 (diff)
download   futures-5aab5b5c6cfdc3c520fced44218e55b795451712.tar.gz
Fixes two bugs: the _result function could exit while work was still in progress, and a deadlock could occur on Mac OS X.
-rw-r--r--  python2/crawl.py                  4
-rw-r--r--  python2/futures/process.py       24
-rw-r--r--  python2/futures/thread.py         4
-rw-r--r--  python2/primes.py                 4
-rw-r--r--  python2/test_futures.py           8
-rw-r--r--  python3/crawl.py                  4
-rw-r--r--  python3/futures/process.py       25
-rw-r--r--  python3/futures/thread.py         4
-rw-r--r--  python3/moprocessmoproblems.py   59
-rw-r--r--  python3/primes.py                 4
-rw-r--r--  python3/test_futures.py           8
11 files changed, 125 insertions, 23 deletions
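
For orientation before the hunks: the first bug was that the queue-management thread (the _result function) could set the shutdown event and return even though work items were still pending, leaving their futures forever incomplete; the second was a multiprocessing.Queue deadlock on Mac OS X when the worker processes were never joined. A rough sketch of the corrected shutdown branch, with indentation restored and the names taken from the process.py hunks below:

    # Inside the management thread's polling loop, after a 0.1s Queue.Empty
    # timeout on the result queue:
    executor = executor_reference()
    # No more work items can be added if the interpreter is shutting down,
    # or the owning executor has been collected or shut down.
    if _shutdown or executor is None or executor._shutdown_thread:
        # Only exit once nothing is pending; otherwise keep draining results.
        if not pending_work_items:
            shutdown_process_event.set()
            # Joining the worker processes avoids the multiprocessing.Queue
            # deadlock observed on Mac OS X.
            for p in processes:
                p.join()
            return
    del executor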
diff --git a/python2/crawl.py b/python2/crawl.py
index 88d7c9e..cf4a218 100644
--- a/python2/crawl.py
+++ b/python2/crawl.py
@@ -49,6 +49,10 @@ def main():
                     ('threads',
                      functools.partial(download_urls_with_executor,
                                        URLS,
+                                        futures.ThreadPoolExecutor(10))),
+                     ('processes',
+                      functools.partial(download_urls_with_executor,
+                                        URLS,
                                        futures.ThreadPoolExecutor(10)))]:
        print '%s: ' % name.ljust(12),
        start = time.time()
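
Both crawl.py hunks reuse download_urls_with_executor, which is defined earlier in crawl.py and not shown here. As a rough illustration of what such a helper looks like, here is a hypothetical sketch written against the modern standard-library concurrent.futures API (submit, as_completed, the _load helper and urllib.request are illustrative choices, not necessarily what this repository's crawl.py or its 2009 futures backport actually use):

import concurrent.futures
import urllib.request

def _load(url, timeout):
    # Module-level helper so it also works with a process pool (bytes pickle cleanly).
    with urllib.request.urlopen(url, timeout=timeout) as page:
        return page.read()

def download_urls_with_executor(urls, executor, timeout=60):
    # Submit one download per URL, then collect results as they finish.
    with executor:
        future_to_url = {executor.submit(_load, url, timeout): url for url in urls}
        results = {}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                results[url] = future.result()
            except Exception:
                results[url] = None  # record failures instead of aborting the run
        return results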
diff --git a/python2/futures/process.py b/python2/futures/process.py
index b96203b..e4c3ad3 100644
--- a/python2/futures/process.py
+++ b/python2/futures/process.py
@@ -64,6 +64,8 @@ def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
while True:
+ if call_queue.full():
+ return
try:
work_id = work_ids.get(block=False)
except Queue.Empty:
@@ -77,16 +79,16 @@ def _add_call_item_to_queue(pending_work_items,
work_item.future._condition.release()
work_item.completion_tracker.add_cancelled()
+ del pending_work_items[work_id]
continue
else:
work_item.future._condition.acquire()
work_item.future._state = RUNNING
work_item.future._condition.release()
call_queue.put(_CallItem(work_id, work_item.call), block=True)
- if call_queue.full():
- return
def _result(executor_reference,
+ processes,
pending_work_items,
work_ids_queue,
call_queue,
@@ -100,9 +102,22 @@ def _result(executor_reference,
result_item = result_queue.get(block=True, timeout=0.1)
except Queue.Empty:
executor = executor_reference()
+ # No more work items can be added if:
+ # - The interpreter is shutting down OR
+ # - The executor that owns this worker has been collected OR
+ # - The executor that owns this worker has been shutdown.
if _shutdown or executor is None or executor._shutdown_thread:
- shutdown_process_event.set()
- return
+ # Since no new work items can be added, it is safe to shutdown
+ # this thread if there are no pending work items.
+ if not pending_work_items:
+ shutdown_process_event.set()
+
+ # If .join() is not called on the created processes then
+ # some multiprocessing.Queue methods may deadlock on Mac OS
+ # X.
+ for p in processes:
+ p.join()
+ return
del executor
else:
work_item = pending_work_items[result_item.work_id]
@@ -147,6 +162,7 @@ class ProcessPoolExecutor(Executor):
self._queue_management_thread = threading.Thread(
target=_result,
args=(weakref.ref(self),
+ self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
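
The other change in this file moves the call_queue.full() check to the top of _add_call_item_to_queue's loop and drops cancelled work items from pending_work_items, which matters now that _result refuses to exit while anything is still pending. A rough sketch of the patched function with indentation restored (the condition acquire/release around the state change is abbreviated):

def _add_call_item_to_queue(pending_work_items, work_ids, call_queue):
    while True:
        # Check for a full call queue before pulling another work id, so a
        # work item is never dequeued when there is no room to forward it.
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except Queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]
            if work_item.future.cancelled():
                work_item.future._condition.acquire()
                work_item.future._condition.notify_all()
                work_item.future._condition.release()
                work_item.completion_tracker.add_cancelled()
                # Cancelled items must leave pending_work_items too, or the
                # management thread would wait forever for their results.
                del pending_work_items[work_id]
                continue
            else:
                work_item.future._state = RUNNING   # guarded by _condition in the real code
                call_queue.put(_CallItem(work_id, work_item.call), block=True)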
diff --git a/python2/futures/thread.py b/python2/futures/thread.py
index 4928342..4071574 100644
--- a/python2/futures/thread.py
+++ b/python2/futures/thread.py
@@ -97,8 +97,8 @@ def _worker(executor_reference, work_queue):
except Queue.Empty:
executor = executor_reference()
# Exit if:
- # - The interpreter is shutting down.
- # - The executor that owns the worker has been collected.
+ # - The interpreter is shutting down OR
+ # - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
return
diff --git a/python2/primes.py b/python2/primes.py
index ac684d6..0b2bf81 100644
--- a/python2/primes.py
+++ b/python2/primes.py
@@ -7,7 +7,9 @@ PRIMES = [
112582705942171,
112272535095293,
115280095190773,
- 115797848077099]
+ 115797848077099,
+ 117450548693743,
+ 993960000099397]
def is_prime(n):
if n % 2 == 0:
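
primes.py is a small benchmark that tests every number in PRIMES for primality; the two extra values simply give each executor more work per run. A hedged sketch of such a benchmark, written against the modern standard-library concurrent.futures module for brevity (the repository's own primes.py predates that module, so its driver code may differ):

import math
import concurrent.futures

PRIMES = [
    112272535095293,
    112582705942171,
    115280095190773,
    115797848077099,
    117450548693743,
    993960000099397]

def is_prime(n):
    # Trial division by odd numbers up to sqrt(n); slow on purpose so the
    # benchmark has real CPU work to parallelize.
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()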
diff --git a/python2/test_futures.py b/python2/test_futures.py
index d746ecb..e4bdf36 100644
--- a/python2/test_futures.py
+++ b/python2/test_futures.py
@@ -814,15 +814,15 @@ class FutureListTests(unittest.TestCase):
'[#pending=4 #cancelled=3 #running=2 #finished=6]>')
def test_main():
- test_support.run_unittest(#ProcessPoolCancelTests,
+ test_support.run_unittest(ProcessPoolCancelTests,
ThreadPoolCancelTests,
- #ProcessPoolExecutorTest,
+ ProcessPoolExecutorTest,
ThreadPoolExecutorTest,
- #ProcessPoolWaitTests,
+ ProcessPoolWaitTests,
ThreadPoolWaitTests,
FutureTests,
FutureListTests,
- #ProcessPoolShutdownTest,
+ ProcessPoolShutdownTest,
ThreadPoolShutdownTest)
if __name__ == "__main__":
diff --git a/python3/crawl.py b/python3/crawl.py
index 46b8f7f..8d49e18 100644
--- a/python3/crawl.py
+++ b/python3/crawl.py
@@ -47,6 +47,10 @@ def download_urls_with_executor(urls, executor, timeout=60):
def main():
    for name, fn in [('sequential',
                      functools.partial(download_urls_sequential, URLS)),
+                     ('processes',
+                      functools.partial(download_urls_with_executor,
+                                        URLS,
+                                        futures.ProcessPoolExecutor(10))),
                     ('threads',
                      functools.partial(download_urls_with_executor,
                                        URLS,
diff --git a/python3/futures/process.py b/python3/futures/process.py
index 71dd602..75ee83c 100644
--- a/python3/futures/process.py
+++ b/python3/futures/process.py
@@ -65,6 +65,8 @@ def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
while True:
+ if call_queue.full():
+ return
try:
work_id = work_ids.get(block=False)
except queue.Empty:
@@ -76,17 +78,16 @@ def _add_call_item_to_queue(pending_work_items,
with work_item.future._condition:
work_item.future._condition.notify_all()
work_item.completion_tracker.add_cancelled()
+ del pending_work_items[work_id]
continue
else:
with work_item.future._condition:
work_item.future._state = RUNNING
-
call_queue.put(_CallItem(work_id, work_item.call),
block=True)
- if call_queue.full():
- return
def _result(executor_reference,
+ processes,
pending_work_items,
work_ids_queue,
call_queue,
@@ -101,9 +102,22 @@ def _result(executor_reference,
result_item = result_queue.get(block=True, timeout=0.1)
except queue.Empty:
executor = executor_reference()
+ # No more work items can be added if:
+ # - The interpreter is shutting down OR
+ # - The executor that owns this worker has been collected OR
+ # - The executor that owns this worker has been shutdown.
if _shutdown or executor is None or executor._shutdown_thread:
- shutdown_process_event.set()
- return
+ # Since no new work items can be added, it is safe to shutdown
+ # this thread if there are no pending work items.
+ if not pending_work_items:
+ shutdown_process_event.set()
+
+ # If .join() is not called on the created processes then
+ # some multiprocessing.Queue methods may deadlock on Mac OS
+ # X.
+ for p in processes:
+ p.join()
+ return
del executor
else:
work_item = pending_work_items[result_item.work_id]
@@ -147,6 +161,7 @@ class ProcessPoolExecutor(Executor):
self._queue_management_thread = threading.Thread(
target=_result,
args=(weakref.ref(self),
+ self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
diff --git a/python3/futures/thread.py b/python3/futures/thread.py
index 286d08b..851a548 100644
--- a/python3/futures/thread.py
+++ b/python3/futures/thread.py
@@ -90,8 +90,8 @@ def _worker(executor_reference, work_queue):
except queue.Empty:
executor = executor_reference()
# Exit if:
- # - The interpreter is shutting down.
- # - The executor that owns the worker has been collected.
+ # - The interpreter is shutting down OR
+ # - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
return
diff --git a/python3/moprocessmoproblems.py b/python3/moprocessmoproblems.py
new file mode 100644
index 0000000..18bfc39
--- /dev/null
+++ b/python3/moprocessmoproblems.py
@@ -0,0 +1,59 @@
+import multiprocessing
+import queue
+
+def _process_worker(q):
+    while True:
+        try:
+            something = q.get(block=True, timeout=0.1)
+        except queue.Empty:
+            return
+        else:
+            print('Grabbed item from queue:', something)
+
+
+def _make_some_processes(q):
+    processes = []
+    for _ in range(10):
+        p = multiprocessing.Process(target=_process_worker, args=(q,))
+        p.start()
+        processes.append(p)
+    return processes
+
+def _do(i):
+    print('Run:', i)
+    q = multiprocessing.Queue()
+    print('Created queue')
+    for j in range(30):
+        q.put(i*30+j)
+    processes = _make_some_processes(q)
+    print('Created processes')
+
+    while not q.empty():
+        pass
+    print('Q is empty')
+
+    # Without the two following commented lines, the output on Mac OS 10.5 (the
+    # output is as expected on Linux) will be:
+    # Run: 0
+    # Created queue
+    # Grabbed item from queue: 0
+    # ...
+    # Grabbed item from queue: 29
+    # Created processes
+    # Q is empty
+    # Run: 1
+    # Created queue
+    # Grabbed item from queue: 30
+    # ...
+    # Grabbed item from queue: 59
+    # Created processes
+    # Q is empty
+    # Run: 2
+    # Created queue
+    # Created processes
+    # <no further output>
+#    for p in processes:
+#        p.join()
+
+for i in range(100):
+    _do(i)
\ No newline at end of file
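
The commented-out loop at the bottom of _do is the workaround that the process.py changes adopt: joining the worker processes before starting the next run avoids the hang described in the comments above. With it enabled, the tail of _do would read roughly:

    while not q.empty():
        pass
    print('Q is empty')

    # Joining here prevents the Mac OS 10.5 hang; behaviour elsewhere is unchanged.
    for p in processes:
        p.join()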
diff --git a/python3/primes.py b/python3/primes.py
index 7e83ea0..20b5202 100644
--- a/python3/primes.py
+++ b/python3/primes.py
@@ -7,7 +7,9 @@ PRIMES = [
112582705942171,
112272535095293,
115280095190773,
- 115797848077099]
+ 115797848077099,
+ 117450548693743,
+ 993960000099397]
def is_prime(n):
if n % 2 == 0:
diff --git a/python3/test_futures.py b/python3/test_futures.py
index 2711d7e..b7dee65 100644
--- a/python3/test_futures.py
+++ b/python3/test_futures.py
@@ -816,15 +816,15 @@ class FutureListTests(unittest.TestCase):
'[#pending=4 #cancelled=3 #running=2 #finished=6]>')
def test_main():
- test.support.run_unittest(#ProcessPoolCancelTests,
+ test.support.run_unittest(ProcessPoolCancelTests,
ThreadPoolCancelTests,
- #ProcessPoolExecutorTest,
+ ProcessPoolExecutorTest,
ThreadPoolExecutorTest,
- #ProcessPoolWaitTests,
+ ProcessPoolWaitTests,
ThreadPoolWaitTests,
FutureTests,
FutureListTests,
- #ProcessPoolShutdownTest,
+ ProcessPoolShutdownTest,
ThreadPoolShutdownTest)
if __name__ == "__main__":