summaryrefslogtreecommitdiff
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/__init__.py11
-rw-r--r--Lib/multiprocessing/connection.py28
-rw-r--r--Lib/multiprocessing/dummy/__init__.py28
-rw-r--r--Lib/multiprocessing/dummy/connection.py28
-rw-r--r--Lib/multiprocessing/forking.py40
-rw-r--r--Lib/multiprocessing/heap.py28
-rw-r--r--Lib/multiprocessing/managers.py47
-rw-r--r--Lib/multiprocessing/pool.py138
-rw-r--r--Lib/multiprocessing/process.py30
-rw-r--r--Lib/multiprocessing/queues.py28
-rw-r--r--Lib/multiprocessing/reduction.py28
-rw-r--r--Lib/multiprocessing/sharedctypes.py34
-rw-r--r--Lib/multiprocessing/synchronize.py49
-rw-r--r--Lib/multiprocessing/util.py28
14 files changed, 495 insertions, 50 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index 4fb0bd0bf5..4963293bd9 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -38,6 +38,7 @@
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
__version__ = '0.70a1'
@@ -115,8 +116,12 @@ def cpu_count():
except (ValueError, KeyError):
num = 0
elif 'bsd' in sys.platform or sys.platform == 'darwin':
+ comm = '/sbin/sysctl -n hw.ncpu'
+ if sys.platform == 'darwin':
+ comm = '/usr' + comm
try:
- num = int(os.popen('sysctl -n hw.ncpu').read())
+ with os.popen(comm) as p:
+ num = int(p.read())
except ValueError:
num = 0
else:
@@ -219,12 +224,12 @@ def JoinableQueue(maxsize=0):
from multiprocessing.queues import JoinableQueue
return JoinableQueue(maxsize)
-def Pool(processes=None, initializer=None, initargs=()):
+def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
'''
Returns a process pool object
'''
from multiprocessing.pool import Pool
- return Pool(processes, initializer, initargs)
+ return Pool(processes, initializer, initargs, maxtasksperchild)
def RawValue(typecode_or_type, *args):
'''
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 99b338edb1..090796ead3 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -3,7 +3,33 @@
#
# multiprocessing/connection.py
#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
__all__ = [ 'Client', 'Listener', 'Pipe' ]
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index 30b1b20f56..19a3b697c2 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -3,7 +3,33 @@
#
# multiprocessing/dummy/__init__.py
#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
__all__ = [
diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py
index 4f0a6805de..50dc9ffe73 100644
--- a/Lib/multiprocessing/dummy/connection.py
+++ b/Lib/multiprocessing/dummy/connection.py
@@ -3,7 +3,33 @@
#
# multiprocessing/dummy/connection.py
#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
__all__ = [ 'Client', 'Listener', 'Pipe' ]
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index cec184c748..3fca8b1132 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -3,7 +3,33 @@
#
# multiprocessing/forking.py
#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
import os
@@ -103,7 +129,12 @@ if sys.platform != 'win32':
def poll(self, flag=os.WNOHANG):
if self.returncode is None:
- pid, sts = os.waitpid(self.pid, flag)
+ try:
+ pid, sts = os.waitpid(self.pid, flag)
+ except os.error:
+ # Child process not yet created. See #1731717
+ # e.errno == errno.ECHILD == 10
+ return None
if pid == self.pid:
if os.WIFSIGNALED(sts):
self.returncode = -os.WTERMSIG(sts)
@@ -167,6 +198,7 @@ else:
TERMINATE = 0x10000
WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+ WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
exit = win32.ExitProcess
close = win32.CloseHandle
@@ -176,7 +208,7 @@ else:
# People embedding Python want to modify it.
#
- if sys.executable.lower().endswith('pythonservice.exe'):
+ if WINSERVICE:
_python_exe = os.path.join(sys.exec_prefix, 'python.exe')
else:
_python_exe = sys.executable
@@ -366,7 +398,7 @@ else:
if _logger is not None:
d['log_level'] = _logger.getEffectiveLevel()
- if not WINEXE:
+ if not WINEXE and not WINSERVICE:
main_path = getattr(sys.modules['__main__'], '__file__', None)
if not main_path and sys.argv[0] not in ('', '-c'):
main_path = sys.argv[0]
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index 697a5f5516..52ee49dc15 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -3,7 +3,33 @@
#
# multiprocessing/heap.py
#
-# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
import bisect
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index e331116c89..78518fd522 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -4,7 +4,33 @@
#
# multiprocessing/managers.py
#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
@@ -136,7 +162,7 @@ class Server(object):
self.listener = Listener(address=address, backlog=5)
self.address = self.listener.address
- self.id_to_obj = {0: (None, ())}
+ self.id_to_obj = {'0': (None, ())}
self.id_to_refcount = {}
self.mutex = threading.RLock()
self.stop = 0
@@ -298,7 +324,7 @@ class Server(object):
keys = self.id_to_obj.keys()
keys.sort()
for ident in keys:
- if ident != 0:
+ if ident != '0':
result.append(' %s: refcount=%s\n %s' %
(ident, self.id_to_refcount[ident],
str(self.id_to_obj[ident][0])[:75]))
@@ -310,7 +336,7 @@ class Server(object):
'''
Number of shared objects
'''
- return len(self.id_to_obj) - 1 # don't count ident=0
+ return len(self.id_to_obj) - 1 # don't count ident='0'
def shutdown(self, c):
'''
@@ -475,12 +501,15 @@ class BaseManager(object):
dispatch(conn, None, 'dummy')
self._state.value = State.STARTED
- def start(self):
+ def start(self, initializer=None, initargs=()):
'''
Spawn a server process for this manager object
'''
assert self._state.value == State.INITIAL
+ if initializer is not None and not hasattr(initializer, '__call__'):
+ raise TypeError('initializer must be a callable')
+
# pipe over which we will retrieve address of server
reader, writer = connection.Pipe(duplex=False)
@@ -488,7 +517,7 @@ class BaseManager(object):
self._process = Process(
target=type(self)._run_server,
args=(self._registry, self._address, self._authkey,
- self._serializer, writer),
+ self._serializer, writer, initializer, initargs),
)
ident = ':'.join(str(i) for i in self._process._identity)
self._process.name = type(self).__name__ + '-' + ident
@@ -509,10 +538,14 @@ class BaseManager(object):
)
@classmethod
- def _run_server(cls, registry, address, authkey, serializer, writer):
+ def _run_server(cls, registry, address, authkey, serializer, writer,
+ initializer=None, initargs=()):
'''
Create a server, report its address and run it
'''
+ if initializer is not None:
+ initializer(*initargs)
+
# create server
server = cls._Server(registry, address, authkey, serializer)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 9da27d48ed..862a60e92c 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -3,7 +3,33 @@
#
# multiprocessing/pool.py
#
-# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
__all__ = ['Pool']
@@ -42,7 +68,8 @@ def mapstar(args):
# Code run by worker processes
#
-def worker(inqueue, outqueue, initializer=None, initargs=()):
+def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
+ assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
put = outqueue.put
get = inqueue.get
if hasattr(inqueue, '_writer'):
@@ -52,7 +79,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=()):
if initializer is not None:
initializer(*initargs)
- while 1:
+ completed = 0
+ while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get()
except (EOFError, IOError):
@@ -69,6 +97,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=()):
except Exception, e:
result = (False, e)
put((job, i, result))
+ completed += 1
+ debug('worker exiting after %d tasks' % completed)
#
# Class representing a process pool
@@ -80,11 +110,15 @@ class Pool(object):
'''
Process = Process
- def __init__(self, processes=None, initializer=None, initargs=()):
+ def __init__(self, processes=None, initializer=None, initargs=(),
+ maxtasksperchild=None):
self._setup_queues()
self._taskqueue = Queue.Queue()
self._cache = {}
self._state = RUN
+ self._maxtasksperchild = maxtasksperchild
+ self._initializer = initializer
+ self._initargs = initargs
if processes is None:
try:
@@ -92,16 +126,21 @@ class Pool(object):
except NotImplementedError:
processes = 1
+ if initializer is not None and not hasattr(initializer, '__call__'):
+ raise TypeError('initializer must be a callable')
+
+ self._processes = processes
self._pool = []
- for i in range(processes):
- w = self.Process(
- target=worker,
- args=(self._inqueue, self._outqueue, initializer, initargs)
- )
- self._pool.append(w)
- w.name = w.name.replace('Process', 'PoolWorker')
- w.daemon = True
- w.start()
+ self._repopulate_pool()
+
+ self._worker_handler = threading.Thread(
+ target=Pool._handle_workers,
+ args=(self, )
+ )
+ self._worker_handler.daemon = True
+ self._worker_handler._state = RUN
+ self._worker_handler.start()
+
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
@@ -122,10 +161,48 @@ class Pool(object):
self._terminate = Finalize(
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
- self._task_handler, self._result_handler, self._cache),
+ self._worker_handler, self._task_handler,
+ self._result_handler, self._cache),
exitpriority=15
)
+ def _join_exited_workers(self):
+ """Cleanup after any worker processes which have exited due to reaching
+ their specified lifetime. Returns True if any workers were cleaned up.
+ """
+ cleaned = False
+ for i in reversed(range(len(self._pool))):
+ worker = self._pool[i]
+ if worker.exitcode is not None:
+ # worker exited
+ debug('cleaning up worker %d' % i)
+ worker.join()
+ cleaned = True
+ del self._pool[i]
+ return cleaned
+
+ def _repopulate_pool(self):
+ """Bring the number of pool processes up to the specified number,
+ for use after reaping workers which have exited.
+ """
+ for i in range(self._processes - len(self._pool)):
+ w = self.Process(target=worker,
+ args=(self._inqueue, self._outqueue,
+ self._initializer,
+ self._initargs, self._maxtasksperchild)
+ )
+ self._pool.append(w)
+ w.name = w.name.replace('Process', 'PoolWorker')
+ w.daemon = True
+ w.start()
+ debug('added worker')
+
+ def _maintain_pool(self):
+ """Clean up any exited workers and start replacements for them.
+ """
+ if self._join_exited_workers():
+ self._repopulate_pool()
+
def _setup_queues(self):
from .queues import SimpleQueue
self._inqueue = SimpleQueue()
@@ -204,6 +281,8 @@ class Pool(object):
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
+ if len(iterable) == 0:
+ chunksize = 0
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback)
@@ -212,6 +291,15 @@ class Pool(object):
return result
@staticmethod
+ def _handle_workers(pool):
+ while pool._worker_handler._state == RUN and pool._state == RUN:
+ pool._maintain_pool()
+ time.sleep(0.1)
+ # send sentinel to stop workers
+ pool._taskqueue.put(None)
+ debug('worker handler exiting')
+
+ @staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool):
thread = threading.current_thread()
@@ -326,16 +414,18 @@ class Pool(object):
debug('closing pool')
if self._state == RUN:
self._state = CLOSE
- self._taskqueue.put(None)
+ self._worker_handler._state = CLOSE
def terminate(self):
debug('terminating pool')
self._state = TERMINATE
+ self._worker_handler._state = TERMINATE
self._terminate()
def join(self):
debug('joining pool')
assert self._state in (CLOSE, TERMINATE)
+ self._worker_handler.join()
self._task_handler.join()
self._result_handler.join()
for p in self._pool:
@@ -352,12 +442,12 @@ class Pool(object):
@classmethod
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
- task_handler, result_handler, cache):
+ worker_handler, task_handler, result_handler, cache):
# this is guaranteed to only be called once
debug('finalizing pool')
+ worker_handler._state = TERMINATE
task_handler._state = TERMINATE
- taskqueue.put(None) # sentinel
debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
@@ -367,10 +457,17 @@ class Pool(object):
result_handler._state = TERMINATE
outqueue.put(None) # sentinel
+ # We must wait for the worker handler to exit before terminating
+ # workers because we don't want workers to be restarted behind our back.
+ debug('joining worker handler')
+ worker_handler.join()
+
+ # Terminate workers which haven't already finished.
if pool and hasattr(pool[0], 'terminate'):
debug('terminating workers')
for p in pool:
- p.terminate()
+ if p.exitcode is None:
+ p.terminate()
debug('joining task handler')
task_handler.join(1e100)
@@ -381,7 +478,10 @@ class Pool(object):
if pool and hasattr(pool[0], 'terminate'):
debug('joining pool workers')
for p in pool:
- p.join()
+ if p.is_alive():
+ # worker has not yet exited
+ debug('cleaning up worker %d' % p.pid)
+ p.join()
#
# Class whose instances are returned by `Pool.apply_async()`
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 56719d9c9c..0697e74243 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -3,7 +3,33 @@
#
# multiprocessing/process.py
#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
__all__ = ['Process', 'current_process', 'active_children']
@@ -179,7 +205,7 @@ class Process(object):
@property
def ident(self):
'''
- Return indentifier (PID) of process or `None` if it has yet to start
+ Return identifier (PID) of process or `None` if it has yet to start
'''
if self is _current_process:
return os.getpid()
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index ea279911b6..8584408c60 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -3,7 +3,33 @@
#
# multiprocessing/queues.py
#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 60e71513e9..6e5e5bc9de 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -4,7 +4,33 @@
#
# multiprocessing/reduction.py
#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
__all__ = []
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index 76b5e94b65..1eb044dd54 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -3,7 +3,33 @@
#
# multiprocessing/sharedctypes.py
#
-# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
import sys
@@ -52,9 +78,11 @@ def RawArray(typecode_or_type, size_or_initializer):
Returns a ctypes array allocated from shared memory
'''
type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
- if isinstance(size_or_initializer, int):
+ if isinstance(size_or_initializer, (int, long)):
type_ = type_ * size_or_initializer
- return _new_value(type_)
+ obj = _new_value(type_)
+ ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
+ return obj
else:
type_ = type_ * len(size_or_initializer)
result = _new_value(type_)
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index dacf45acca..4b077e53ae 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -3,7 +3,33 @@
#
# multiprocessing/synchronize.py
#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
__all__ = [
@@ -58,8 +84,12 @@ class SemLock(object):
def _make_methods(self):
self.acquire = self._semlock.acquire
self.release = self._semlock.release
- self.__enter__ = self._semlock.__enter__
- self.__exit__ = self._semlock.__exit__
+
+ def __enter__(self):
+ return self._semlock.__enter__()
+
+ def __exit__(self, *args):
+ return self._semlock.__exit__(*args)
def __getstate__(self):
assert_spawning(self)
@@ -181,11 +211,15 @@ class Condition(object):
self._woken_count, self._wait_semaphore) = state
self._make_methods()
+ def __enter__(self):
+ return self._lock.__enter__()
+
+ def __exit__(self, *args):
+ return self._lock.__exit__(*args)
+
def _make_methods(self):
self.acquire = self._lock.acquire
self.release = self._lock.release
- self.__enter__ = self._lock.__enter__
- self.__exit__ = self._lock.__exit__
def __repr__(self):
try:
@@ -301,5 +335,10 @@ class Event(object):
self._flag.release()
else:
self._cond.wait(timeout)
+
+ if self._flag.acquire(False):
+ self._flag.release()
+ return True
+ return False
finally:
self._cond.release()
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index 632adb1d7c..c65dd9904e 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -3,7 +3,33 @@
#
# multiprocessing/util.py
#
-# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+# Copyright (c) 2006-2008, R Oudkerk
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# 3. Neither the name of author nor the names of any contributors may be
+# used to endorse or promote products derived from this software
+# without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
#
import itertools