summaryrefslogtreecommitdiff
path: root/python3/futures/thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'python3/futures/thread.py')
-rw-r--r--python3/futures/thread.py35
1 files changed, 32 insertions, 3 deletions
diff --git a/python3/futures/thread.py b/python3/futures/thread.py
index 7f64372..9292863 100644
--- a/python3/futures/thread.py
+++ b/python3/futures/thread.py
@@ -1,4 +1,8 @@
-#!/usr/bin/env python
+# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+
+"""Implements ThreadPoolExecutor."""
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from futures._base import (PENDING, RUNNING, CANCELLED,
CANCELLED_AND_NOTIFIED, FINISHED,
@@ -11,6 +15,20 @@ import queue
import threading
import weakref
+# Workers are created as daemon threads. This is done to allow the interpreter
+# to exit when there are still idle threads in a ThreadPoolExecutor's thread
+# pool (i.e. shutdown() was not called). However, allowing workers to die with
+# the interpreter has two undesirable properties:
+# - The workers would still be running during interpretor shutdown,
+# meaning that they would fail in unpredictable ways.
+# - The workers could be killed while evaluating a work item, which could
+# be bad if the function being evaluated has external side-effects e.g.
+# writing to a file.
+#
+# To work around this problem, an exit handler is installed which tells the
+# workers to exit when their work queues are empty and then waits until the
+# threads finish.
+
_thread_references = set()
_shutdown = False
@@ -23,10 +41,19 @@ def _python_exit():
thread.join()
def _remove_dead_thread_references():
+ """Remove inactive threads from _thread_references.
+
+ Should be called periodically to prevent memory leaks in scenarios such as:
+ >>> while True:
+ >>> ... t = ThreadPoolExecutor(max_threads=5)
+ >>> ... t.map(int, ['1', '2', '3', '4', '5'])
+ """
for thread_reference in set(_thread_references):
if thread_reference() is None:
_thread_references.discard(thread_reference)
+atexit.register(_python_exit)
+
class _WorkItem(object):
def __init__(self, call, future, completion_tracker):
self.call = call
@@ -62,6 +89,10 @@ def _worker(executor_reference, work_queue):
work_item = work_queue.get(block=True, timeout=0.1)
except queue.Empty:
executor = executor_reference()
+ # Exit if:
+ # - The interpreter is shutting down.
+ # - The executor that owns the worker has been collected.
+ # - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
return
del executor
@@ -109,5 +140,3 @@ class ThreadPoolExecutor(Executor):
def shutdown(self):
with self._shutdown_lock:
self._shutdown = True
-
-atexit.register(_python_exit)