diff options
Diffstat (limited to 'python2/futures/thread.py')
-rw-r--r-- | python2/futures/thread.py | 129 |
1 files changed, 50 insertions, 79 deletions
diff --git a/python2/futures/thread.py b/python2/futures/thread.py index 4071574..3f1584a 100644 --- a/python2/futures/thread.py +++ b/python2/futures/thread.py @@ -1,16 +1,12 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. """Implements ThreadPoolExecutor.""" __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (PENDING, RUNNING, CANCELLED, - CANCELLED_AND_NOTIFIED, FINISHED, - ALL_COMPLETED, - LOGGER, - set_future_exception, set_future_result, - Executor, Future, FutureList, ThreadEventSink) import atexit +import _base import Queue import threading import weakref @@ -22,15 +18,15 @@ import weakref # - The workers would still be running during interpretor shutdown, # meaning that they would fail in unpredictable ways. # - The workers could be killed while evaluating a work item, which could -# be bad if the function being evaluated has external side-effects e.g. +# be bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until they -# finish. +# workers to exit when their work queues are empty and then waits until the +# threads finish. -_thread_references = set() # Weakrefs to every active worker thread. -_shutdown = False # Indicates that the interpreter is shutting down. +_thread_references = set() +_shutdown = False def _python_exit(): global _shutdown @@ -43,11 +39,10 @@ def _python_exit(): def _remove_dead_thread_references(): """Remove inactive threads from _thread_references. - Should be called periodically to prevent thread objects from accumulating in - scenarios such as: + Should be called periodically to prevent memory leaks in scenarios such as: >>> while True: - >>> ... t = ThreadPoolExecutor(max_threads=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) + ... t = ThreadPoolExecutor(max_workers=5) + ... t.map(int, ['1', '2', '3', '4', '5']) """ for thread_reference in set(_thread_references): if thread_reference() is None: @@ -56,38 +51,22 @@ def _remove_dead_thread_references(): atexit.register(_python_exit) class _WorkItem(object): - def __init__(self, call, future, completion_tracker): - self.call = call + def __init__(self, future, fn, args, kwargs): self.future = future - self.completion_tracker = completion_tracker + self.fn = fn + self.args = args + self.kwargs = kwargs def run(self): - self.future._condition.acquire() - try: - if self.future._state == PENDING: - self.future._state = RUNNING - elif self.future._state == CANCELLED: - self.completion_tracker._condition.acquire() - try: - self.future._state = CANCELLED_AND_NOTIFIED - self.completion_tracker.add_cancelled() - return - finally: - self.completion_tracker._condition.release() - else: - LOGGER.critical('Future %s in unexpected state: %d', - id(self.future), - self.future._state) - return - finally: - self.future._condition.release() + if not self.future.set_running_or_notify_cancel(): + return try: - result = self.call() - except Exception, e: - set_future_exception(self.future, self.completion_tracker, e) + result = self.fn(*self.args, **self.kwargs) + except BaseException as e: + self.future.set_exception(e) else: - set_future_result(self.future, self.completion_tracker, result) + self.future.set_result(result) def _worker(executor_reference, work_queue): try: @@ -105,61 +84,53 @@ def _worker(executor_reference, work_queue): del executor else: work_item.run() - except Exception, e: - LOGGER.critical('Exception in worker', exc_info=True) + except BaseException as e: + _base.LOGGER.critical('Exception in worker', exc_info=True) -class ThreadPoolExecutor(Executor): - def __init__(self, max_threads): +class ThreadPoolExecutor(_base.Executor): + def __init__(self, max_workers): """Initializes a new ThreadPoolExecutor instance. Args: - max_threads: The maximum number of threads that can be used to + max_workers: The maximum number of threads that can be used to execute the given calls. """ _remove_dead_thread_references() - self._max_threads = max_threads + self._max_workers = max_workers self._work_queue = Queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + def _adjust_thread_count(self): - for _ in range(len(self._threads), - min(self._max_threads, self._work_queue.qsize())): + # TODO(bquinlan): Should avoid creating new threads if there are more + # idle threads than items in the work queue. + if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, args=(weakref.ref(self), self._work_queue)) - t.setDaemon(True) + t.daemon = True t.start() self._threads.add(t) _thread_references.add(weakref.ref(t)) - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - self._shutdown_lock.acquire() - try: - if self._shutdown: - raise RuntimeError('cannot run new futures after shutdown') - - futures = [] - event_sink = ThreadEventSink() - for index, call in enumerate(calls): - f = Future(index) - w = _WorkItem(call, f, event_sink) - self._work_queue.put(w) - futures.append(f) - - self._adjust_thread_count() - fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, return_when=return_when) - return fl - finally: - self._shutdown_lock.release() - run_to_futures.__doc__ = Executor.run_to_futures.__doc__ - - def shutdown(self): - self._shutdown_lock.acquire() - try: + def shutdown(self, wait=True): + with self._shutdown_lock: self._shutdown = True - finally: - self._shutdown_lock.release() - shutdown.__doc__ = Executor.shutdown.__doc__ + if wait: + for t in self._threads: + t.join() + shutdown.__doc__ = _base.Executor.shutdown.__doc__ |