Diffstat (limited to 'python2/futures/thread.py')
-rw-r--r--  python2/futures/thread.py  129
1 file changed, 50 insertions(+), 79 deletions(-)
diff --git a/python2/futures/thread.py b/python2/futures/thread.py
index 4071574..3f1584a 100644
--- a/python2/futures/thread.py
+++ b/python2/futures/thread.py
@@ -1,16 +1,12 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
"""Implements ThreadPoolExecutor."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
-from futures._base import (PENDING, RUNNING, CANCELLED,
- CANCELLED_AND_NOTIFIED, FINISHED,
- ALL_COMPLETED,
- LOGGER,
- set_future_exception, set_future_result,
- Executor, Future, FutureList, ThreadEventSink)
import atexit
+import _base
import Queue
import threading
import weakref
@@ -22,15 +18,15 @@ import weakref
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
-# be bad if the function being evaluated has external side-effects e.g.
+# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
-# workers to exit when their work queues are empty and then waits until they
-# finish.
+# workers to exit when their work queues are empty and then waits until the
+# threads finish.
-_thread_references = set() # Weakrefs to every active worker thread.
-_shutdown = False # Indicates that the interpreter is shutting down.
+_thread_references = set()
+_shutdown = False
def _python_exit():
global _shutdown
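For context, the workaround described in the comment above boils down to a simple pattern: daemon worker threads drain a queue, an atexit hook flips a module-level flag, and the hook then joins the workers so they are not killed mid-task. A minimal standalone sketch (not the module's actual _python_exit body; all names here are illustrative):

import atexit
import threading
import Queue

work_queue = Queue.Queue()
shutting_down = False

def worker():
    while True:
        try:
            item = work_queue.get(block=True, timeout=0.1)
        except Queue.Empty:
            if shutting_down:
                return              # queue is drained and the interpreter is exiting
        else:
            item()                  # run the work item (a plain callable)

workers = [threading.Thread(target=worker) for _ in range(2)]
for t in workers:
    t.daemon = True
    t.start()

def exit_handler():
    global shutting_down
    shutting_down = True            # ask workers to stop once their queue is empty
    for t in workers:
        t.join()                    # then wait until the threads finish

atexit.register(exit_handler)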
@@ -43,11 +39,10 @@ def _python_exit():
def _remove_dead_thread_references():
"""Remove inactive threads from _thread_references.
- Should be called periodically to prevent thread objects from accumulating in
- scenarios such as:
+ Should be called periodically to prevent memory leaks in scenarios such as:
>>> while True:
- >>> ... t = ThreadPoolExecutor(max_threads=5)
- >>> ... t.map(int, ['1', '2', '3', '4', '5'])
+ ... t = ThreadPoolExecutor(max_workers=5)
+ ... t.map(int, ['1', '2', '3', '4', '5'])
"""
for thread_reference in set(_thread_references):
if thread_reference() is None:
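The pruning loop above relies on a property of weak references that is easy to check in isolation: once a worker thread has finished and its Thread object has been collected, a weakref to it starts returning None. A small illustration, not part of the patch:

import threading
import weakref

t = threading.Thread(target=lambda: None)
ref = weakref.ref(t)
t.start()
t.join()

del t                    # drop the last strong reference; CPython frees the object
assert ref() is None     # a periodic pruning pass can now discard this weakref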
@@ -56,38 +51,22 @@ def _remove_dead_thread_references():
atexit.register(_python_exit)
class _WorkItem(object):
- def __init__(self, call, future, completion_tracker):
- self.call = call
+ def __init__(self, future, fn, args, kwargs):
self.future = future
- self.completion_tracker = completion_tracker
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
def run(self):
- self.future._condition.acquire()
- try:
- if self.future._state == PENDING:
- self.future._state = RUNNING
- elif self.future._state == CANCELLED:
- self.completion_tracker._condition.acquire()
- try:
- self.future._state = CANCELLED_AND_NOTIFIED
- self.completion_tracker.add_cancelled()
- return
- finally:
- self.completion_tracker._condition.release()
- else:
- LOGGER.critical('Future %s in unexpected state: %d',
- id(self.future),
- self.future._state)
- return
- finally:
- self.future._condition.release()
+ if not self.future.set_running_or_notify_cancel():
+ return
try:
- result = self.call()
- except Exception, e:
- set_future_exception(self.future, self.completion_tracker, e)
+ result = self.fn(*self.args, **self.kwargs)
+ except BaseException as e:
+ self.future.set_exception(e)
else:
- set_future_result(self.future, self.completion_tracker, result)
+ self.future.set_result(result)
def _worker(executor_reference, work_queue):
try:
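The rewritten _WorkItem.run() above delegates all state handling to the future itself: set_running_or_notify_cancel() returns False for a future that was cancelled while still queued, and the work item then returns without calling fn. A hedged caller-side sketch of that behaviour (assuming the backport package imports as futures; the variable names are illustrative):

import time
from futures.thread import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
blocker = executor.submit(time.sleep, 1)   # keeps the single worker thread busy
queued = executor.submit(pow, 2, 10)       # still sitting in the work queue

print queued.cancel()                      # True: it has not started, so it can be cancelled
print queued.cancelled()                   # True: run() will notice and return early
executor.shutdown(wait=True)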
@@ -105,61 +84,53 @@ def _worker(executor_reference, work_queue):
del executor
else:
work_item.run()
- except Exception, e:
- LOGGER.critical('Exception in worker', exc_info=True)
+ except BaseException as e:
+ _base.LOGGER.critical('Exception in worker', exc_info=True)
-class ThreadPoolExecutor(Executor):
- def __init__(self, max_threads):
+class ThreadPoolExecutor(_base.Executor):
+ def __init__(self, max_workers):
"""Initializes a new ThreadPoolExecutor instance.
Args:
- max_threads: The maximum number of threads that can be used to
+ max_workers: The maximum number of threads that can be used to
execute the given calls.
"""
_remove_dead_thread_references()
- self._max_threads = max_threads
+ self._max_workers = max_workers
self._work_queue = Queue.Queue()
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
+ def submit(self, fn, *args, **kwargs):
+ with self._shutdown_lock:
+ if self._shutdown:
+ raise RuntimeError('cannot schedule new futures after shutdown')
+
+ f = _base.Future()
+ w = _WorkItem(f, fn, args, kwargs)
+
+ self._work_queue.put(w)
+ self._adjust_thread_count()
+ return f
+ submit.__doc__ = _base.Executor.submit.__doc__
+
def _adjust_thread_count(self):
- for _ in range(len(self._threads),
- min(self._max_threads, self._work_queue.qsize())):
+ # TODO(bquinlan): Should avoid creating new threads if there are more
+ # idle threads than items in the work queue.
+ if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
args=(weakref.ref(self), self._work_queue))
- t.setDaemon(True)
+ t.daemon = True
t.start()
self._threads.add(t)
_thread_references.add(weakref.ref(t))
- def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
- self._shutdown_lock.acquire()
- try:
- if self._shutdown:
- raise RuntimeError('cannot run new futures after shutdown')
-
- futures = []
- event_sink = ThreadEventSink()
- for index, call in enumerate(calls):
- f = Future(index)
- w = _WorkItem(call, f, event_sink)
- self._work_queue.put(w)
- futures.append(f)
-
- self._adjust_thread_count()
- fl = FutureList(futures, event_sink)
- fl.wait(timeout=timeout, return_when=return_when)
- return fl
- finally:
- self._shutdown_lock.release()
- run_to_futures.__doc__ = Executor.run_to_futures.__doc__
-
- def shutdown(self):
- self._shutdown_lock.acquire()
- try:
+ def shutdown(self, wait=True):
+ with self._shutdown_lock:
self._shutdown = True
- finally:
- self._shutdown_lock.release()
- shutdown.__doc__ = Executor.shutdown.__doc__
+ if wait:
+ for t in self._threads:
+ t.join()
+ shutdown.__doc__ = _base.Executor.shutdown.__doc__
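With run_to_futures(), FutureList and the index-based Future constructor gone, the caller-facing flow after this change is submit() returning a plain Future, result() to collect values, and shutdown(wait=True) to join the workers. A minimal usage sketch under the same import assumption as above:

from futures.thread import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=5)
fs = [executor.submit(pow, 2, n) for n in range(8)]
for f in fs:
    print f.result()              # blocks until that particular call has finished
executor.shutdown(wait=True)      # waits for the worker threads to exit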