diff options
Diffstat (limited to 'python2/futures/thread.py')
-rw-r--r-- | python2/futures/thread.py | 136 |
1 files changed, 0 insertions, 136 deletions
diff --git a/python2/futures/thread.py b/python2/futures/thread.py deleted file mode 100644 index 3f1584a..0000000 --- a/python2/futures/thread.py +++ /dev/null @@ -1,136 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Implements ThreadPoolExecutor.""" - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -import atexit -import _base -import Queue -import threading -import weakref - -# Workers are created as daemon threads. This is done to allow the interpreter -# to exit when there are still idle threads in a ThreadPoolExecutor's thread -# pool (i.e. shutdown() was not called). However, allowing workers to die with -# the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads finish. - -_thread_references = set() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - -atexit.register(_python_exit) - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - - def run(self): - if not self.future.set_running_or_notify_cancel(): - return - - try: - result = self.fn(*self.args, **self.kwargs) - except BaseException as e: - self.future.set_exception(e) - else: - self.future.set_result(result) - -def _worker(executor_reference, work_queue): - try: - while True: - try: - work_item = work_queue.get(block=True, timeout=0.1) - except Queue.Empty: - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: - return - del executor - else: - work_item.run() - except BaseException as e: - _base.LOGGER.critical('Exception in worker', exc_info=True) - -class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers): - """Initializes a new ThreadPoolExecutor instance. - - Args: - max_workers: The maximum number of threads that can be used to - execute the given calls. - """ - _remove_dead_thread_references() - - self._max_workers = max_workers - self._work_queue = Queue.Queue() - self._threads = set() - self._shutdown = False - self._shutdown_lock = threading.Lock() - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._work_queue.put(w) - self._adjust_thread_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def _adjust_thread_count(self): - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. - if len(self._threads) < self._max_workers: - t = threading.Thread(target=_worker, - args=(weakref.ref(self), self._work_queue)) - t.daemon = True - t.start() - self._threads.add(t) - _thread_references.add(weakref.ref(t)) - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown = True - if wait: - for t in self._threads: - t.join() - shutdown.__doc__ = _base.Executor.shutdown.__doc__ |