summaryrefslogtreecommitdiff
path: root/python2/futures/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'python2/futures/process.py')
-rw-r--r--python2/futures/process.py132
1 files changed, 62 insertions, 70 deletions
diff --git a/python2/futures/process.py b/python2/futures/process.py
index 7f1b153..ec48377 100644
--- a/python2/futures/process.py
+++ b/python2/futures/process.py
@@ -1,4 +1,5 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
"""Implements ProcessPoolExecutor.
@@ -23,9 +24,8 @@ The follow diagram and text describe the data-flow through the system:
| | | ... | | | | 3, except | | |
+----------+ +------------+ +--------+ +-----------+ +---------+
-Executor.run_to_futures() called:
-- creates a uniquely numbered _WorkItem for each call and adds them to the
- "Work Items" dict
+Executor.submit() called:
+- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue
Local worker thread:
@@ -42,15 +42,11 @@ Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
_ResultItems in "Request Q"
"""
-
+
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
-from futures._base import (PENDING, RUNNING, CANCELLED,
- CANCELLED_AND_NOTIFIED, FINISHED,
- ALL_COMPLETED,
- set_future_exception, set_future_result,
- Executor, Future, FutureList, ThreadEventSink)
import atexit
+import _base
import Queue
import multiprocessing
import threading
@@ -63,7 +59,7 @@ import weakref
# - The workers would still be running during interpretor shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
-# be bad if the function being evaluated has external side-effects e.g.
+# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
@@ -86,7 +82,7 @@ def _remove_dead_thread_references():
Should be called periodically to prevent memory leaks in scenarios such as:
>>> while True:
- >>> ... t = ThreadPoolExecutor(max_threads=5)
+ >>> ... t = ThreadPoolExecutor(max_workers=5)
>>> ... t.map(int, ['1', '2', '3', '4', '5'])
"""
for thread_reference in set(_thread_references):
@@ -100,10 +96,11 @@ def _remove_dead_thread_references():
EXTRA_QUEUED_CALLS = 1
class _WorkItem(object):
- def __init__(self, call, future, completion_tracker):
- self.call = call
+ def __init__(self, future, fn, args, kwargs):
self.future = future
- self.completion_tracker = completion_tracker
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None):
@@ -112,9 +109,11 @@ class _ResultItem(object):
self.result = result
class _CallItem(object):
- def __init__(self, work_id, call):
+ def __init__(self, work_id, fn, args, kwargs):
self.work_id = work_id
- self.call = call
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
def _process_worker(call_queue, result_queue, shutdown):
"""Evaluates calls from call_queue and places the results in result_queue.
@@ -137,8 +136,8 @@ def _process_worker(call_queue, result_queue, shutdown):
return
else:
try:
- r = call_item.call()
- except Exception, e:
+ r = call_item.fn(*call_item.args, **call_item.kwargs)
+ except BaseException as e:
result_queue.put(_ResultItem(call_item.work_id,
exception=e))
else:
@@ -172,19 +171,15 @@ def _add_call_item_to_queue(pending_work_items,
else:
work_item = pending_work_items[work_id]
- if work_item.future.cancelled():
- work_item.future._condition.acquire()
- work_item.future._condition.notify_all()
- work_item.future._condition.release()
-
- work_item.completion_tracker.add_cancelled()
+ if work_item.future.set_running_or_notify_cancel():
+ call_queue.put(_CallItem(work_id,
+ work_item.fn,
+ work_item.args,
+ work_item.kwargs),
+ block=True)
+ else:
del pending_work_items[work_id]
continue
- else:
- work_item.future._condition.acquire()
- work_item.future._state = RUNNING
- work_item.future._condition.release()
- call_queue.put(_CallItem(work_id, work_item.call), block=True)
def _queue_manangement_worker(executor_reference,
processes,
@@ -218,6 +213,7 @@ def _queue_manangement_worker(executor_reference,
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
+
try:
result_item = result_queue.get(block=True, timeout=0.1)
except Queue.Empty:
@@ -244,34 +240,30 @@ def _queue_manangement_worker(executor_reference,
del pending_work_items[result_item.work_id]
if result_item.exception:
- set_future_exception(work_item.future,
- work_item.completion_tracker,
- result_item.exception)
+ work_item.future.set_exception(result_item.exception)
else:
- set_future_result(work_item.future,
- work_item.completion_tracker,
- result_item.result)
+ work_item.future.set_result(result_item.result)
-class ProcessPoolExecutor(Executor):
- def __init__(self, max_processes=None):
+class ProcessPoolExecutor(_base.Executor):
+ def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
Args:
- max_processes: The maximum number of processes that can be used to
+ max_workers: The maximum number of processes that can be used to
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
"""
_remove_dead_thread_references()
- if max_processes is None:
- self._max_processes = multiprocessing.cpu_count()
+ if max_workers is None:
+ self._max_workers = multiprocessing.cpu_count()
else:
- self._max_processes = max_processes
+ self._max_workers = max_workers
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
- self._call_queue = multiprocessing.Queue(self._max_processes +
+ self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
self._result_queue = multiprocessing.Queue()
self._work_ids = Queue.Queue()
@@ -296,12 +288,12 @@ class ProcessPoolExecutor(Executor):
self._call_queue,
self._result_queue,
self._shutdown_process_event))
- self._queue_management_thread.setDaemon(True)
+ self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_thread_references.add(weakref.ref(self._queue_management_thread))
def _adjust_process_count(self):
- for _ in range(len(self._processes), self._max_processes):
+ for _ in range(len(self._processes), self._max_workers):
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
@@ -310,36 +302,36 @@ class ProcessPoolExecutor(Executor):
p.start()
self._processes.add(p)
- def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED):
- self._shutdown_lock.acquire()
- try:
+ def submit(self, fn, *args, **kwargs):
+ with self._shutdown_lock:
if self._shutdown_thread:
- raise RuntimeError('cannot run new futures after shutdown')
+ raise RuntimeError('cannot schedule new futures after shutdown')
- futures = []
- event_sink = ThreadEventSink()
+ f = _base.Future()
+ w = _WorkItem(f, fn, args, kwargs)
- for index, call in enumerate(calls):
- f = Future(index)
- self._pending_work_items[self._queue_count] = _WorkItem(
- call, f, event_sink)
- self._work_ids.put(self._queue_count)
- futures.append(f)
- self._queue_count += 1
+ self._pending_work_items[self._queue_count] = w
+ self._work_ids.put(self._queue_count)
+ self._queue_count += 1
self._start_queue_management_thread()
self._adjust_process_count()
- fl = FutureList(futures, event_sink)
- fl.wait(timeout=timeout, return_when=return_when)
- return fl
- finally:
- self._shutdown_lock.release()
-
- def shutdown(self):
- self._shutdown_lock.acquire()
- try:
+ return f
+ submit.__doc__ = _base.Executor.submit.__doc__
+
+ def shutdown(self, wait=True):
+ with self._shutdown_lock:
self._shutdown_thread = True
- finally:
- self._shutdown_lock.release()
+ if wait:
+ if self._queue_management_thread:
+ self._queue_management_thread.join()
+ # To reduce the risk of openning too many files, remove references to
+ # objects that use file descriptors.
+ self._queue_management_thread = None
+ self._call_queue = None
+ self._result_queue = None
+ self._shutdown_process_event = None
+ self._processes = None
+ shutdown.__doc__ = _base.Executor.shutdown.__doc__
-atexit.register(_python_exit) \ No newline at end of file
+atexit.register(_python_exit)