diff options
Diffstat (limited to 'python2/futures/thread.py')
-rw-r--r-- | python2/futures/thread.py | 56 |
1 files changed, 41 insertions, 15 deletions
diff --git a/python2/futures/thread.py b/python2/futures/thread.py index 5834fd4..79e34cb 100644 --- a/python2/futures/thread.py +++ b/python2/futures/thread.py @@ -6,8 +6,26 @@ from futures._base import (PENDING, RUNNING, CANCELLED, LOGGER, set_future_exception, set_future_result, Executor, Future, FutureList, ThreadEventSink) +import atexit import Queue import threading +import weakref + +_thread_references = set() +_shutdown = False + +def _python_exit(): + global _shutdown + _shutdown = True + for thread_reference in _thread_references: + thread = thread_reference() + if thread is not None: + thread.join() + +def _remove_dead_thread_references(): + for thread_reference in set(_thread_references): + if thread_reference() is None: + _thread_references.discard(thread_reference) class _WorkItem(object): def __init__(self, call, future, completion_tracker): @@ -43,34 +61,40 @@ class _WorkItem(object): else: set_future_result(self.future, self.completion_tracker, result) +def _worker(executor_reference, work_queue): + try: + while True: + try: + work_item = work_queue.get(block=True, timeout=0.1) + except Queue.Empty: + executor = executor_reference() + if _shutdown or executor is None or executor._shutdown: + return + del executor + else: + work_item.run() + except Exception, e: + LOGGER.critical('Exception in worker', exc_info=True) + class ThreadPoolExecutor(Executor): def __init__(self, max_threads): + _remove_dead_thread_references() + self._max_threads = max_threads self._work_queue = Queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() - def _worker(self): - empty = Queue.Empty - try: - while True: - try: - work_item = self._work_queue.get(block=True, timeout=0.1) - except empty: - if self._shutdown: - return - else: - work_item.run() - except BaseException, e: - LOGGER.critical('Exception in worker', exc_info=True) - def _adjust_thread_count(self): for _ in range(len(self._threads), min(self._max_threads, self._work_queue.qsize())): - t = threading.Thread(target=self._worker) + t = threading.Thread(target=_worker, + args=(weakref.ref(self), self._work_queue)) + t.setDaemon(True) t.start() self._threads.add(t) + _thread_references.add(weakref.ref(t)) def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): self._shutdown_lock.acquire() @@ -99,3 +123,5 @@ class ThreadPoolExecutor(Executor): self._shutdown = True finally: self._shutdown_lock.release() + +atexit.register(_python_exit) |