diff options
Diffstat (limited to 'python3/futures/process.py')
-rw-r--r-- | python3/futures/process.py | 155 |
1 file changed, 155 insertions, 0 deletions
#!/usr/bin/env python
"""Execute computations asynchronously in a pool of worker processes.

Work flows through three queues:

    caller thread -> _work_ids (pending work ids)
                  -> _call_queue (multiprocessing; read by workers)
    workers       -> _result_queue (multiprocessing; read by the
                     queue-management thread, which stores the result or
                     exception on the matching Future)
"""

from futures._base import (PENDING, RUNNING, CANCELLED,
                           CANCELLED_AND_NOTIFIED, FINISHED,
                           ALL_COMPLETED,
                           set_future_exception, set_future_result,
                           Executor, Future, FutureList, ThreadEventSink)

import multiprocessing
import queue
import threading

class _WorkItem(object):
    """Pairs a pending call with the Future and event sink tracking it."""
    def __init__(self, call, future, completion_tracker):
        self.call = call
        self.future = future
        self.completion_tracker = completion_tracker

class _ResultItem(object):
    """Outcome of one call, sent back from a worker process.

    Exactly one of `exception`/`result` is meaningful; `work_id` maps the
    outcome back to its _WorkItem.
    """
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    """A callable plus the work id used to route its result back."""
    def __init__(self, work_id, call):
        self.work_id = work_id
        self.call = call

def _process_worker(call_queue, result_queue, shutdown):
    """Main loop of a worker process: run calls until `shutdown` is set.

    Polls `call_queue` with a short timeout so the shutdown event is
    noticed even when no work arrives.  Any exception raised by a call
    (including BaseException subclasses) is shipped back in a
    _ResultItem instead of killing the worker.

    NOTE(review): the exception object must be picklable to cross the
    process boundary -- an unpicklable exception would break the result
    queue; confirm callers only raise picklable exceptions.
    """
    while True:
        try:
            call_item = call_queue.get(block=True, timeout=0.1)
        except queue.Empty:
            # No work right now; exit only once shutdown is requested.
            if shutdown.is_set():
                return
        else:
            try:
                r = call_item.call()
            except BaseException as e:
                result_queue.put(_ResultItem(call_item.work_id,
                                             exception=e))
            else:
                result_queue.put(_ResultItem(call_item.work_id,
                                             result=r))

class ProcessPoolExecutor(Executor):
    def __init__(self, max_processes=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_processes: The maximum number of processes that can be
                used to execute the given calls.  If None, defaults to
                the number of CPUs reported by multiprocessing.
        """
        if max_processes is None:
            max_processes = multiprocessing.cpu_count()

        self._max_processes = max_processes
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from starving but to make future.cancel()
        # responsive.
        self._call_queue = multiprocessing.Queue(self._max_processes + 1)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process: first the queue-management
        # thread is told to stop (_shutdown_thread); once all pending
        # work has drained it signals the worker processes via
        # _shutdown_process_event.
        self._shutdown_thread = False
        self._shutdown_process_event = multiprocessing.Event()
        self._shutdown_lock = threading.Lock()
        self._queue_count = 0          # monotonically increasing work id
        self._pending_work_items = {}  # work id -> _WorkItem

    def _add_call_item_to_queue(self):
        """Move pending work ids onto the (bounded) inter-process call queue.

        Cancelled futures are completed here without ever reaching a
        worker; everything else transitions PENDING -> RUNNING as it is
        enqueued.
        """
        while True:
            # FIX: check fullness BEFORE putting.  The original put()
            # first and tested full() afterwards, so when the call queue
            # was already full the blocking put() stalled the
            # queue-management thread and stopped result processing.
            if self._call_queue.full():
                return
            try:
                work_id = self._work_ids.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self._pending_work_items[work_id]

                if work_item.future.cancelled():
                    # Complete the cancelled future locally; never send
                    # it to a worker.
                    with work_item.future._condition:
                        work_item.future._condition.notify_all()
                    work_item.completion_tracker.add_cancelled()
                    continue
                else:
                    with work_item.future._condition:
                        work_item.future._state = RUNNING

                self._call_queue.put(_CallItem(work_id, work_item.call),
                                     block=True)

    def _result(self):
        """Body of the queue-management thread.

        Alternates between feeding the call queue and draining the
        result queue, storing each outcome on its Future.  Once shutdown
        has been requested and no work is pending, signals the worker
        processes to exit and returns.
        """
        while True:
            self._add_call_item_to_queue()
            try:
                result_item = self._result_queue.get(block=True,
                                                     timeout=0.1)
            except queue.Empty:
                if self._shutdown_thread and not self._pending_work_items:
                    self._shutdown_process_event.set()
                    return
            else:
                work_item = self._pending_work_items.pop(result_item.work_id)

                # Explicit None test: exception instances are always
                # truthy, but `is not None` states the actual protocol.
                if result_item.exception is not None:
                    set_future_exception(work_item.future,
                                         work_item.completion_tracker,
                                         result_item.exception)
                else:
                    set_future_result(work_item.future,
                                      work_item.completion_tracker,
                                      result_item.result)

    def _adjust_process_count(self):
        """Lazily start the queue-management thread and top up workers.

        Both the thread and the worker processes are daemonic so an
        abandoned executor does not keep the interpreter alive.
        """
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=self._result)
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()

        for _ in range(len(self._processes), self._max_processes):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue,
                          self._shutdown_process_event))
            p.daemon = True
            p.start()
            self._processes.add(p)

    def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
        """Submit `calls` for execution and wait per `return_when`.

        Args:
            calls: An iterable of zero-argument callables.
            timeout: Maximum number of seconds to wait, or None.
            return_when: One of the futures._base wait conditions
                (e.g. ALL_COMPLETED).

        Returns:
            A FutureList with one Future per call.

        Raises:
            RuntimeError: If shutdown() has already been called.
        """
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot run new futures after shutdown')

            futures = []
            event_sink = ThreadEventSink()
            # (removed a stray no-op expression statement
            # `self._queue_count` that the original had here)
            for index, call in enumerate(calls):
                f = Future(index)
                self._pending_work_items[self._queue_count] = _WorkItem(
                        call, f, event_sink)
                self._work_ids.put(self._queue_count)
                futures.append(f)
                self._queue_count += 1

            self._adjust_process_count()
            fl = FutureList(futures, event_sink)
        # Wait outside the lock so shutdown() is not blocked for the
        # duration of the wait.
        fl.wait(timeout=timeout, return_when=return_when)
        return fl

    def shutdown(self):
        """Disallow new submissions; the pool drains and exits on its own.

        The queue-management thread notices _shutdown_thread once all
        pending work is done and then signals the worker processes.
        """
        with self._shutdown_lock:
            self._shutdown_thread = True