From dfcd0a3976b4469c0fb97e55dbd3fe613ca4f8be Mon Sep 17 00:00:00 2001 From: "brian.quinlan" Date: Sun, 14 Jun 2009 15:09:00 +0000 Subject: Some style fixes and documentation improvements. --- python2/futures/__init__.py | 6 ++++++ python2/futures/_base.py | 20 ++++++++++++-------- python2/futures/process.py | 6 +++++- python2/futures/thread.py | 41 +++++++++++++++++++++++++++++++++++------ python3/futures/__init__.py | 6 ++++++ python3/futures/_base.py | 4 ++++ python3/futures/process.py | 6 +++++- python3/futures/thread.py | 35 ++++++++++++++++++++++++++++++++--- 8 files changed, 105 insertions(+), 19 deletions(-) diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py index 22f10db..27a5720 100644 --- a/python2/futures/__init__.py +++ b/python2/futures/__init__.py @@ -1,3 +1,9 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. + +"""Execute computations asynchronously using threads or processes.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED, RETURN_IMMEDIATELY, CancelledError, TimeoutError, diff --git a/python2/futures/_base.py b/python2/futures/_base.py index 8d0841e..b60f19a 100644 --- a/python2/futures/_base.py +++ b/python2/futures/_base.py @@ -1,3 +1,7 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + import logging import threading import time @@ -6,14 +10,14 @@ try: from functools import partial except ImportError: def partial(func, *args, **keywords): - def newfunc(*fargs, **fkeywords): - newkeywords = keywords.copy() - newkeywords.update(fkeywords) - return func(*(args + fargs), **newkeywords) - newfunc.func = func - newfunc.args = args - newfunc.keywords = keywords - return newfunc + def newfunc(*fargs, **fkeywords): + newkeywords = keywords.copy() + newkeywords.update(fkeywords) + return func(*(args + fargs), **newkeywords) + newfunc.func = func + newfunc.args = args + newfunc.keywords = keywords + return newfunc # The "any" and "all" builtins weren't introduced until Python 2.5. try: diff --git a/python2/futures/process.py b/python2/futures/process.py index 463ee8b..03deb60 100644 --- a/python2/futures/process.py +++ b/python2/futures/process.py @@ -1,4 +1,8 @@ -#!/usr/bin/env python +# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. + +"""Implements ProcessPoolExecutor.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' from futures._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, diff --git a/python2/futures/thread.py b/python2/futures/thread.py index 79e34cb..77a75d0 100644 --- a/python2/futures/thread.py +++ b/python2/futures/thread.py @@ -1,4 +1,8 @@ -#!/usr/bin/env python +# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. + +"""Implements ThreadPoolExecutor.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' from futures._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, @@ -11,8 +15,22 @@ import Queue import threading import weakref -_thread_references = set() -_shutdown = False +# Workers are created as daemon threads. This is done to allow the interpreter +# to exit when there are still idle threads in a ThreadPoolExecutor's thread +# pool (i.e. shutdown() was not called). However, allowing workers to die with +# the interpreter has two undesirable properties: +# - The workers would still be running during interpretor shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the function being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads finish. + +_thread_references = set() # Weakrefs to every active worker thread. +_shutdown = False # Indicates that the interpreter is shutting down. def _python_exit(): global _shutdown @@ -23,10 +41,19 @@ def _python_exit(): thread.join() def _remove_dead_thread_references(): + """Remove inactive threads from _thread_references. + + Should be called periodically to prevent memory leaks in scenarios such as: + >>> while True: + >>> ... t = ThreadPoolExecutor(max_threads=5) + >>> ... t.map(int, ['1', '2', '3', '4', '5']) + """ for thread_reference in set(_thread_references): if thread_reference() is None: _thread_references.discard(thread_reference) +atexit.register(_python_exit) + class _WorkItem(object): def __init__(self, call, future, completion_tracker): self.call = call @@ -68,6 +95,10 @@ def _worker(executor_reference, work_queue): work_item = work_queue.get(block=True, timeout=0.1) except Queue.Empty: executor = executor_reference() + # Exit if: + # - The interpreter is shutting down. + # - The executor that owns the worker has been collected. + # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: return del executor @@ -75,7 +106,7 @@ def _worker(executor_reference, work_queue): work_item.run() except Exception, e: LOGGER.critical('Exception in worker', exc_info=True) - + class ThreadPoolExecutor(Executor): def __init__(self, max_threads): _remove_dead_thread_references() @@ -123,5 +154,3 @@ class ThreadPoolExecutor(Executor): self._shutdown = True finally: self._shutdown_lock.release() - -atexit.register(_python_exit) diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py index 5f599ad..86b67dc 100644 --- a/python3/futures/__init__.py +++ b/python3/futures/__init__.py @@ -1,3 +1,9 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. + +"""Execute computations asynchronously using threads or processes.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED, RETURN_IMMEDIATELY, CancelledError, TimeoutError, diff --git a/python3/futures/_base.py b/python3/futures/_base.py index 19cabe8..3997ecc 100644 --- a/python3/futures/_base.py +++ b/python3/futures/_base.py @@ -1,3 +1,7 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + import functools import logging import threading diff --git a/python3/futures/process.py b/python3/futures/process.py index 6a386e5..94d7988 100644 --- a/python3/futures/process.py +++ b/python3/futures/process.py @@ -1,4 +1,8 @@ -#!/usr/bin/env python +# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. + +"""Implements ProcessPoolExecutor.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' from futures._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, diff --git a/python3/futures/thread.py b/python3/futures/thread.py index 7f64372..9292863 100644 --- a/python3/futures/thread.py +++ b/python3/futures/thread.py @@ -1,4 +1,8 @@ -#!/usr/bin/env python +# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. + +"""Implements ThreadPoolExecutor.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' from futures._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, @@ -11,6 +15,20 @@ import queue import threading import weakref +# Workers are created as daemon threads. This is done to allow the interpreter +# to exit when there are still idle threads in a ThreadPoolExecutor's thread +# pool (i.e. shutdown() was not called). However, allowing workers to die with +# the interpreter has two undesirable properties: +# - The workers would still be running during interpretor shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the function being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads finish. + _thread_references = set() _shutdown = False @@ -23,10 +41,19 @@ def _python_exit(): thread.join() def _remove_dead_thread_references(): + """Remove inactive threads from _thread_references. + + Should be called periodically to prevent memory leaks in scenarios such as: + >>> while True: + >>> ... t = ThreadPoolExecutor(max_threads=5) + >>> ... t.map(int, ['1', '2', '3', '4', '5']) + """ for thread_reference in set(_thread_references): if thread_reference() is None: _thread_references.discard(thread_reference) +atexit.register(_python_exit) + class _WorkItem(object): def __init__(self, call, future, completion_tracker): self.call = call @@ -62,6 +89,10 @@ def _worker(executor_reference, work_queue): work_item = work_queue.get(block=True, timeout=0.1) except queue.Empty: executor = executor_reference() + # Exit if: + # - The interpreter is shutting down. + # - The executor that owns the worker has been collected. + # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: return del executor @@ -109,5 +140,3 @@ class ThreadPoolExecutor(Executor): def shutdown(self): with self._shutdown_lock: self._shutdown = True - -atexit.register(_python_exit) -- cgit v1.2.1