Diffstat (limited to 'concurrent/futures/process.py')
-rw-r--r--  concurrent/futures/process.py  |  170
1 file changed, 94 insertions(+), 76 deletions(-)
diff --git a/concurrent/futures/process.py b/concurrent/futures/process.py
index 87dc789..98684f8 100644
--- a/concurrent/futures/process.py
+++ b/concurrent/futures/process.py
@@ -73,28 +73,17 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)'
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
- for thread_reference in _thread_references:
- thread = thread_reference()
- if thread is not None:
- thread.join()
-
-def _remove_dead_thread_references():
- """Remove inactive threads from _thread_references.
-
- Should be called periodically to prevent memory leaks in scenarios such as:
- >>> while True:
- >>> ... t = ThreadPoolExecutor(max_workers=5)
- >>> ... t.map(int, ['1', '2', '3', '4', '5'])
- """
- for thread_reference in set(_thread_references):
- if thread_reference() is None:
- _thread_references.discard(thread_reference)
+ items = list(_threads_queues.items())
+ for t, q in items:
+ q.put(None)
+ for t, q in items:
+ t.join()
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
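The polling-based exit path above is replaced by a sentinel protocol: each queue management thread blocks on its queue, and _python_exit() wakes it with a None sentinel before joining it, so nothing spins on timeouts. A minimal self-contained sketch of this wake-and-join pattern (the worker body and names are illustrative, not taken from the patch):

    import atexit
    import queue
    import threading
    import weakref

    _threads_queues = weakref.WeakKeyDictionary()

    def _worker(q):
        while True:
            item = q.get(block=True)    # block instead of polling on a timeout
            if item is None:            # sentinel: exit cleanly
                return

    def _python_exit():
        items = list(_threads_queues.items())
        for t, q in items:
            q.put(None)                 # wake every blocked thread...
        for t, q in items:
            t.join()                    # ...then wait for each to finish

    atexit.register(_python_exit)

    q = queue.Queue()
    t = threading.Thread(target=_worker, args=(q,))
    t.daemon = True
    t.start()
    _threads_queues[t] = q

Because _threads_queues is keyed weakly by thread, entries vanish once the thread objects are garbage collected, which is what makes the periodic _remove_dead_thread_references() sweep unnecessary.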
@@ -122,10 +111,10 @@ class _CallItem(object):
self.args = args
self.kwargs = kwargs
-def _process_worker(call_queue, result_queue, shutdown):
+def _process_worker(call_queue, result_queue):
"""Evaluates calls from call_queue and places the results in result_queue.
- This worker is run in a seperate process.
+ This worker is run in a separate process.
Args:
call_queue: A multiprocessing.Queue of _CallItems that will be read and
@@ -136,21 +125,20 @@ def _process_worker(call_queue, result_queue, shutdown):
worker that it should exit when call_queue is empty.
"""
while True:
+ call_item = call_queue.get(block=True)
+ if call_item is None:
+ # Wake up queue management thread
+ result_queue.put(None)
+ return
try:
- call_item = call_queue.get(block=True, timeout=0.1)
- except queue.Empty:
- if shutdown.is_set():
- return
+ r = call_item.fn(*call_item.args, **call_item.kwargs)
+ except BaseException:
+ e = sys.exc_info()[1]
+ result_queue.put(_ResultItem(call_item.work_id,
+ exception=e))
else:
- try:
- r = call_item.fn(*call_item.args, **call_item.kwargs)
- except BaseException:
- e = sys.exc_info()[1]
- result_queue.put(_ResultItem(call_item.work_id,
- exception=e))
- else:
- result_queue.put(_ResultItem(call_item.work_id,
- result=r))
+ result_queue.put(_ResultItem(call_item.work_id,
+ result=r))
def _add_call_item_to_queue(pending_work_items,
work_ids,
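The new worker loop blocks indefinitely on call_queue and treats a None item as an instruction to exit; it echoes None into result_queue on the way out so the queue management thread wakes up and can account for the departure. A simplified stand-in for the loop (tuples instead of _CallItem/_ResultItem, and modern except-as syntax instead of the sys.exc_info()[1] idiom the backport uses for single-source Python 2/3 compatibility):

    def process_worker(call_queue, result_queue):
        # Runs in a child process; illustrative stand-in for _process_worker.
        while True:
            call_item = call_queue.get(block=True)    # no timeout, no polling
            if call_item is None:                     # shutdown sentinel
                result_queue.put(None)                # wake the management thread
                return                                # and let this process exit
            work_id, fn, args, kwargs = call_item
            try:
                r = fn(*args, **kwargs)
            except BaseException as e:
                result_queue.put((work_id, e, None))  # report the exception
            else:
                result_queue.put((work_id, None, r))  # report the result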
@@ -189,13 +177,12 @@ def _add_call_item_to_queue(pending_work_items,
del pending_work_items[work_id]
continue
-def _queue_manangement_worker(executor_reference,
- processes,
- pending_work_items,
- work_ids_queue,
- call_queue,
- result_queue,
- shutdown_process_event):
+def _queue_management_worker(executor_reference,
+ processes,
+ pending_work_items,
+ work_ids_queue,
+ call_queue,
+ result_queue):
"""Manages the communication between this process and the worker processes.
This function is run in a local thread.
@@ -213,37 +200,19 @@ def _queue_manangement_worker(executor_reference,
derived from _WorkItems for processing by the process workers.
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
- shutdown_process_event: A multiprocessing.Event used to signal the
- process workers that they should exit when their work queue is
- empty.
"""
+ nb_shutdown_processes = [0]
+ def shutdown_one_process():
+ """Tell a worker to terminate, which will in turn wake us again"""
+ call_queue.put(None)
+ nb_shutdown_processes[0] += 1
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
- try:
- result_item = result_queue.get(block=True, timeout=0.1)
- except queue.Empty:
- executor = executor_reference()
- # No more work items can be added if:
- # - The interpreter is shutting down OR
- # - The executor that owns this worker has been collected OR
- # - The executor that owns this worker has been shutdown.
- if _shutdown or executor is None or executor._shutdown_thread:
- # Since no new work items can be added, it is safe to shutdown
- # this thread if there are no pending work items.
- if not pending_work_items:
- shutdown_process_event.set()
-
- # If .join() is not called on the created processes then
- # some multiprocessing.Queue methods may deadlock on Mac OS
- # X.
- for p in processes:
- p.join()
- return
- del executor
- else:
+ result_item = result_queue.get(block=True)
+ if result_item is not None:
work_item = pending_work_items[result_item.work_id]
del pending_work_items[result_item.work_id]
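With the 0.1 s timeout gone, the management thread sleeps in result_queue.get(block=True) until something posts to the queue: a completed call, a worker acknowledging shutdown, the weakref callback, submit(), or shutdown(). A None result item simply means "wake up and re-check state." The nb_shutdown_processes counter lives in a one-element list so that the nested shutdown_one_process() can mutate it, presumably because the backport still targets Python 2, which lacks the nonlocal statement. The idiom in isolation:

    def make_counter():
        count = [0]          # boxed int: the closure may mutate the list's
                             # contents even though it cannot rebind 'count'
        def increment():
            count[0] += 1
        return increment, count

    increment, box = make_counter()
    increment()
    increment()
    print(box[0])            # -> 2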
@@ -251,6 +220,51 @@ def _queue_manangement_worker(executor_reference,
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
+ # Check whether we should start shutting down.
+ executor = executor_reference()
+ # No more work items can be added if:
+ # - The interpreter is shutting down OR
+ # - The executor that owns this worker has been collected OR
+ # - The executor that owns this worker has been shutdown.
+ if _shutdown or executor is None or executor._shutdown_thread:
+ # Since no new work items can be added, it is safe to shutdown
+ # this thread if there are no pending work items.
+ if not pending_work_items:
+ while nb_shutdown_processes[0] < len(processes):
+ shutdown_one_process()
+ # If .join() is not called on the created processes then
+ # some multiprocessing.Queue methods may deadlock on Mac OS
+ # X.
+ for p in processes:
+ p.join()
+ call_queue.close()
+ return
+ del executor
+
+_system_limits_checked = False
+_system_limited = None
+def _check_system_limits():
+ global _system_limits_checked, _system_limited
+ if _system_limits_checked:
+ if _system_limited:
+ raise NotImplementedError(_system_limited)
+ _system_limits_checked = True
+ try:
+ import os
+ nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
+ except (AttributeError, ValueError):
+ # sysconf not available or setting not available
+ return
+ if nsems_max == -1:
+ # indeterminate limit; assume the limit is determined
+ # by available memory only
+ return
+ if nsems_max >= 256:
+ # minimum number of semaphores available
+ # according to POSIX
+ return
+ _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
+ raise NotImplementedError(_system_limited)
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
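_check_system_limits() guards against platforms whose semaphore allotment is too small for multiprocessing's synchronized queues: POSIX requires at least 256 semaphores (_POSIX_SEM_NSEMS_MAX), and a value of -1 means the platform imposes no fixed limit. A quick, illustrative way to inspect what your platform reports:

    import os

    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        print("os.sysconf or SC_SEM_NSEMS_MAX unavailable on this platform")
    else:
        # -1 means no fixed limit; POSIX requires at least 256 otherwise
        print("SC_SEM_NSEMS_MAX =", nsems_max)

The outcome is cached in module globals, so the sysconf probe runs at most once per process, while a constrained platform keeps raising NotImplementedError on every ProcessPoolExecutor construction.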
@@ -261,7 +275,7 @@ class ProcessPoolExecutor(_base.Executor):
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
"""
- _remove_dead_thread_references()
+ _check_system_limits()
if max_workers is None:
self._max_workers = multiprocessing.cpu_count()
@@ -280,33 +294,34 @@ class ProcessPoolExecutor(_base.Executor):
# Shutdown is a two-step process.
self._shutdown_thread = False
- self._shutdown_process_event = multiprocessing.Event()
self._shutdown_lock = threading.Lock()
self._queue_count = 0
self._pending_work_items = {}
def _start_queue_management_thread(self):
+ # When the executor gets lost, the weakref callback will wake up
+ # the queue management thread.
+ def weakref_cb(_, q=self._result_queue):
+ q.put(None)
if self._queue_management_thread is None:
self._queue_management_thread = threading.Thread(
- target=_queue_manangement_worker,
- args=(weakref.ref(self),
+ target=_queue_management_worker,
+ args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
- self._result_queue,
- self._shutdown_process_event))
+ self._result_queue))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
- _thread_references.add(weakref.ref(self._queue_management_thread))
+ _threads_queues[self._queue_management_thread] = self._result_queue
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
- self._result_queue,
- self._shutdown_process_event))
+ self._result_queue))
p.start()
self._processes.add(p)
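The second argument to weakref.ref() is a callback invoked when the referent is about to be finalized. Binding the queue as a default argument (q=self._result_queue) keeps the closure from holding a strong reference to self, which would otherwise keep the executor alive and prevent the weakref from ever firing. An illustrative sketch of the pattern:

    import queue
    import weakref

    class Owner:
        pass

    q = queue.Queue()

    def weakref_cb(_, q=q):       # default arg: no strong ref to the owner
        q.put(None)               # post a sentinel when the owner dies

    owner = Owner()
    ref = weakref.ref(owner, weakref_cb)
    del owner                     # last strong reference gone: callback fires
    print(q.get())                # -> None (the wake-up sentinel)

(In CPython the callback fires as soon as the refcount hits zero; other implementations may defer it to a garbage-collection pass.)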
@@ -321,6 +336,8 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
self._queue_count += 1
+ # Wake up queue management thread
+ self._result_queue.put(None)
self._start_queue_management_thread()
self._adjust_process_count()
@@ -330,15 +347,16 @@ class ProcessPoolExecutor(_base.Executor):
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown_thread = True
- if wait:
- if self._queue_management_thread:
+ if self._queue_management_thread:
+ # Wake up queue management thread
+ self._result_queue.put(None)
+ if wait:
self._queue_management_thread.join()
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None
self._call_queue = None
self._result_queue = None
- self._shutdown_process_event = None
self._processes = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
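Net effect: both the management thread and the worker processes now sleep in blocking queue reads and are woken explicitly with None sentinels on submit, shutdown, executor collection, and interpreter exit, instead of polling every 0.1 s. An illustrative end-to-end usage, assuming this module is importable as concurrent.futures:

    import concurrent.futures

    def square(x):
        return x * x

    if __name__ == '__main__':
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
            print(list(executor.map(square, range(5))))   # [0, 1, 4, 9, 16]
        # __exit__ calls shutdown(wait=True): a None on the result queue wakes
        # the management thread, which sends one None per worker, joins the
        # worker processes, and lets the executor be collected cleanly.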