diff options
Diffstat (limited to 'concurrent/futures/process.py')
-rw-r--r-- | concurrent/futures/process.py | 170 |
1 files changed, 94 insertions, 76 deletions
diff --git a/concurrent/futures/process.py b/concurrent/futures/process.py index 87dc789..98684f8 100644 --- a/concurrent/futures/process.py +++ b/concurrent/futures/process.py @@ -73,28 +73,17 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_thread_references = set() +_threads_queues = weakref.WeakKeyDictionary() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for @@ -122,10 +111,10 @@ class _CallItem(object): self.args = args self.kwargs = kwargs -def _process_worker(call_queue, result_queue, shutdown): +def _process_worker(call_queue, result_queue): """Evaluates calls from call_queue and places the results in result_queue. - This worker is run in a seperate process. + This worker is run in a separate process. Args: call_queue: A multiprocessing.Queue of _CallItems that will be read and @@ -136,21 +125,20 @@ def _process_worker(call_queue, result_queue, shutdown): worker that it should exit when call_queue is empty. """ while True: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return try: - call_item = call_queue.get(block=True, timeout=0.1) - except queue.Empty: - if shutdown.is_set(): - return + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException: + e = sys.exc_info()[1] + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) else: - try: - r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException: - e = sys.exc_info()[1] - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) + result_queue.put(_ResultItem(call_item.work_id, + result=r)) def _add_call_item_to_queue(pending_work_items, work_ids, @@ -189,13 +177,12 @@ def _add_call_item_to_queue(pending_work_items, del pending_work_items[work_id] continue -def _queue_manangement_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue, - shutdown_process_event): +def _queue_management_worker(executor_reference, + processes, + pending_work_items, + work_ids_queue, + call_queue, + result_queue): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -213,37 +200,19 @@ def _queue_manangement_worker(executor_reference, derived from _WorkItems for processing by the process workers. result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. - shutdown_process_event: A multiprocessing.Event used to signal the - process workers that they should exit when their work queue is - empty. """ + nb_shutdown_processes = [0] + def shutdown_one_process(): + """Tell a worker to terminate, which will in turn wake us again""" + call_queue.put(None) + nb_shutdown_processes[0] += 1 while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) - try: - result_item = result_queue.get(block=True, timeout=0.1) - except queue.Empty: - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_process_event.set() - - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() - return - del executor - else: + result_item = result_queue.get(block=True) + if result_item is not None: work_item = pending_work_items[result_item.work_id] del pending_work_items[result_item.work_id] @@ -251,6 +220,51 @@ def _queue_manangement_worker(executor_reference, work_item.future.set_exception(result_item.exception) else: work_item.future.set_result(result_item.result) + # Check whether we should start shutting down. + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if _shutdown or executor is None or executor._shutdown_thread: + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + while nb_shutdown_processes[0] < len(processes): + shutdown_one_process() + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + call_queue.close() + return + del executor + +_system_limits_checked = False +_system_limited = None +def _check_system_limits(): + global _system_limits_checked, _system_limited + if _system_limits_checked: + if _system_limited: + raise NotImplementedError(_system_limited) + _system_limits_checked = True + try: + import os + nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems_max == -1: + # indetermine limit, assume that limit is determined + # by available memory only + return + if nsems_max >= 256: + # minimum number of semaphores available + # according to POSIX + return + _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + raise NotImplementedError(_system_limited) class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None): @@ -261,7 +275,7 @@ class ProcessPoolExecutor(_base.Executor): execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. """ - _remove_dead_thread_references() + _check_system_limits() if max_workers is None: self._max_workers = multiprocessing.cpu_count() @@ -280,33 +294,34 @@ class ProcessPoolExecutor(_base.Executor): # Shutdown is a two-step process. self._shutdown_thread = False - self._shutdown_process_event = multiprocessing.Event() self._shutdown_lock = threading.Lock() self._queue_count = 0 self._pending_work_items = {} def _start_queue_management_thread(self): + # When the executor gets lost, the weakref callback will wake up + # the queue management thread. + def weakref_cb(_, q=self._result_queue): + q.put(None) if self._queue_management_thread is None: self._queue_management_thread = threading.Thread( - target=_queue_manangement_worker, - args=(weakref.ref(self), + target=_queue_management_worker, + args=(weakref.ref(self, weakref_cb), self._processes, self._pending_work_items, self._work_ids, self._call_queue, - self._result_queue, - self._shutdown_process_event)) + self._result_queue)) self._queue_management_thread.daemon = True self._queue_management_thread.start() - _thread_references.add(weakref.ref(self._queue_management_thread)) + _threads_queues[self._queue_management_thread] = self._result_queue def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): p = multiprocessing.Process( target=_process_worker, args=(self._call_queue, - self._result_queue, - self._shutdown_process_event)) + self._result_queue)) p.start() self._processes.add(p) @@ -321,6 +336,8 @@ class ProcessPoolExecutor(_base.Executor): self._pending_work_items[self._queue_count] = w self._work_ids.put(self._queue_count) self._queue_count += 1 + # Wake up queue management thread + self._result_queue.put(None) self._start_queue_management_thread() self._adjust_process_count() @@ -330,15 +347,16 @@ class ProcessPoolExecutor(_base.Executor): def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown_thread = True - if wait: - if self._queue_management_thread: + if self._queue_management_thread: + # Wake up queue management thread + self._result_queue.put(None) + if wait: self._queue_management_thread.join() # To reduce the risk of openning too many files, remove references to # objects that use file descriptors. self._queue_management_thread = None self._call_queue = None self._result_queue = None - self._shutdown_process_event = None self._processes = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ |