diff options
Diffstat (limited to 'python2/futures/process.py')
-rw-r--r-- | python2/futures/process.py | 134 |
1 files changed, 82 insertions, 52 deletions
diff --git a/python2/futures/process.py b/python2/futures/process.py index 3e16e80..463ee8b 100644 --- a/python2/futures/process.py +++ b/python2/futures/process.py @@ -5,10 +5,22 @@ from futures._base import (PENDING, RUNNING, CANCELLED, ALL_COMPLETED, set_future_exception, set_future_result, Executor, Future, FutureList, ThreadEventSink) - +import atexit import Queue import multiprocessing import threading +import weakref + +_thread_references = set() +_shutdown = False + +def _python_exit(): + global _shutdown + _shutdown = True + for thread_reference in _thread_references: + thread = thread_reference() + if thread is not None: + thread.join() class _WorkItem(object): def __init__(self, call, future, completion_tracker): @@ -44,6 +56,63 @@ def _process_worker(call_queue, result_queue, shutdown): result_queue.put(_ResultItem(call_item.work_id, result=r)) +def _add_call_item_to_queue(pending_work_items, + work_ids, + call_queue): + while True: + try: + work_id = work_ids.get(block=False) + except Queue.Empty: + return + else: + work_item = pending_work_items[work_id] + + if work_item.future.cancelled(): + work_item.future._condition.acquire() + work_item.future._condition.notify_all() + work_item.future._condition.release() + + work_item.completion_tracker.add_cancelled() + continue + else: + work_item.future._condition.acquire() + work_item.future._state = RUNNING + work_item.future._condition.release() + call_queue.put(_CallItem(work_id, work_item.call), block=True) + if call_queue.full(): + return + +def _result(executor_reference, + pending_work_items, + work_ids_queue, + call_queue, + result_queue, + shutdown_process_event): + while True: + _add_call_item_to_queue(pending_work_items, + work_ids_queue, + call_queue) + try: + result_item = result_queue.get(block=True, timeout=0.1) + except Queue.Empty: + executor = executor_reference() + if _shutdown or executor is None or executor._shutdown_thread: + shutdown_process_event.set() + return + del executor + else: + work_item = pending_work_items[result_item.work_id] + del pending_work_items[result_item.work_id] + + if result_item.exception: + set_future_exception(work_item.future, + work_item.completion_tracker, + result_item.exception) + else: + set_future_result(work_item.future, + work_item.completion_tracker, + result_item.result) + class ProcessPoolExecutor(Executor): def __init__(self, max_processes=None): if max_processes is None: @@ -66,60 +135,19 @@ class ProcessPoolExecutor(Executor): self._queue_count = 0 self._pending_work_items = {} - def _add_call_item_to_queue(self): - while True: - try: - work_id = self._work_ids.get(block=False) - except Queue.Empty: - return - else: - work_item = self._pending_work_items[work_id] - - if work_item.future.cancelled(): - work_item.future._condition.acquire() - work_item.future._condition.notify_all() - work_item.future._condition.release() - - work_item.completion_tracker.add_cancelled() - continue - else: - work_item.future._condition.acquire() - work_item.future._state = RUNNING - work_item.future._condition.release() - - self._call_queue.put(_CallItem(work_id, work_item.call), - block=True) - if self._call_queue.full(): - return - - def _result(self): - while True: - self._add_call_item_to_queue() - try: - result_item = self._result_queue.get(block=True, - timeout=0.1) - except Queue.Empty: - if self._shutdown_thread and not self._pending_work_items: - self._shutdown_process_event.set() - return - else: - work_item = self._pending_work_items[result_item.work_id] - del self._pending_work_items[result_item.work_id] - - if result_item.exception: - set_future_exception(work_item.future, - work_item.completion_tracker, - result_item.exception) - else: - set_future_result(work_item.future, - work_item.completion_tracker, - result_item.result) - def _adjust_process_count(self): if self._queue_management_thread is None: self._queue_management_thread = threading.Thread( - target=self._result) + target=_result, + args=(weakref.ref(self), + self._pending_work_items, + self._work_ids, + self._call_queue, + self._result_queue, + self._shutdown_process_event)) + self._queue_management_thread.setDaemon(True) self._queue_management_thread.start() + _thread_references.add(weakref.ref(self._queue_management_thread)) for _ in range(len(self._processes), self._max_processes): p = multiprocessing.Process( @@ -138,7 +166,7 @@ class ProcessPoolExecutor(Executor): futures = [] event_sink = ThreadEventSink() - self._queue_count + for index, call in enumerate(calls): f = Future(index) self._pending_work_items[self._queue_count] = _WorkItem( @@ -160,3 +188,5 @@ class ProcessPoolExecutor(Executor): self._shutdown_thread = True finally: self._shutdown_lock.release() + +atexit.register(_python_exit)
\ No newline at end of file |