diff options
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/__init__.py | 11 | ||||
-rw-r--r-- | Lib/multiprocessing/connection.py | 28 | ||||
-rw-r--r-- | Lib/multiprocessing/dummy/__init__.py | 28 | ||||
-rw-r--r-- | Lib/multiprocessing/dummy/connection.py | 28 | ||||
-rw-r--r-- | Lib/multiprocessing/forking.py | 40 | ||||
-rw-r--r-- | Lib/multiprocessing/heap.py | 28 | ||||
-rw-r--r-- | Lib/multiprocessing/managers.py | 47 | ||||
-rw-r--r-- | Lib/multiprocessing/pool.py | 138 | ||||
-rw-r--r-- | Lib/multiprocessing/process.py | 30 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 28 | ||||
-rw-r--r-- | Lib/multiprocessing/reduction.py | 28 | ||||
-rw-r--r-- | Lib/multiprocessing/sharedctypes.py | 34 | ||||
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 49 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 28 |
14 files changed, 495 insertions, 50 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index 4fb0bd0bf5..4963293bd9 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -38,6 +38,7 @@ # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # __version__ = '0.70a1' @@ -115,8 +116,12 @@ def cpu_count(): except (ValueError, KeyError): num = 0 elif 'bsd' in sys.platform or sys.platform == 'darwin': + comm = '/sbin/sysctl -n hw.ncpu' + if sys.platform == 'darwin': + comm = '/usr' + comm try: - num = int(os.popen('sysctl -n hw.ncpu').read()) + with os.popen(comm) as p: + num = int(p.read()) except ValueError: num = 0 else: @@ -219,12 +224,12 @@ def JoinableQueue(maxsize=0): from multiprocessing.queues import JoinableQueue return JoinableQueue(maxsize) -def Pool(processes=None, initializer=None, initargs=()): +def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None): ''' Returns a process pool object ''' from multiprocessing.pool import Pool - return Pool(processes, initializer, initargs) + return Pool(processes, initializer, initargs, maxtasksperchild) def RawValue(typecode_or_type, *args): ''' diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 99b338edb1..090796ead3 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -3,7 +3,33 @@ # # multiprocessing/connection.py # -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # __all__ = [ 'Client', 'Listener', 'Pipe' ] diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py index 30b1b20f56..19a3b697c2 100644 --- a/Lib/multiprocessing/dummy/__init__.py +++ b/Lib/multiprocessing/dummy/__init__.py @@ -3,7 +3,33 @@ # # multiprocessing/dummy/__init__.py # -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # __all__ = [ diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py index 4f0a6805de..50dc9ffe73 100644 --- a/Lib/multiprocessing/dummy/connection.py +++ b/Lib/multiprocessing/dummy/connection.py @@ -3,7 +3,33 @@ # # multiprocessing/dummy/connection.py # -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # __all__ = [ 'Client', 'Listener', 'Pipe' ] diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index cec184c748..3fca8b1132 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -3,7 +3,33 @@ # # multiprocessing/forking.py # -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # import os @@ -103,7 +129,12 @@ if sys.platform != 'win32': def poll(self, flag=os.WNOHANG): if self.returncode is None: - pid, sts = os.waitpid(self.pid, flag) + try: + pid, sts = os.waitpid(self.pid, flag) + except os.error: + # Child process not yet created. See #1731717 + # e.errno == errno.ECHILD == 10 + return None if pid == self.pid: if os.WIFSIGNALED(sts): self.returncode = -os.WTERMSIG(sts) @@ -167,6 +198,7 @@ else: TERMINATE = 0x10000 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) + WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") exit = win32.ExitProcess close = win32.CloseHandle @@ -176,7 +208,7 @@ else: # People embedding Python want to modify it. # - if sys.executable.lower().endswith('pythonservice.exe'): + if WINSERVICE: _python_exe = os.path.join(sys.exec_prefix, 'python.exe') else: _python_exe = sys.executable @@ -366,7 +398,7 @@ else: if _logger is not None: d['log_level'] = _logger.getEffectiveLevel() - if not WINEXE: + if not WINEXE and not WINSERVICE: main_path = getattr(sys.modules['__main__'], '__file__', None) if not main_path and sys.argv[0] not in ('', '-c'): main_path = sys.argv[0] diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index 697a5f5516..52ee49dc15 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -3,7 +3,33 @@ # # multiprocessing/heap.py # -# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # import bisect diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index e331116c89..78518fd522 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -4,7 +4,33 @@ # # multiprocessing/managers.py # -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] @@ -136,7 +162,7 @@ class Server(object): self.listener = Listener(address=address, backlog=5) self.address = self.listener.address - self.id_to_obj = {0: (None, ())} + self.id_to_obj = {'0': (None, ())} self.id_to_refcount = {} self.mutex = threading.RLock() self.stop = 0 @@ -298,7 +324,7 @@ class Server(object): keys = self.id_to_obj.keys() keys.sort() for ident in keys: - if ident != 0: + if ident != '0': result.append(' %s: refcount=%s\n %s' % (ident, self.id_to_refcount[ident], str(self.id_to_obj[ident][0])[:75])) @@ -310,7 +336,7 @@ class Server(object): ''' Number of shared objects ''' - return len(self.id_to_obj) - 1 # don't count ident=0 + return len(self.id_to_obj) - 1 # don't count ident='0' def shutdown(self, c): ''' @@ -475,12 +501,15 @@ class BaseManager(object): dispatch(conn, None, 'dummy') self._state.value = State.STARTED - def start(self): + def start(self, initializer=None, initargs=()): ''' Spawn a server process for this manager object ''' assert self._state.value == State.INITIAL + if initializer is not None and not hasattr(initializer, '__call__'): + raise TypeError('initializer must be a callable') + # pipe over which we will retrieve address of server reader, writer = connection.Pipe(duplex=False) @@ -488,7 +517,7 @@ class BaseManager(object): self._process = Process( target=type(self)._run_server, args=(self._registry, self._address, self._authkey, - self._serializer, writer), + self._serializer, writer, initializer, initargs), ) ident = ':'.join(str(i) for i in self._process._identity) self._process.name = type(self).__name__ + '-' + ident @@ -509,10 +538,14 @@ class BaseManager(object): ) @classmethod - def _run_server(cls, registry, address, authkey, serializer, writer): + def _run_server(cls, registry, address, authkey, serializer, writer, + initializer=None, initargs=()): ''' Create a server, report its address and run it ''' + if initializer is not None: + initializer(*initargs) + # create server server = cls._Server(registry, address, authkey, serializer) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 9da27d48ed..862a60e92c 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -3,7 +3,33 @@ # # multiprocessing/pool.py # -# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # __all__ = ['Pool'] @@ -42,7 +68,8 @@ def mapstar(args): # Code run by worker processes # -def worker(inqueue, outqueue, initializer=None, initargs=()): +def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): + assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): @@ -52,7 +79,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=()): if initializer is not None: initializer(*initargs) - while 1: + completed = 0 + while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() except (EOFError, IOError): @@ -69,6 +97,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=()): except Exception, e: result = (False, e) put((job, i, result)) + completed += 1 + debug('worker exiting after %d tasks' % completed) # # Class representing a process pool @@ -80,11 +110,15 @@ class Pool(object): ''' Process = Process - def __init__(self, processes=None, initializer=None, initargs=()): + def __init__(self, processes=None, initializer=None, initargs=(), + maxtasksperchild=None): self._setup_queues() self._taskqueue = Queue.Queue() self._cache = {} self._state = RUN + self._maxtasksperchild = maxtasksperchild + self._initializer = initializer + self._initargs = initargs if processes is None: try: @@ -92,16 +126,21 @@ class Pool(object): except NotImplementedError: processes = 1 + if initializer is not None and not hasattr(initializer, '__call__'): + raise TypeError('initializer must be a callable') + + self._processes = processes self._pool = [] - for i in range(processes): - w = self.Process( - target=worker, - args=(self._inqueue, self._outqueue, initializer, initargs) - ) - self._pool.append(w) - w.name = w.name.replace('Process', 'PoolWorker') - w.daemon = True - w.start() + self._repopulate_pool() + + self._worker_handler = threading.Thread( + target=Pool._handle_workers, + args=(self, ) + ) + self._worker_handler.daemon = True + self._worker_handler._state = RUN + self._worker_handler.start() + self._task_handler = threading.Thread( target=Pool._handle_tasks, @@ -122,10 +161,48 @@ class Pool(object): self._terminate = Finalize( self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, - self._task_handler, self._result_handler, self._cache), + self._worker_handler, self._task_handler, + self._result_handler, self._cache), exitpriority=15 ) + def _join_exited_workers(self): + """Cleanup after any worker processes which have exited due to reaching + their specified lifetime. Returns True if any workers were cleaned up. + """ + cleaned = False + for i in reversed(range(len(self._pool))): + worker = self._pool[i] + if worker.exitcode is not None: + # worker exited + debug('cleaning up worker %d' % i) + worker.join() + cleaned = True + del self._pool[i] + return cleaned + + def _repopulate_pool(self): + """Bring the number of pool processes up to the specified number, + for use after reaping workers which have exited. + """ + for i in range(self._processes - len(self._pool)): + w = self.Process(target=worker, + args=(self._inqueue, self._outqueue, + self._initializer, + self._initargs, self._maxtasksperchild) + ) + self._pool.append(w) + w.name = w.name.replace('Process', 'PoolWorker') + w.daemon = True + w.start() + debug('added worker') + + def _maintain_pool(self): + """Clean up any exited workers and start replacements for them. + """ + if self._join_exited_workers(): + self._repopulate_pool() + def _setup_queues(self): from .queues import SimpleQueue self._inqueue = SimpleQueue() @@ -204,6 +281,8 @@ class Pool(object): chunksize, extra = divmod(len(iterable), len(self._pool) * 4) if extra: chunksize += 1 + if len(iterable) == 0: + chunksize = 0 task_batches = Pool._get_tasks(func, iterable, chunksize) result = MapResult(self._cache, chunksize, len(iterable), callback) @@ -212,6 +291,15 @@ class Pool(object): return result @staticmethod + def _handle_workers(pool): + while pool._worker_handler._state == RUN and pool._state == RUN: + pool._maintain_pool() + time.sleep(0.1) + # send sentinel to stop workers + pool._taskqueue.put(None) + debug('worker handler exiting') + + @staticmethod def _handle_tasks(taskqueue, put, outqueue, pool): thread = threading.current_thread() @@ -326,16 +414,18 @@ class Pool(object): debug('closing pool') if self._state == RUN: self._state = CLOSE - self._taskqueue.put(None) + self._worker_handler._state = CLOSE def terminate(self): debug('terminating pool') self._state = TERMINATE + self._worker_handler._state = TERMINATE self._terminate() def join(self): debug('joining pool') assert self._state in (CLOSE, TERMINATE) + self._worker_handler.join() self._task_handler.join() self._result_handler.join() for p in self._pool: @@ -352,12 +442,12 @@ class Pool(object): @classmethod def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, - task_handler, result_handler, cache): + worker_handler, task_handler, result_handler, cache): # this is guaranteed to only be called once debug('finalizing pool') + worker_handler._state = TERMINATE task_handler._state = TERMINATE - taskqueue.put(None) # sentinel debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) @@ -367,10 +457,17 @@ class Pool(object): result_handler._state = TERMINATE outqueue.put(None) # sentinel + # We must wait for the worker handler to exit before terminating + # workers because we don't want workers to be restarted behind our back. + debug('joining worker handler') + worker_handler.join() + + # Terminate workers which haven't already finished. if pool and hasattr(pool[0], 'terminate'): debug('terminating workers') for p in pool: - p.terminate() + if p.exitcode is None: + p.terminate() debug('joining task handler') task_handler.join(1e100) @@ -381,7 +478,10 @@ class Pool(object): if pool and hasattr(pool[0], 'terminate'): debug('joining pool workers') for p in pool: - p.join() + if p.is_alive(): + # worker has not yet exited + debug('cleaning up worker %d' % p.pid) + p.join() # # Class whose instances are returned by `Pool.apply_async()` diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 56719d9c9c..0697e74243 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -3,7 +3,33 @@ # # multiprocessing/process.py # -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # __all__ = ['Process', 'current_process', 'active_children'] @@ -179,7 +205,7 @@ class Process(object): @property def ident(self): ''' - Return indentifier (PID) of process or `None` if it has yet to start + Return identifier (PID) of process or `None` if it has yet to start ''' if self is _current_process: return os.getpid() diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index ea279911b6..8584408c60 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -3,7 +3,33 @@ # # multiprocessing/queues.py # -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue'] diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 60e71513e9..6e5e5bc9de 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -4,7 +4,33 @@ # # multiprocessing/reduction.py # -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # __all__ = [] diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index 76b5e94b65..1eb044dd54 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -3,7 +3,33 @@ # # multiprocessing/sharedctypes.py # -# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # import sys @@ -52,9 +78,11 @@ def RawArray(typecode_or_type, size_or_initializer): Returns a ctypes array allocated from shared memory ''' type_ = typecode_to_type.get(typecode_or_type, typecode_or_type) - if isinstance(size_or_initializer, int): + if isinstance(size_or_initializer, (int, long)): type_ = type_ * size_or_initializer - return _new_value(type_) + obj = _new_value(type_) + ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj)) + return obj else: type_ = type_ * len(size_or_initializer) result = _new_value(type_) diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index dacf45acca..4b077e53ae 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -3,7 +3,33 @@ # # multiprocessing/synchronize.py # -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # __all__ = [ @@ -58,8 +84,12 @@ class SemLock(object): def _make_methods(self): self.acquire = self._semlock.acquire self.release = self._semlock.release - self.__enter__ = self._semlock.__enter__ - self.__exit__ = self._semlock.__exit__ + + def __enter__(self): + return self._semlock.__enter__() + + def __exit__(self, *args): + return self._semlock.__exit__(*args) def __getstate__(self): assert_spawning(self) @@ -181,11 +211,15 @@ class Condition(object): self._woken_count, self._wait_semaphore) = state self._make_methods() + def __enter__(self): + return self._lock.__enter__() + + def __exit__(self, *args): + return self._lock.__exit__(*args) + def _make_methods(self): self.acquire = self._lock.acquire self.release = self._lock.release - self.__enter__ = self._lock.__enter__ - self.__exit__ = self._lock.__exit__ def __repr__(self): try: @@ -301,5 +335,10 @@ class Event(object): self._flag.release() else: self._cond.wait(timeout) + + if self._flag.acquire(False): + self._flag.release() + return True + return False finally: self._cond.release() diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 632adb1d7c..c65dd9904e 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -3,7 +3,33 @@ # # multiprocessing/util.py # -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# Copyright (c) 2006-2008, R Oudkerk +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of author nor the names of any contributors may be +# used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. # import itertools |