diff options
Diffstat (limited to 'futures/process.py')
-rw-r--r-- | futures/process.py | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/futures/process.py b/futures/process.py new file mode 100644 index 0000000..945303f --- /dev/null +++ b/futures/process.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python + +class _WorkItem(object): + def __init__(self, call, future, completion_tracker): + self.call = call + self.future = future + self.completion_tracker = completion_tracker + + def run(self): + if self.future.cancelled(): + with self.future._condition: + self.future._condition.notify_all() + self.completion_tracker.add_cancelled() + return + + self.future._state = _RUNNING + try: + r = self.call() + except BaseException as e: + with self.future._condition: + self.future._exception = e + self.future._state = _FINISHED + self.future._condition.notify_all() + self.completion_tracker.add_exception() + else: + with self.future._condition: + self.future._result = r + self.future._state = _FINISHED + self.future._condition.notify_all() + self.completion_tracker.add_result() + +class XXX: + def wait(self, timeout=None, run_until=ALL_COMPLETED): + + pass + +class ProcessPoolExecutor(object): + def __init__(self, max_processes): + self._max_processes = max_processes + self._work_queue = multiprocessing.Queue() + self._processes = set() + self._shutdown = False + self._lock = threading.Lock() + self._queue_count = 0 + self._pending_futures = {} + + def _(self): + while True: + try: + result_item = self._result_queue.get(block=True, + timeout=0.1) + except multiprocessing.TimeoutError: + if self._shutdown: + return + else: + completion_tracker, future = self._pending_futures[ + result_item.index] + + if result_item.exception: + with future._condition: + future._exception = result_item.exception + future._state = _FINISHED + future._condition.notify_all() + completion_tracker.add_exception() + else: + with future._condition: + future._result = result_item.result + future._state = _FINISHED + future._condition.notify_all() + completion_tracker.add_result() + + + + def _adjust_process_count(self): + + def run(self, calls, timeout=None, run_until=ALL_COMPLETED): + with self._lock: + if self._shutdown: + raise RuntimeError() + + futures = [] + event_sink = _ThreadEventSink() + for call in calls: + f = Future() + w = _WorkItem(call, f, event_sink) + self._work_queue.put(w) + futures.append(f) + self._queue_count += 1 + + print('futures:', futures) + self._adjust_process_count() + fl = FutureList(futures, event_sink) + fl.wait(timeout=timeout, run_until=run_until) + return fl |