From bf9ebc03603f3843f6dfd7d9bd8a084f1e1c171e Mon Sep 17 00:00:00 2001 From: Brian Quinlan Date: Sat, 17 May 2014 13:51:10 -0700 Subject: Issue #21362: concurrent.futures does not validate that max_workers is proper --- Lib/concurrent/futures/process.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'Lib/concurrent/futures/process.py') diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 07b5225d1d..12993901e3 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -334,6 +334,9 @@ class ProcessPoolExecutor(_base.Executor): if max_workers is None: self._max_workers = os.cpu_count() or 1 else: + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + self._max_workers = max_workers # Make the call queue slightly larger than the number of processes to -- cgit v1.2.1 From 72425d2c7a2fff49e6126f575a385b0da2c1efc8 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 4 Oct 2014 20:20:10 +0200 Subject: Issue #11271: concurrent.futures.Executor.map() now takes a *chunksize* argument to allow batching of tasks in child processes and improve performance of ProcessPoolExecutor. Patch by Dan O'Reilly. --- Lib/concurrent/futures/process.py | 51 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) (limited to 'Lib/concurrent/futures/process.py') diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 12993901e3..fc64dbe84b 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -55,6 +55,8 @@ from multiprocessing import SimpleQueue from multiprocessing.connection import wait import threading import weakref +from functools import partial +import itertools # Workers are created as daemon threads and processes. This is done to allow the # interpreter to exit when there are still idle processes in a @@ -108,6 +110,26 @@ class _CallItem(object): self.args = args self.kwargs = kwargs +def _get_chunks(*iterables, chunksize): + """ Iterates over zip()ed iterables in chunks. """ + it = zip(*iterables) + while True: + chunk = tuple(itertools.islice(it, chunksize)) + if not chunk: + return + yield chunk + +def _process_chunk(fn, chunk): + """ Processes a chunk of an iterable passed to map. + + Runs the function passed to map() on a chunk of the + iterable passed to map. + + This function is run in a separate process. + + """ + return [fn(*args) for args in chunk] + def _process_worker(call_queue, result_queue): """Evaluates calls from call_queue and places the results in result_queue. @@ -411,6 +433,35 @@ class ProcessPoolExecutor(_base.Executor): return f submit.__doc__ = _base.Executor.submit.__doc__ + def map(self, fn, *iterables, timeout=None, chunksize=1): + """Returns a iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + chunksize: If greater than one, the iterables will be chopped into + chunks of size chunksize and submitted to the process pool. + If set to one, the items in the list will be sent one at a time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + if chunksize < 1: + raise ValueError("chunksize must be >= 1.") + + results = super().map(partial(_process_chunk, fn), + _get_chunks(*iterables, chunksize=chunksize), + timeout=timeout) + return itertools.chain.from_iterable(results) + def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown_thread = True -- cgit v1.2.1 From 08691050e848444c15cbb30eb4814bedf226cd5e Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 17 Jan 2015 20:02:14 +0100 Subject: Issue #21817: When an exception is raised in a task submitted to a ProcessPoolExecutor, the remote traceback is now displayed in the parent process. Patch by Claudiu Popa. --- Lib/concurrent/futures/process.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) (limited to 'Lib/concurrent/futures/process.py') diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index fc64dbe84b..3dd6da1f0c 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -57,6 +57,7 @@ import threading import weakref from functools import partial import itertools +import traceback # Workers are created as daemon threads and processes. This is done to allow the # interpreter to exit when there are still idle processes in a @@ -90,6 +91,27 @@ def _python_exit(): # (Futures in the call queue cannot be cancelled). EXTRA_QUEUED_CALLS = 1 +# Hack to embed stringification of remote traceback in local traceback + +class _RemoteTraceback(Exception): + def __init__(self, tb): + self.tb = tb + def __str__(self): + return self.tb + +class _ExceptionWithTraceback: + def __init__(self, exc, tb): + tb = traceback.format_exception(type(exc), exc, tb) + tb = ''.join(tb) + self.exc = exc + self.tb = '\n"""\n%s"""' % tb + def __reduce__(self): + return _rebuild_exc, (self.exc, self.tb) + +def _rebuild_exc(exc, tb): + exc.__cause__ = _RemoteTraceback(tb) + return exc + class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future @@ -152,8 +174,8 @@ def _process_worker(call_queue, result_queue): try: r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) + exc = _ExceptionWithTraceback(e, e.__traceback__) + result_queue.put(_ResultItem(call_item.work_id, exception=exc)) else: result_queue.put(_ResultItem(call_item.work_id, result=r)) -- cgit v1.2.1 From 109322945b32e6622d17c244f5c4b3e1b8b31878 Mon Sep 17 00:00:00 2001 From: Martin Panter Date: Mon, 2 Nov 2015 04:20:33 +0000 Subject: Issue #25523: Further a-to-an corrections new in 3.5 --- Lib/concurrent/futures/process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Lib/concurrent/futures/process.py') diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 3dd6da1f0c..590edba24e 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -456,7 +456,7 @@ class ProcessPoolExecutor(_base.Executor): submit.__doc__ = _base.Executor.submit.__doc__ def map(self, fn, *iterables, timeout=None, chunksize=1): - """Returns a iterator equivalent to map(fn, iter). + """Returns an iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are -- cgit v1.2.1