summaryrefslogtreecommitdiff
path: root/python2/futures/thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'python2/futures/thread.py')
-rw-r--r--python2/futures/thread.py41
1 files changed, 35 insertions, 6 deletions
diff --git a/python2/futures/thread.py b/python2/futures/thread.py
index 79e34cb..77a75d0 100644
--- a/python2/futures/thread.py
+++ b/python2/futures/thread.py
@@ -1,4 +1,8 @@
-#!/usr/bin/env python
+# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+
+"""Implements ThreadPoolExecutor."""
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from futures._base import (PENDING, RUNNING, CANCELLED,
CANCELLED_AND_NOTIFIED, FINISHED,
@@ -11,8 +15,22 @@ import Queue
import threading
import weakref
-_thread_references = set()
-_shutdown = False
+# Workers are created as daemon threads. This is done to allow the interpreter
+# to exit when there are still idle threads in a ThreadPoolExecutor's thread
+# pool (i.e. shutdown() was not called). However, allowing workers to die with
+# the interpreter has two undesirable properties:
+# - The workers would still be running during interpretor shutdown,
+# meaning that they would fail in unpredictable ways.
+# - The workers could be killed while evaluating a work item, which could
+# be bad if the function being evaluated has external side-effects e.g.
+# writing to a file.
+#
+# To work around this problem, an exit handler is installed which tells the
+# workers to exit when their work queues are empty and then waits until the
+# threads finish.
+
+_thread_references = set() # Weakrefs to every active worker thread.
+_shutdown = False # Indicates that the interpreter is shutting down.
def _python_exit():
global _shutdown
@@ -23,10 +41,19 @@ def _python_exit():
thread.join()
def _remove_dead_thread_references():
+ """Remove inactive threads from _thread_references.
+
+ Should be called periodically to prevent memory leaks in scenarios such as:
+ >>> while True:
+ >>> ... t = ThreadPoolExecutor(max_threads=5)
+ >>> ... t.map(int, ['1', '2', '3', '4', '5'])
+ """
for thread_reference in set(_thread_references):
if thread_reference() is None:
_thread_references.discard(thread_reference)
+atexit.register(_python_exit)
+
class _WorkItem(object):
def __init__(self, call, future, completion_tracker):
self.call = call
@@ -68,6 +95,10 @@ def _worker(executor_reference, work_queue):
work_item = work_queue.get(block=True, timeout=0.1)
except Queue.Empty:
executor = executor_reference()
+ # Exit if:
+ # - The interpreter is shutting down.
+ # - The executor that owns the worker has been collected.
+ # - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
return
del executor
@@ -75,7 +106,7 @@ def _worker(executor_reference, work_queue):
work_item.run()
except Exception, e:
LOGGER.critical('Exception in worker', exc_info=True)
-
+
class ThreadPoolExecutor(Executor):
def __init__(self, max_threads):
_remove_dead_thread_references()
@@ -123,5 +154,3 @@ class ThreadPoolExecutor(Executor):
self._shutdown = True
finally:
self._shutdown_lock.release()
-
-atexit.register(_python_exit)