diff options
Diffstat (limited to 'python3/futures/thread.py')
-rw-r--r-- | python3/futures/thread.py | 35 |
1 files changed, 32 insertions, 3 deletions
diff --git a/python3/futures/thread.py b/python3/futures/thread.py index 7f64372..9292863 100644 --- a/python3/futures/thread.py +++ b/python3/futures/thread.py @@ -1,4 +1,8 @@ -#!/usr/bin/env python +# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. + +"""Implements ThreadPoolExecutor.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' from futures._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, @@ -11,6 +15,20 @@ import queue import threading import weakref +# Workers are created as daemon threads. This is done to allow the interpreter +# to exit when there are still idle threads in a ThreadPoolExecutor's thread +# pool (i.e. shutdown() was not called). However, allowing workers to die with +# the interpreter has two undesirable properties: +# - The workers would still be running during interpretor shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the function being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads finish. + _thread_references = set() _shutdown = False @@ -23,10 +41,19 @@ def _python_exit(): thread.join() def _remove_dead_thread_references(): + """Remove inactive threads from _thread_references. + + Should be called periodically to prevent memory leaks in scenarios such as: + >>> while True: + >>> ... t = ThreadPoolExecutor(max_threads=5) + >>> ... t.map(int, ['1', '2', '3', '4', '5']) + """ for thread_reference in set(_thread_references): if thread_reference() is None: _thread_references.discard(thread_reference) +atexit.register(_python_exit) + class _WorkItem(object): def __init__(self, call, future, completion_tracker): self.call = call @@ -62,6 +89,10 @@ def _worker(executor_reference, work_queue): work_item = work_queue.get(block=True, timeout=0.1) except queue.Empty: executor = executor_reference() + # Exit if: + # - The interpreter is shutting down. + # - The executor that owns the worker has been collected. + # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: return del executor @@ -109,5 +140,3 @@ class ThreadPoolExecutor(Executor): def shutdown(self): with self._shutdown_lock: self._shutdown = True - -atexit.register(_python_exit) |