Diffstat (limited to 'python2/futures/process.py')
-rw-r--r--  python2/futures/process.py  |  134
1 file changed, 82 insertions(+), 52 deletions(-)
diff --git a/python2/futures/process.py b/python2/futures/process.py
index 3e16e80..463ee8b 100644
--- a/python2/futures/process.py
+++ b/python2/futures/process.py
@@ -5,10 +5,22 @@ from futures._base import (PENDING, RUNNING, CANCELLED,
ALL_COMPLETED,
set_future_exception, set_future_result,
Executor, Future, FutureList, ThreadEventSink)
-
+import atexit
import Queue
import multiprocessing
import threading
+import weakref
+
+_thread_references = set()
+_shutdown = False
+
+def _python_exit():
+ global _shutdown
+ _shutdown = True
+ for thread_reference in _thread_references:
+ thread = thread_reference()
+ if thread is not None:
+ thread.join()
class _WorkItem(object):
def __init__(self, call, future, completion_tracker):
@@ -44,6 +56,63 @@ def _process_worker(call_queue, result_queue, shutdown):
result_queue.put(_ResultItem(call_item.work_id,
result=r))
+def _add_call_item_to_queue(pending_work_items,
+ work_ids,
+ call_queue):
+ while True:
+ try:
+ work_id = work_ids.get(block=False)
+ except Queue.Empty:
+ return
+ else:
+ work_item = pending_work_items[work_id]
+
+ if work_item.future.cancelled():
+ work_item.future._condition.acquire()
+ work_item.future._condition.notify_all()
+ work_item.future._condition.release()
+
+ work_item.completion_tracker.add_cancelled()
+ continue
+ else:
+ work_item.future._condition.acquire()
+ work_item.future._state = RUNNING
+ work_item.future._condition.release()
+ call_queue.put(_CallItem(work_id, work_item.call), block=True)
+ if call_queue.full():
+ return
+
+def _result(executor_reference,
+ pending_work_items,
+ work_ids_queue,
+ call_queue,
+ result_queue,
+ shutdown_process_event):
+ while True:
+ _add_call_item_to_queue(pending_work_items,
+ work_ids_queue,
+ call_queue)
+ try:
+ result_item = result_queue.get(block=True, timeout=0.1)
+ except Queue.Empty:
+ executor = executor_reference()
+ if _shutdown or executor is None or executor._shutdown_thread:
+ shutdown_process_event.set()
+ return
+ del executor
+ else:
+ work_item = pending_work_items[result_item.work_id]
+ del pending_work_items[result_item.work_id]
+
+ if result_item.exception:
+ set_future_exception(work_item.future,
+ work_item.completion_tracker,
+ result_item.exception)
+ else:
+ set_future_result(work_item.future,
+ work_item.completion_tracker,
+ result_item.result)
+
class ProcessPoolExecutor(Executor):
def __init__(self, max_processes=None):
if max_processes is None:
@@ -66,60 +135,19 @@ class ProcessPoolExecutor(Executor):
self._queue_count = 0
self._pending_work_items = {}
- def _add_call_item_to_queue(self):
- while True:
- try:
- work_id = self._work_ids.get(block=False)
- except Queue.Empty:
- return
- else:
- work_item = self._pending_work_items[work_id]
-
- if work_item.future.cancelled():
- work_item.future._condition.acquire()
- work_item.future._condition.notify_all()
- work_item.future._condition.release()
-
- work_item.completion_tracker.add_cancelled()
- continue
- else:
- work_item.future._condition.acquire()
- work_item.future._state = RUNNING
- work_item.future._condition.release()
-
- self._call_queue.put(_CallItem(work_id, work_item.call),
- block=True)
- if self._call_queue.full():
- return
-
- def _result(self):
- while True:
- self._add_call_item_to_queue()
- try:
- result_item = self._result_queue.get(block=True,
- timeout=0.1)
- except Queue.Empty:
- if self._shutdown_thread and not self._pending_work_items:
- self._shutdown_process_event.set()
- return
- else:
- work_item = self._pending_work_items[result_item.work_id]
- del self._pending_work_items[result_item.work_id]
-
- if result_item.exception:
- set_future_exception(work_item.future,
- work_item.completion_tracker,
- result_item.exception)
- else:
- set_future_result(work_item.future,
- work_item.completion_tracker,
- result_item.result)
-
def _adjust_process_count(self):
if self._queue_management_thread is None:
self._queue_management_thread = threading.Thread(
- target=self._result)
+ target=_result,
+ args=(weakref.ref(self),
+ self._pending_work_items,
+ self._work_ids,
+ self._call_queue,
+ self._result_queue,
+ self._shutdown_process_event))
+ self._queue_management_thread.setDaemon(True)
self._queue_management_thread.start()
+ _thread_references.add(weakref.ref(self._queue_management_thread))
for _ in range(len(self._processes), self._max_processes):
p = multiprocessing.Process(
@@ -138,7 +166,7 @@ class ProcessPoolExecutor(Executor):
futures = []
event_sink = ThreadEventSink()
- self._queue_count
+
for index, call in enumerate(calls):
f = Future(index)
self._pending_work_items[self._queue_count] = _WorkItem(
@@ -160,3 +188,5 @@ class ProcessPoolExecutor(Executor):
self._shutdown_thread = True
finally:
self._shutdown_lock.release()
+
+atexit.register(_python_exit)
\ No newline at end of file
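
The patch above moves the queue-management loop into module-level functions so the management thread holds only a weak reference to its ProcessPoolExecutor, runs as a daemon, and is joined by an atexit hook when the interpreter shuts down. The snippet below is a standalone sketch of just that thread-cleanup pattern, not part of the patch; the names _worker, _threads and start_worker are illustrative and do not appear in futures/process.py.

    # Sketch (Python 2): daemon worker threads tracked by weak reference,
    # joined from an atexit hook so interpreter shutdown waits for them.
    import atexit
    import threading
    import weakref
    import Queue

    _threads = set()      # weak references to live worker threads
    _shutdown = False     # flipped by the atexit hook below

    def _python_exit():
        global _shutdown
        _shutdown = True
        for ref in _threads:
            thread = ref()
            if thread is not None:
                thread.join()

    def _worker(queue):
        # Drain work until the interpreter begins shutting down.
        while not _shutdown:
            try:
                item = queue.get(block=True, timeout=0.1)
            except Queue.Empty:
                continue
            item()

    def start_worker(queue):
        t = threading.Thread(target=_worker, args=(queue,))
        t.setDaemon(True)          # a forgotten thread cannot block exit
        t.start()
        _threads.add(weakref.ref(t))
        return t

    atexit.register(_python_exit)

Making the thread a daemon keeps an abandoned executor from hanging interpreter exit, while the atexit join still gives a live management thread a chance to notice _shutdown and finish its loop first.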