diff options
Diffstat (limited to 'futures/thread.py')
-rw-r--r-- | futures/thread.py | 274 |
1 files changed, 8 insertions, 266 deletions
diff --git a/futures/thread.py b/futures/thread.py index a4c779d..3f1b807 100644 --- a/futures/thread.py +++ b/futures/thread.py @@ -1,259 +1,9 @@ #!/usr/bin/env python +from futures._base import RUNNING, FINISHED, Executor, ALL_COMPLETED, ThreadEventSink, Future, FutureList import queue import threading -FIRST_COMPLETED = 0 -FIRST_EXCEPTION = 1 -ALL_COMPLETED = 2 -RETURN_IMMEDIATELY = 3 - -_PENDING = 0 -_RUNNING = 1 -_CANCELLED = 2 -_FINISHED = 3 - -_STATE_TO_DESCRIPTION_MAP = { - _PENDING: "pending", - _RUNNING: "running", - _CANCELLED: "cancelled", - _FINISHED: "finished" -} - -class CancelledException(Exception): - pass - -class TimeoutException(Exception): - pass - -class Future(object): - def __init__(self): - self._condition = threading.Condition() - self._state = _PENDING - self._result = None - self._exception = None - - def __repr__(self): - with self._condition: - if self._state == _FINISHED: - if self._exception: - return '<Future state=%s raised %s>' % ( - _STATE_TO_DESCRIPTION_MAP[self._state], - self._exception.__class__.__name__) - else: - return '<Future state=%s returned %s>' % ( - _STATE_TO_DESCRIPTION_MAP[self._state], - self._result.__class__.__name__) - return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] - - def cancel(self): - with self._condition: - if self._state in [_RUNNING, _FINISHED]: - return False - - self._state = _CANCELLED - return True - - def cancelled(self): - with self._condition: - return self._state == _CANCELLED - - def done(self): - with self._condition: - return self._state in [_CANCELLED, _FINISHED] - - def __get_result(self): - if self._exception: - raise self._exception - else: - return self._result - - def result(self, timeout=None): - with self._condition: - if self._state == _CANCELLED: - raise CancelledException() - elif self._state == _FINISHED: - return self.__get_result() - - print('Waiting...') - self._condition.wait(timeout) - print('Post Waiting...') - - if self._state == _CANCELLED: - raise CancelledException() - elif self._state == _FINISHED: - return self.__get_result() - else: - raise TimeoutException() - - def exception(self, timeout=None): - with self._condition: - if self._state == _CANCELLED: - raise CancelledException() - elif self._state == _FINISHED: - return self._exception - - self._condition.wait(timeout) - - if self._state == _CANCELLED: - raise CancelledException() - elif self._state == _FINISHED: - return self._exception - else: - raise TimeoutException() - -class _NullWaitTracker(object): - def add_result(self): - pass - - def add_exception(self): - pass - - def add_cancelled(self): - pass - -class _FirstCompletedWaitTracker(object): - def __init__(self): - self.event = threading.Event() - - def add_result(self): - self.event.set() - - def add_exception(self): - self.event.set() - - def add_cancelled(self): - self.event.set() - -class _AllCompletedWaitTracker(object): - def __init__(self, pending_calls, stop_on_exception): - self.event = threading.Event() - self.pending_calls = pending_calls - self.stop_on_exception = stop_on_exception - - def add_result(self): - self.pending_calls -= 1 - if not self.pending_calls: - self.event.set() - - def add_exception(self): - self.add_result() - if self.stop_on_exception: - self.event.set() - - def add_cancelled(self): - self.add_result() - -class _ThreadEventSink(object): - def __init__(self): - self._condition = threading.Lock() - self._waiters = [] - - def add(self, e): - self._waiters.append(e) - - def add_result(self): - with self._condition: - for waiter in self._waiters: - waiter.add_result() - - def add_exception(self): - with self._condition: - for waiter in self._waiters: - waiter.add_exception() - - def add_cancelled(self): - with self._condition: - for waiter in self._waiters: - waiter.add_cancelled() - -class FutureList(object): - def __init__(self, futures, event_sink): - self._futures = futures - self._event_sink = event_sink - - def wait(self, timeout=None, run_until=ALL_COMPLETED): - with self._event_sink._condition: - print('WAIT 123') - if all(f.done() for f in self): - return - print('WAIT 1234') - - if run_until == FIRST_COMPLETED: - m = _FirstCompletedWaitTracker() - elif run_until == FIRST_EXCEPTION: - m = _AllCompletedWaitTracker(len(self), stop_on_exception=True) - elif run_until == ALL_COMPLETED: - m = _AllCompletedWaitTracker(len(self), stop_on_exception=False) - elif run_until == RETURN_IMMEDIATELY: - m = _NullWaitTracker() - else: - raise ValueError() - - self._event_sink.add(m) - - if run_until != RETURN_IMMEDIATELY: - print('WAIT 12345', timeout) - m.event.wait(timeout) - - def cancel(self, timeout=None): - for f in self: - f.cancel() - self.wait(timeout=timeout, run_until=ALL_COMPLETED) - if any(not f.done() for f in self): - raise TimeoutException() - - def has_running_futures(self): - return bool(self.running_futures()) - - def has_cancelled_futures(self): - return bool(self.cancelled_futures()) - - def has_done_futures(self): - return bool(self.done_futures()) - - def has_successful_futures(self): - return bool(self.successful_futures()) - - def has_exception_futures(self): - return bool(self.exception_futures()) - - def running_futures(self): - return [f for f in self if not f.done() and not f.cancelled()] - - def cancelled_futures(self): - return [f for f in self if f.cancelled()] - - def done_futures(self): - return [f for f in self if f.done()] - - def successful_futures(self): - return [f for f in self - if f.done() and not f.cancelled() and f.exception() is None] - - def exception_futures(self): - return [f for f in self if f.done() and f.exception() is not None] - - def __getitem__(self, i): - return self._futures[i] - - def __len__(self): - return len(self._futures) - - def __iter__(self): - return iter(self._futures) - - def __contains__(self, f): - return f in self._futures - - def __repr__(self): - return ('<FutureList #futures=%d ' - '[#success=%d #exception=%d #cancelled=%d]>' % ( - len(self), - len(self.successful_futures()), - len(self.exception_futures()), - len(self.cancelled_futures()))) - class _WorkItem(object): def __init__(self, call, future, completion_tracker): self.call = call @@ -267,23 +17,25 @@ class _WorkItem(object): self.completion_tracker.add_cancelled() return - self.future._state = _RUNNING + with self.future._condition: + self.future._state = RUNNING + try: r = self.call() except BaseException as e: with self.future._condition: self.future._exception = e - self.future._state = _FINISHED + self.future._state = FINISHED self.future._condition.notify_all() self.completion_tracker.add_exception() else: with self.future._condition: self.future._result = r - self.future._state = _FINISHED + self.future._state = FINISHED self.future._condition.notify_all() self.completion_tracker.add_result() -class ThreadPoolExecutor(object): +class ThreadPoolExecutor(Executor): def __init__(self, max_threads): self._max_threads = max_threads self._work_queue = queue.Queue() @@ -308,7 +60,6 @@ class ThreadPoolExecutor(object): def _adjust_thread_count(self): for _ in range(len(self._threads), min(self._max_threads, self._work_queue.qsize())): - print('Creating a thread') t = threading.Thread(target=self._worker) t.daemon = True t.start() @@ -320,27 +71,18 @@ class ThreadPoolExecutor(object): raise RuntimeError() futures = [] - event_sink = _ThreadEventSink() + event_sink = ThreadEventSink() for call in calls: f = Future() w = _WorkItem(call, f, event_sink) self._work_queue.put(w) futures.append(f) - print('futures:', futures) self._adjust_thread_count() fl = FutureList(futures, event_sink) fl.wait(timeout=timeout, run_until=run_until) return fl - def runXXX(self, calls, timeout=None): - fs = self.run(calls, timeout, run_util=FIRST_EXCEPTION) - - if fs.has_exception_futures(): - raise fs.exception_futures()[0].exception() - else: - return [f.result() for f in fs] - def shutdown(self): with self._lock: self._shutdown = True |