summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrian.quinlan <devnull@localhost>2009-06-14 15:09:00 +0000
committerbrian.quinlan <devnull@localhost>2009-06-14 15:09:00 +0000
commitdfcd0a3976b4469c0fb97e55dbd3fe613ca4f8be (patch)
tree0f8018352f7a664b65faa2c21c938cc8d54dbb63
parent494b2789bbc092d057fa61001e7d133c8922f34f (diff)
downloadfutures-dfcd0a3976b4469c0fb97e55dbd3fe613ca4f8be.tar.gz
Some style fixes and documentation improvements.
-rw-r--r--python2/futures/__init__.py6
-rw-r--r--python2/futures/_base.py20
-rw-r--r--python2/futures/process.py6
-rw-r--r--python2/futures/thread.py41
-rw-r--r--python3/futures/__init__.py6
-rw-r--r--python3/futures/_base.py4
-rw-r--r--python3/futures/process.py6
-rw-r--r--python3/futures/thread.py35
8 files changed, 105 insertions, 19 deletions
diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py
index 22f10db..27a5720 100644
--- a/python2/futures/__init__.py
+++ b/python2/futures/__init__.py
@@ -1,3 +1,9 @@
+# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+
+"""Execute computations asynchronously using threads or processes."""
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
+
from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION,
ALL_COMPLETED, RETURN_IMMEDIATELY,
CancelledError, TimeoutError,
diff --git a/python2/futures/_base.py b/python2/futures/_base.py
index 8d0841e..b60f19a 100644
--- a/python2/futures/_base.py
+++ b/python2/futures/_base.py
@@ -1,3 +1,7 @@
+# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
+
import logging
import threading
import time
@@ -6,14 +10,14 @@ try:
from functools import partial
except ImportError:
def partial(func, *args, **keywords):
- def newfunc(*fargs, **fkeywords):
- newkeywords = keywords.copy()
- newkeywords.update(fkeywords)
- return func(*(args + fargs), **newkeywords)
- newfunc.func = func
- newfunc.args = args
- newfunc.keywords = keywords
- return newfunc
+ def newfunc(*fargs, **fkeywords):
+ newkeywords = keywords.copy()
+ newkeywords.update(fkeywords)
+ return func(*(args + fargs), **newkeywords)
+ newfunc.func = func
+ newfunc.args = args
+ newfunc.keywords = keywords
+ return newfunc
# The "any" and "all" builtins weren't introduced until Python 2.5.
try:
diff --git a/python2/futures/process.py b/python2/futures/process.py
index 463ee8b..03deb60 100644
--- a/python2/futures/process.py
+++ b/python2/futures/process.py
@@ -1,4 +1,8 @@
-#!/usr/bin/env python
+# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+
+"""Implements ProcessPoolExecutor."""
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from futures._base import (PENDING, RUNNING, CANCELLED,
CANCELLED_AND_NOTIFIED, FINISHED,
diff --git a/python2/futures/thread.py b/python2/futures/thread.py
index 79e34cb..77a75d0 100644
--- a/python2/futures/thread.py
+++ b/python2/futures/thread.py
@@ -1,4 +1,8 @@
-#!/usr/bin/env python
+# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+
+"""Implements ThreadPoolExecutor."""
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from futures._base import (PENDING, RUNNING, CANCELLED,
CANCELLED_AND_NOTIFIED, FINISHED,
@@ -11,8 +15,22 @@ import Queue
import threading
import weakref
-_thread_references = set()
-_shutdown = False
+# Workers are created as daemon threads. This is done to allow the interpreter
+# to exit when there are still idle threads in a ThreadPoolExecutor's thread
+# pool (i.e. shutdown() was not called). However, allowing workers to die with
+# the interpreter has two undesirable properties:
+# - The workers would still be running during interpretor shutdown,
+# meaning that they would fail in unpredictable ways.
+# - The workers could be killed while evaluating a work item, which could
+# be bad if the function being evaluated has external side-effects e.g.
+# writing to a file.
+#
+# To work around this problem, an exit handler is installed which tells the
+# workers to exit when their work queues are empty and then waits until the
+# threads finish.
+
+_thread_references = set() # Weakrefs to every active worker thread.
+_shutdown = False # Indicates that the interpreter is shutting down.
def _python_exit():
global _shutdown
@@ -23,10 +41,19 @@ def _python_exit():
thread.join()
def _remove_dead_thread_references():
+ """Remove inactive threads from _thread_references.
+
+ Should be called periodically to prevent memory leaks in scenarios such as:
+ >>> while True:
+ >>> ... t = ThreadPoolExecutor(max_threads=5)
+ >>> ... t.map(int, ['1', '2', '3', '4', '5'])
+ """
for thread_reference in set(_thread_references):
if thread_reference() is None:
_thread_references.discard(thread_reference)
+atexit.register(_python_exit)
+
class _WorkItem(object):
def __init__(self, call, future, completion_tracker):
self.call = call
@@ -68,6 +95,10 @@ def _worker(executor_reference, work_queue):
work_item = work_queue.get(block=True, timeout=0.1)
except Queue.Empty:
executor = executor_reference()
+ # Exit if:
+ # - The interpreter is shutting down.
+ # - The executor that owns the worker has been collected.
+ # - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
return
del executor
@@ -75,7 +106,7 @@ def _worker(executor_reference, work_queue):
work_item.run()
except Exception, e:
LOGGER.critical('Exception in worker', exc_info=True)
-
+
class ThreadPoolExecutor(Executor):
def __init__(self, max_threads):
_remove_dead_thread_references()
@@ -123,5 +154,3 @@ class ThreadPoolExecutor(Executor):
self._shutdown = True
finally:
self._shutdown_lock.release()
-
-atexit.register(_python_exit)
diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py
index 5f599ad..86b67dc 100644
--- a/python3/futures/__init__.py
+++ b/python3/futures/__init__.py
@@ -1,3 +1,9 @@
+# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+
+"""Execute computations asynchronously using threads or processes."""
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
+
from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION,
ALL_COMPLETED, RETURN_IMMEDIATELY,
CancelledError, TimeoutError,
diff --git a/python3/futures/_base.py b/python3/futures/_base.py
index 19cabe8..3997ecc 100644
--- a/python3/futures/_base.py
+++ b/python3/futures/_base.py
@@ -1,3 +1,7 @@
+# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
+
import functools
import logging
import threading
diff --git a/python3/futures/process.py b/python3/futures/process.py
index 6a386e5..94d7988 100644
--- a/python3/futures/process.py
+++ b/python3/futures/process.py
@@ -1,4 +1,8 @@
-#!/usr/bin/env python
+# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+
+"""Implements ProcessPoolExecutor."""
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from futures._base import (PENDING, RUNNING, CANCELLED,
CANCELLED_AND_NOTIFIED, FINISHED,
diff --git a/python3/futures/thread.py b/python3/futures/thread.py
index 7f64372..9292863 100644
--- a/python3/futures/thread.py
+++ b/python3/futures/thread.py
@@ -1,4 +1,8 @@
-#!/usr/bin/env python
+# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+
+"""Implements ThreadPoolExecutor."""
+
+__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from futures._base import (PENDING, RUNNING, CANCELLED,
CANCELLED_AND_NOTIFIED, FINISHED,
@@ -11,6 +15,20 @@ import queue
import threading
import weakref
+# Workers are created as daemon threads. This is done to allow the interpreter
+# to exit when there are still idle threads in a ThreadPoolExecutor's thread
+# pool (i.e. shutdown() was not called). However, allowing workers to die with
+# the interpreter has two undesirable properties:
+# - The workers would still be running during interpretor shutdown,
+# meaning that they would fail in unpredictable ways.
+# - The workers could be killed while evaluating a work item, which could
+# be bad if the function being evaluated has external side-effects e.g.
+# writing to a file.
+#
+# To work around this problem, an exit handler is installed which tells the
+# workers to exit when their work queues are empty and then waits until the
+# threads finish.
+
_thread_references = set()
_shutdown = False
@@ -23,10 +41,19 @@ def _python_exit():
thread.join()
def _remove_dead_thread_references():
+ """Remove inactive threads from _thread_references.
+
+ Should be called periodically to prevent memory leaks in scenarios such as:
+ >>> while True:
+ >>> ... t = ThreadPoolExecutor(max_threads=5)
+ >>> ... t.map(int, ['1', '2', '3', '4', '5'])
+ """
for thread_reference in set(_thread_references):
if thread_reference() is None:
_thread_references.discard(thread_reference)
+atexit.register(_python_exit)
+
class _WorkItem(object):
def __init__(self, call, future, completion_tracker):
self.call = call
@@ -62,6 +89,10 @@ def _worker(executor_reference, work_queue):
work_item = work_queue.get(block=True, timeout=0.1)
except queue.Empty:
executor = executor_reference()
+ # Exit if:
+ # - The interpreter is shutting down.
+ # - The executor that owns the worker has been collected.
+ # - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
return
del executor
@@ -109,5 +140,3 @@ class ThreadPoolExecutor(Executor):
def shutdown(self):
with self._shutdown_lock:
self._shutdown = True
-
-atexit.register(_python_exit)