summaryrefslogtreecommitdiff
path: root/python2/futures/thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'python2/futures/thread.py')
-rw-r--r--python2/futures/thread.py56
1 files changed, 41 insertions, 15 deletions
diff --git a/python2/futures/thread.py b/python2/futures/thread.py
index 5834fd4..79e34cb 100644
--- a/python2/futures/thread.py
+++ b/python2/futures/thread.py
@@ -6,8 +6,26 @@ from futures._base import (PENDING, RUNNING, CANCELLED,
LOGGER,
set_future_exception, set_future_result,
Executor, Future, FutureList, ThreadEventSink)
+import atexit
import Queue
import threading
+import weakref
+
+_thread_references = set()
+_shutdown = False
+
+def _python_exit():
+ global _shutdown
+ _shutdown = True
+ for thread_reference in _thread_references:
+ thread = thread_reference()
+ if thread is not None:
+ thread.join()
+
+def _remove_dead_thread_references():
+ for thread_reference in set(_thread_references):
+ if thread_reference() is None:
+ _thread_references.discard(thread_reference)
class _WorkItem(object):
def __init__(self, call, future, completion_tracker):
@@ -43,34 +61,40 @@ class _WorkItem(object):
else:
set_future_result(self.future, self.completion_tracker, result)
+def _worker(executor_reference, work_queue):
+ try:
+ while True:
+ try:
+ work_item = work_queue.get(block=True, timeout=0.1)
+ except Queue.Empty:
+ executor = executor_reference()
+ if _shutdown or executor is None or executor._shutdown:
+ return
+ del executor
+ else:
+ work_item.run()
+ except Exception, e:
+ LOGGER.critical('Exception in worker', exc_info=True)
+
class ThreadPoolExecutor(Executor):
def __init__(self, max_threads):
+ _remove_dead_thread_references()
+
self._max_threads = max_threads
self._work_queue = Queue.Queue()
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
- def _worker(self):
- empty = Queue.Empty
- try:
- while True:
- try:
- work_item = self._work_queue.get(block=True, timeout=0.1)
- except empty:
- if self._shutdown:
- return
- else:
- work_item.run()
- except BaseException, e:
- LOGGER.critical('Exception in worker', exc_info=True)
-
def _adjust_thread_count(self):
for _ in range(len(self._threads),
min(self._max_threads, self._work_queue.qsize())):
- t = threading.Thread(target=self._worker)
+ t = threading.Thread(target=_worker,
+ args=(weakref.ref(self), self._work_queue))
+ t.setDaemon(True)
t.start()
self._threads.add(t)
+ _thread_references.add(weakref.ref(t))
def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
self._shutdown_lock.acquire()
@@ -99,3 +123,5 @@ class ThreadPoolExecutor(Executor):
self._shutdown = True
finally:
self._shutdown_lock.release()
+
+atexit.register(_python_exit)