summaryrefslogtreecommitdiff
path: root/python2/futures/thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'python2/futures/thread.py')
-rw-r--r--python2/futures/thread.py136
1 files changed, 0 insertions, 136 deletions
diff --git a/python2/futures/thread.py b/python2/futures/thread.py
deleted file mode 100644
index 3f1584a..0000000
--- a/python2/futures/thread.py
+++ /dev/null
@@ -1,136 +0,0 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved.
-# Licensed to PSF under a Contributor Agreement.
-
-"""Implements ThreadPoolExecutor."""
-
-__author__ = 'Brian Quinlan (brian@sweetapp.com)'
-
-import atexit
-import _base
-import Queue
-import threading
-import weakref
-
-# Workers are created as daemon threads. This is done to allow the interpreter
-# to exit when there are still idle threads in a ThreadPoolExecutor's thread
-# pool (i.e. shutdown() was not called). However, allowing workers to die with
-# the interpreter has two undesirable properties:
-# - The workers would still be running during interpretor shutdown,
-# meaning that they would fail in unpredictable ways.
-# - The workers could be killed while evaluating a work item, which could
-# be bad if the callable being evaluated has external side-effects e.g.
-# writing to a file.
-#
-# To work around this problem, an exit handler is installed which tells the
-# workers to exit when their work queues are empty and then waits until the
-# threads finish.
-
-_thread_references = set()
-_shutdown = False
-
-def _python_exit():
- global _shutdown
- _shutdown = True
- for thread_reference in _thread_references:
- thread = thread_reference()
- if thread is not None:
- thread.join()
-
-def _remove_dead_thread_references():
- """Remove inactive threads from _thread_references.
-
- Should be called periodically to prevent memory leaks in scenarios such as:
- >>> while True:
- ... t = ThreadPoolExecutor(max_workers=5)
- ... t.map(int, ['1', '2', '3', '4', '5'])
- """
- for thread_reference in set(_thread_references):
- if thread_reference() is None:
- _thread_references.discard(thread_reference)
-
-atexit.register(_python_exit)
-
-class _WorkItem(object):
- def __init__(self, future, fn, args, kwargs):
- self.future = future
- self.fn = fn
- self.args = args
- self.kwargs = kwargs
-
- def run(self):
- if not self.future.set_running_or_notify_cancel():
- return
-
- try:
- result = self.fn(*self.args, **self.kwargs)
- except BaseException as e:
- self.future.set_exception(e)
- else:
- self.future.set_result(result)
-
-def _worker(executor_reference, work_queue):
- try:
- while True:
- try:
- work_item = work_queue.get(block=True, timeout=0.1)
- except Queue.Empty:
- executor = executor_reference()
- # Exit if:
- # - The interpreter is shutting down OR
- # - The executor that owns the worker has been collected OR
- # - The executor that owns the worker has been shutdown.
- if _shutdown or executor is None or executor._shutdown:
- return
- del executor
- else:
- work_item.run()
- except BaseException as e:
- _base.LOGGER.critical('Exception in worker', exc_info=True)
-
-class ThreadPoolExecutor(_base.Executor):
- def __init__(self, max_workers):
- """Initializes a new ThreadPoolExecutor instance.
-
- Args:
- max_workers: The maximum number of threads that can be used to
- execute the given calls.
- """
- _remove_dead_thread_references()
-
- self._max_workers = max_workers
- self._work_queue = Queue.Queue()
- self._threads = set()
- self._shutdown = False
- self._shutdown_lock = threading.Lock()
-
- def submit(self, fn, *args, **kwargs):
- with self._shutdown_lock:
- if self._shutdown:
- raise RuntimeError('cannot schedule new futures after shutdown')
-
- f = _base.Future()
- w = _WorkItem(f, fn, args, kwargs)
-
- self._work_queue.put(w)
- self._adjust_thread_count()
- return f
- submit.__doc__ = _base.Executor.submit.__doc__
-
- def _adjust_thread_count(self):
- # TODO(bquinlan): Should avoid creating new threads if there are more
- # idle threads than items in the work queue.
- if len(self._threads) < self._max_workers:
- t = threading.Thread(target=_worker,
- args=(weakref.ref(self), self._work_queue))
- t.daemon = True
- t.start()
- self._threads.add(t)
- _thread_references.add(weakref.ref(t))
-
- def shutdown(self, wait=True):
- with self._shutdown_lock:
- self._shutdown = True
- if wait:
- for t in self._threads:
- t.join()
- shutdown.__doc__ = _base.Executor.shutdown.__doc__