From e1c6c9758b70b3e9e1630b5e8545a9a1e3de7368 Mon Sep 17 00:00:00 2001 From: brian quinlan Date: Sat, 23 May 2009 09:18:19 +0000 Subject: Seperate into python2 and python3 directories --- python2/futures/thread.py | 89 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 python2/futures/thread.py (limited to 'python2/futures/thread.py') diff --git a/python2/futures/thread.py b/python2/futures/thread.py new file mode 100644 index 0000000..9e3275f --- /dev/null +++ b/python2/futures/thread.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python + +from futures._base import (PENDING, RUNNING, CANCELLED, + CANCELLED_AND_NOTIFIED, FINISHED, + ALL_COMPLETED, + LOGGER, + set_future_exception, set_future_result, + Executor, Future, FutureList, ThreadEventSink) +import queue +import threading + +class _WorkItem(object): + def __init__(self, call, future, completion_tracker): + self.call = call + self.future = future + self.completion_tracker = completion_tracker + + def run(self): + with self.future._condition: + if self.future._state == PENDING: + self.future._state = RUNNING + elif self.future._state == CANCELLED: + with self.completion_tracker._condition: + self.future._state = CANCELLED_AND_NOTIFIED + self.completion_tracker.add_cancelled() + return + else: + LOGGER.critical('Future %s in unexpected state: %d', + id(self.future), + self.future._state) + return + + try: + result = self.call() + except BaseException as e: + set_future_exception(self.future, self.completion_tracker, e) + else: + set_future_result(self.future, self.completion_tracker, result) + +class ThreadPoolExecutor(Executor): + def __init__(self, max_threads): + self._max_threads = max_threads + self._work_queue = queue.Queue() + self._threads = set() + self._shutdown = False + self._shutdown_lock = threading.Lock() + + def _worker(self): + try: + while True: + try: + work_item = self._work_queue.get(block=True, timeout=0.1) + except queue.Empty: + if self._shutdown: + return + else: + work_item.run() + except BaseException as e: + LOGGER.critical('Exception in worker', exc_info=True) + + def _adjust_thread_count(self): + for _ in range(len(self._threads), + min(self._max_threads, self._work_queue.qsize())): + t = threading.Thread(target=self._worker) + t.daemon = True + t.start() + self._threads.add(t) + + def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot run new futures after shutdown') + + futures = [] + event_sink = ThreadEventSink() + for index, call in enumerate(calls): + f = Future(index) + w = _WorkItem(call, f, event_sink) + self._work_queue.put(w) + futures.append(f) + + self._adjust_thread_count() + fl = FutureList(futures, event_sink) + fl.wait(timeout=timeout, return_when=return_when) + return fl + + def shutdown(self): + with self._shutdown_lock: + self._shutdown = True -- cgit v1.2.1