diff options
Diffstat (limited to 'python2/futures/thread.py')
-rw-r--r-- | python2/futures/thread.py | 41 |
1 files changed, 35 insertions, 6 deletions
diff --git a/python2/futures/thread.py b/python2/futures/thread.py index 79e34cb..77a75d0 100644 --- a/python2/futures/thread.py +++ b/python2/futures/thread.py @@ -1,4 +1,8 @@ -#!/usr/bin/env python +# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. + +"""Implements ThreadPoolExecutor.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' from futures._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, @@ -11,8 +15,22 @@ import Queue import threading import weakref -_thread_references = set() -_shutdown = False +# Workers are created as daemon threads. This is done to allow the interpreter +# to exit when there are still idle threads in a ThreadPoolExecutor's thread +# pool (i.e. shutdown() was not called). However, allowing workers to die with +# the interpreter has two undesirable properties: +# - The workers would still be running during interpretor shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the function being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads finish. + +_thread_references = set() # Weakrefs to every active worker thread. +_shutdown = False # Indicates that the interpreter is shutting down. def _python_exit(): global _shutdown @@ -23,10 +41,19 @@ def _python_exit(): thread.join() def _remove_dead_thread_references(): + """Remove inactive threads from _thread_references. + + Should be called periodically to prevent memory leaks in scenarios such as: + >>> while True: + >>> ... t = ThreadPoolExecutor(max_threads=5) + >>> ... t.map(int, ['1', '2', '3', '4', '5']) + """ for thread_reference in set(_thread_references): if thread_reference() is None: _thread_references.discard(thread_reference) +atexit.register(_python_exit) + class _WorkItem(object): def __init__(self, call, future, completion_tracker): self.call = call @@ -68,6 +95,10 @@ def _worker(executor_reference, work_queue): work_item = work_queue.get(block=True, timeout=0.1) except Queue.Empty: executor = executor_reference() + # Exit if: + # - The interpreter is shutting down. + # - The executor that owns the worker has been collected. + # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: return del executor @@ -75,7 +106,7 @@ def _worker(executor_reference, work_queue): work_item.run() except Exception, e: LOGGER.critical('Exception in worker', exc_info=True) - + class ThreadPoolExecutor(Executor): def __init__(self, max_threads): _remove_dead_thread_references() @@ -123,5 +154,3 @@ class ThreadPoolExecutor(Executor): self._shutdown = True finally: self._shutdown_lock.release() - -atexit.register(_python_exit) |