From 096c745e602a5c3f2fcf697ab6323422ef4a2f68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Lemburg?= Date: Fri, 25 Feb 2011 15:42:01 +0000 Subject: Normalize the encoding names for Latin-1 and UTF-8 to 'latin-1' and 'utf-8'. These are optimized in the Python Unicode implementation to result in more direct processing, bypassing the codec registry. Also see issue11303. --- Lib/multiprocessing/connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index d6c23fb0ec..d6627e5d2d 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -434,10 +434,10 @@ class ConnectionWrapper(object): return self._loads(s) def _xml_dumps(obj): - return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') + return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') def _xml_loads(s): - (obj,), method = xmlrpclib.loads(s.decode('utf8')) + (obj,), method = xmlrpclib.loads(s.decode('utf-8')) return obj class XmlListener(Listener): -- cgit v1.2.1 From e8406df0c7f65212f9dc305daf4f0075e50ac665 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 25 Feb 2011 22:07:43 +0000 Subject: Issue #6064: Add a `daemon` keyword argument to the threading.Thread and multiprocessing.Process constructors in order to override the default behaviour of inheriting the daemonic property from the current thread/process. --- Lib/multiprocessing/process.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index b56a061079..3fb9ff600f 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -91,12 +91,16 @@ class Process(object): ''' _Popen = None - def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, + *, daemon=None): assert group is None, 'group argument must be None for now' count = next(_current_process._counter) self._identity = _current_process._identity + (count,) self._authkey = _current_process._authkey - self._daemonic = _current_process._daemonic + if daemon is not None: + self._daemonic = daemon + else: + self._daemonic = _current_process._daemonic self._tempdir = _current_process._tempdir self._parent_pid = os.getpid() self._popen = None -- cgit v1.2.1 From 41224435f0695630a8df433ed28a65d5bb74e0c3 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 9 May 2011 17:04:27 +0200 Subject: Issue #11743: Rewrite multiprocessing connection classes in pure Python. --- Lib/multiprocessing/connection.py | 315 ++++++++++++++++++++++++++++++++++++-- Lib/multiprocessing/forking.py | 5 +- Lib/multiprocessing/reduction.py | 13 +- 3 files changed, 311 insertions(+), 22 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index d6627e5d2d..afd580bff2 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -34,19 +34,27 @@ __all__ = [ 'Client', 'Listener', 'Pipe' ] +import io import os import sys +import pickle +import select import socket +import struct import errno import time import tempfile import itertools import _multiprocessing -from multiprocessing import current_process, AuthenticationError +from multiprocessing import current_process, AuthenticationError, BufferTooShort from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug -from multiprocessing.forking import duplicate, close - +try: + from _multiprocessing import win32 +except ImportError: + if sys.platform == 'win32': + raise + win32 = None # # @@ -110,6 +118,281 @@ def address_type(address): else: raise ValueError('address type of %r unrecognized' % address) +# +# Connection classes +# + +class _ConnectionBase: + _handle = None + + def __init__(self, handle, readable=True, writable=True): + handle = handle.__index__() + if handle < 0: + raise ValueError("invalid handle") + if not readable and not writable: + raise ValueError( + "at least one of `readable` and `writable` must be True") + self._handle = handle + self._readable = readable + self._writable = writable + + def __del__(self): + if self._handle is not None: + self._close() + + def _check_closed(self): + if self._handle is None: + raise IOError("handle is closed") + + def _check_readable(self): + if not self._readable: + raise IOError("connection is write-only") + + def _check_writable(self): + if not self._writable: + raise IOError("connection is read-only") + + def _bad_message_length(self): + if self._writable: + self._readable = False + else: + self.close() + raise IOError("bad message length") + + @property + def closed(self): + """True if the connection is closed""" + return self._handle is None + + @property + def readable(self): + """True if the connection is readable""" + return self._readable + + @property + def writable(self): + """True if the connection is writable""" + return self._writable + + def fileno(self): + """File descriptor or handle of the connection""" + self._check_closed() + return self._handle + + def close(self): + """Close the connection""" + if self._handle is not None: + try: + self._close() + finally: + self._handle = None + + def send_bytes(self, buf, offset=0, size=None): + """Send the bytes data from a bytes-like object""" + self._check_closed() + self._check_writable() + m = memoryview(buf) + # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) + if m.itemsize > 1: + m = memoryview(bytes(m)) + n = len(m) + if offset < 0: + raise ValueError("offset is negative") + if n < offset: + raise ValueError("buffer length < offset") + if size is None: + size = n - offset + elif size < 0: + raise ValueError("size is negative") + elif offset + size > n: + raise ValueError("buffer length < offset + size") + self._send_bytes(m[offset:offset + size]) + + def send(self, obj): + """Send a (picklable) object""" + self._check_closed() + self._check_writable() + buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL) + self._send_bytes(memoryview(buf)) + + def recv_bytes(self, maxlength=None): + """ + Receive bytes data as a bytes object. + """ + self._check_closed() + self._check_readable() + if maxlength is not None and maxlength < 0: + raise ValueError("negative maxlength") + buf = self._recv_bytes(maxlength) + if buf is None: + self._bad_message_length() + return buf.getvalue() + + def recv_bytes_into(self, buf, offset=0): + """ + Receive bytes data into a writeable buffer-like object. + Return the number of bytes read. + """ + self._check_closed() + self._check_readable() + with memoryview(buf) as m: + # Get bytesize of arbitrary buffer + itemsize = m.itemsize + bytesize = itemsize * len(m) + if offset < 0: + raise ValueError("negative offset") + elif offset > bytesize: + raise ValueError("offset too large") + result = self._recv_bytes() + size = result.tell() + if bytesize < offset + size: + raise BufferTooShort(result.getvalue()) + # Message can fit in dest + result.seek(0) + result.readinto(m[offset // itemsize : + (offset + size) // itemsize]) + return size + + def recv(self): + """Receive a (picklable) object""" + self._check_closed() + self._check_readable() + buf = self._recv_bytes() + return pickle.loads(buf.getbuffer()) + + def poll(self, timeout=0.0): + """Whether there is any input available to be read""" + self._check_closed() + self._check_readable() + if timeout < 0.0: + timeout = None + return self._poll(timeout) + + +if win32: + + class PipeConnection(_ConnectionBase): + """ + Connection class based on a Windows named pipe. + """ + + def _close(self): + win32.CloseHandle(self._handle) + + def _send_bytes(self, buf): + nwritten = win32.WriteFile(self._handle, buf) + assert nwritten == len(buf) + + def _recv_bytes(self, maxsize=None): + buf = io.BytesIO() + bufsize = 512 + if maxsize is not None: + bufsize = min(bufsize, maxsize) + try: + firstchunk, complete = win32.ReadFile(self._handle, bufsize) + except IOError as e: + if e.errno == win32.ERROR_BROKEN_PIPE: + raise EOFError + raise + lenfirstchunk = len(firstchunk) + buf.write(firstchunk) + if complete: + return buf + navail, nleft = win32.PeekNamedPipe(self._handle) + if maxsize is not None and lenfirstchunk + nleft > maxsize: + return None + lastchunk, complete = win32.ReadFile(self._handle, nleft) + assert complete + buf.write(lastchunk) + return buf + + def _poll(self, timeout): + navail, nleft = win32.PeekNamedPipe(self._handle) + if navail > 0: + return True + elif timeout == 0.0: + return False + # Setup a polling loop (translated straight from old + # pipe_connection.c) + if timeout < 0.0: + deadline = None + else: + deadline = time.time() + timeout + delay = 0.001 + max_delay = 0.02 + while True: + time.sleep(delay) + navail, nleft = win32.PeekNamedPipe(self._handle) + if navail > 0: + return True + if deadline and time.time() > deadline: + return False + if delay < max_delay: + delay += 0.001 + + +class Connection(_ConnectionBase): + """ + Connection class based on an arbitrary file descriptor (Unix only), or + a socket handle (Windows). + """ + + if win32: + def _close(self): + win32.closesocket(self._handle) + _write = win32.send + _read = win32.recv + else: + def _close(self): + os.close(self._handle) + _write = os.write + _read = os.read + + def _send(self, buf, write=_write): + remaining = len(buf) + while True: + n = write(self._handle, buf) + remaining -= n + if remaining == 0: + break + buf = buf[n:] + + def _recv(self, size, read=_read): + buf = io.BytesIO() + remaining = size + while remaining > 0: + chunk = read(self._handle, remaining) + n = len(chunk) + if n == 0: + if remaining == size: + raise EOFError + else: + raise IOError("got end of file during message") + buf.write(chunk) + remaining -= n + return buf + + def _send_bytes(self, buf): + # For wire compatibility with 3.2 and lower + n = len(buf) + self._send(struct.pack("=i", len(buf))) + # The condition is necessary to avoid "broken pipe" errors + # when sending a 0-length buffer if the other end closed the pipe. + if n > 0: + self._send(buf) + + def _recv_bytes(self, maxsize=None): + buf = self._recv(4) + size, = struct.unpack("=i", buf.getvalue()) + if maxsize is not None and size > maxsize: + return None + return self._recv(size) + + def _poll(self, timeout): + r = select.select([self._handle], [], [], timeout)[0] + return bool(r) + + # # Public functions # @@ -186,21 +469,19 @@ if sys.platform != 'win32': ''' if duplex: s1, s2 = socket.socketpair() - c1 = _multiprocessing.Connection(os.dup(s1.fileno())) - c2 = _multiprocessing.Connection(os.dup(s2.fileno())) + c1 = Connection(os.dup(s1.fileno())) + c2 = Connection(os.dup(s2.fileno())) s1.close() s2.close() else: fd1, fd2 = os.pipe() - c1 = _multiprocessing.Connection(fd1, writable=False) - c2 = _multiprocessing.Connection(fd2, readable=False) + c1 = Connection(fd1, writable=False) + c2 = Connection(fd2, readable=False) return c1, c2 else: - from _multiprocessing import win32 - def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe @@ -234,8 +515,8 @@ else: if e.args[0] != win32.ERROR_PIPE_CONNECTED: raise - c1 = _multiprocessing.PipeConnection(h1, writable=duplex) - c2 = _multiprocessing.PipeConnection(h2, readable=duplex) + c1 = PipeConnection(h1, writable=duplex) + c2 = PipeConnection(h2, readable=duplex) return c1, c2 @@ -266,7 +547,7 @@ class SocketListener(object): def accept(self): s, self._last_accepted = self._socket.accept() fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) + conn = Connection(fd) s.close() return conn @@ -298,7 +579,7 @@ def SocketClient(address): raise fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) + conn = Connection(fd) return conn # @@ -345,7 +626,7 @@ if sys.platform == 'win32': except WindowsError as e: if e.args[0] != win32.ERROR_PIPE_CONNECTED: raise - return _multiprocessing.PipeConnection(handle) + return PipeConnection(handle) @staticmethod def _finalize_pipe_listener(queue, address): @@ -377,7 +658,7 @@ if sys.platform == 'win32': win32.SetNamedPipeHandleState( h, win32.PIPE_READMODE_MESSAGE, None, None ) - return _multiprocessing.PipeConnection(h) + return PipeConnection(h) # # Authentication stuff @@ -451,3 +732,7 @@ def XmlClient(*args, **kwds): global xmlrpclib import xmlrpc.client as xmlrpclib return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) + + +# Late import because of circular import +from multiprocessing.forking import duplicate, close diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index cc7c326c07..3d9555708e 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -183,7 +183,7 @@ else: import time from pickle import dump, load, HIGHEST_PROTOCOL - from _multiprocessing import win32, Connection, PipeConnection + from _multiprocessing import win32 from .util import Finalize def dump(obj, file, protocol=None): @@ -411,6 +411,9 @@ else: # Make (Pipe)Connection picklable # + # Late import because of circular import + from .connection import Connection, PipeConnection + def reduce_connection(conn): if not Popen.thread_is_spawning(): raise RuntimeError( diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 6e5e5bc9de..b32c72590a 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -44,7 +44,7 @@ import _multiprocessing from multiprocessing import current_process from multiprocessing.forking import Popen, duplicate, close, ForkingPickler from multiprocessing.util import register_after_fork, debug, sub_debug -from multiprocessing.connection import Client, Listener +from multiprocessing.connection import Client, Listener, Connection # @@ -159,7 +159,7 @@ def rebuild_handle(pickled_data): return new_handle # -# Register `_multiprocessing.Connection` with `ForkingPickler` +# Register `Connection` with `ForkingPickler` # def reduce_connection(conn): @@ -168,11 +168,11 @@ def reduce_connection(conn): def rebuild_connection(reduced_handle, readable, writable): handle = rebuild_handle(reduced_handle) - return _multiprocessing.Connection( + return Connection( handle, readable=readable, writable=writable ) -ForkingPickler.register(_multiprocessing.Connection, reduce_connection) +ForkingPickler.register(Connection, reduce_connection) # # Register `socket.socket` with `ForkingPickler` @@ -201,6 +201,7 @@ ForkingPickler.register(socket.socket, reduce_socket) # if sys.platform == 'win32': + from multiprocessing.connection import PipeConnection def reduce_pipe_connection(conn): rh = reduce_handle(conn.fileno()) @@ -208,8 +209,8 @@ if sys.platform == 'win32': def rebuild_pipe_connection(reduced_handle, readable, writable): handle = rebuild_handle(reduced_handle) - return _multiprocessing.PipeConnection( + return PipeConnection( handle, readable=readable, writable=writable ) - ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection) + ForkingPickler.register(PipeConnection, reduce_pipe_connection) -- cgit v1.2.1 From 304d84379e123e3845127aecab86ee8ac903f49c Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 9 May 2011 21:00:28 +0200 Subject: os.dup()-then-close() can be replaced with detach() --- Lib/multiprocessing/connection.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index afd580bff2..415e21066a 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -469,10 +469,8 @@ if sys.platform != 'win32': ''' if duplex: s1, s2 = socket.socketpair() - c1 = Connection(os.dup(s1.fileno())) - c2 = Connection(os.dup(s2.fileno())) - s1.close() - s2.close() + c1 = Connection(s1.detach()) + c2 = Connection(s2.detach()) else: fd1, fd2 = os.pipe() c1 = Connection(fd1, writable=False) -- cgit v1.2.1 From c67742818cff89d360c3bcceebe6e388b951b72c Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 6 Jun 2011 19:35:31 +0200 Subject: Issue #12040: Expose a new attribute `sentinel` on instances of :class:`multiprocessing.Process`. Also, fix Process.join() to not use polling anymore, when given a timeout. --- Lib/multiprocessing/forking.py | 34 ++++++++++++++++++++-------------- Lib/multiprocessing/process.py | 12 ++++++++++++ Lib/multiprocessing/util.py | 21 +++++++++++++++++++++ 3 files changed, 53 insertions(+), 14 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 3d9555708e..3c359cb28b 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -101,10 +101,12 @@ else: if sys.platform != 'win32': import time + import select exit = os._exit duplicate = os.dup close = os.close + _select = util._eintr_retry(select.select) # # We define a Popen class similar to the one from subprocess, but @@ -118,8 +120,12 @@ if sys.platform != 'win32': sys.stderr.flush() self.returncode = None + r, w = os.pipe() + self.sentinel = r + self.pid = os.fork() if self.pid == 0: + os.close(r) if 'random' in sys.modules: import random random.seed() @@ -128,6 +134,11 @@ if sys.platform != 'win32': sys.stderr.flush() os._exit(code) + # `w` will be closed when the child exits, at which point `r` + # will become ready for reading (using e.g. select()). + os.close(w) + util.Finalize(self, os.close, (r,)) + def poll(self, flag=os.WNOHANG): if self.returncode is None: try: @@ -145,20 +156,14 @@ if sys.platform != 'win32': return self.returncode def wait(self, timeout=None): - if timeout is None: - return self.poll(0) - deadline = time.time() + timeout - delay = 0.0005 - while 1: - res = self.poll() - if res is not None: - break - remaining = deadline - time.time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, 0.05) - time.sleep(delay) - return res + if self.returncode is None: + if timeout is not None: + r = _select([self.sentinel], [], [], timeout)[0] + if not r: + return None + # This shouldn't block if select() returned successfully. + return self.poll(os.WNOHANG if timeout == 0.0 else 0) + return self.returncode def terminate(self): if self.returncode is None: @@ -258,6 +263,7 @@ else: self.pid = pid self.returncode = None self._handle = hp + self.sentinel = int(hp) # send information to child prep_data = get_preparation_data(process_obj._name) diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 3fb9ff600f..99ee5326c3 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -132,6 +132,7 @@ class Process(object): else: from .forking import Popen self._popen = Popen(self) + self._sentinel = self._popen.sentinel _current_process._children.add(self) def terminate(self): @@ -218,6 +219,17 @@ class Process(object): pid = ident + @property + def sentinel(self): + ''' + Return a file descriptor (Unix) or handle (Windows) suitable for + waiting for process termination. + ''' + try: + return self._sentinel + except AttributeError: + raise ValueError("process not started") + def __repr__(self): if self is _current_process: status = 'started' diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 30b7a85fa2..7949d3ae24 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -32,9 +32,11 @@ # SUCH DAMAGE. # +import functools import itertools import weakref import atexit +import select import threading # we want threading to install it's # cleanup function before multiprocessing does @@ -315,3 +317,22 @@ class ForkAwareLocal(threading.local): register_after_fork(self, lambda obj : obj.__dict__.clear()) def __reduce__(self): return type(self), () + + +# +# Automatic retry after EINTR +# + +def _eintr_retry(func, _errors=(EnvironmentError, select.error)): + @functools.wraps(func) + def wrapped(*args, **kwargs): + while True: + try: + return func(*args, **kwargs) + except _errors as e: + # select.error has no `errno` attribute + if e.args[0] == errno.EINTR: + continue + raise + return wrapped + -- cgit v1.2.1 From 3b6a4b1a19f5af43418e486ad0a7839b8a49e9b6 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 6 Jun 2011 19:36:01 +0200 Subject: Whitespace normalization --- Lib/multiprocessing/util.py | 1 - 1 file changed, 1 deletion(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 7949d3ae24..b59ac9fc61 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -335,4 +335,3 @@ def _eintr_retry(func, _errors=(EnvironmentError, select.error)): continue raise return wrapped - -- cgit v1.2.1 From 8d62ed29484e5717297d02b7f861bc432423ed33 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 8 Jun 2011 17:21:55 +0200 Subject: Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed children and raises BrokenProcessPool in such a situation. Previously it would reliably freeze/deadlock. --- Lib/multiprocessing/connection.py | 149 +++++++++++++++++++++++++------------- Lib/multiprocessing/forking.py | 1 + Lib/multiprocessing/queues.py | 6 +- 3 files changed, 101 insertions(+), 55 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 415e21066a..ede2908eec 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -48,14 +48,18 @@ import itertools import _multiprocessing from multiprocessing import current_process, AuthenticationError, BufferTooShort -from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug +from multiprocessing.util import ( + get_temp_dir, Finalize, sub_debug, debug, _eintr_retry) try: from _multiprocessing import win32 + from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE except ImportError: if sys.platform == 'win32': raise win32 = None +_select = _eintr_retry(select.select) + # # # @@ -118,6 +122,15 @@ def address_type(address): else: raise ValueError('address type of %r unrecognized' % address) + +class SentinelReady(Exception): + """ + Raised when a sentinel is ready when polling. + """ + def __init__(self, *args): + Exception.__init__(self, *args) + self.sentinels = args[0] + # # Connection classes # @@ -253,19 +266,17 @@ class _ConnectionBase: (offset + size) // itemsize]) return size - def recv(self): + def recv(self, sentinels=None): """Receive a (picklable) object""" self._check_closed() self._check_readable() - buf = self._recv_bytes() + buf = self._recv_bytes(sentinels=sentinels) return pickle.loads(buf.getbuffer()) def poll(self, timeout=0.0): """Whether there is any input available to be read""" self._check_closed() self._check_readable() - if timeout < 0.0: - timeout = None return self._poll(timeout) @@ -274,61 +285,88 @@ if win32: class PipeConnection(_ConnectionBase): """ Connection class based on a Windows named pipe. + Overlapped I/O is used, so the handles must have been created + with FILE_FLAG_OVERLAPPED. """ + _buffered = b'' def _close(self): win32.CloseHandle(self._handle) def _send_bytes(self, buf): - nwritten = win32.WriteFile(self._handle, buf) + overlapped = win32.WriteFile(self._handle, buf, overlapped=True) + nwritten, complete = overlapped.GetOverlappedResult(True) + assert complete assert nwritten == len(buf) - def _recv_bytes(self, maxsize=None): + def _recv_bytes(self, maxsize=None, sentinels=()): + if sentinels: + self._poll(-1.0, sentinels) buf = io.BytesIO() - bufsize = 512 - if maxsize is not None: - bufsize = min(bufsize, maxsize) - try: - firstchunk, complete = win32.ReadFile(self._handle, bufsize) - except IOError as e: - if e.errno == win32.ERROR_BROKEN_PIPE: - raise EOFError - raise - lenfirstchunk = len(firstchunk) - buf.write(firstchunk) - if complete: - return buf + firstchunk = self._buffered + if firstchunk: + lenfirstchunk = len(firstchunk) + buf.write(firstchunk) + self._buffered = b'' + else: + # A reasonable size for the first chunk transfer + bufsize = 128 + if maxsize is not None and maxsize < bufsize: + bufsize = maxsize + try: + overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True) + lenfirstchunk, complete = overlapped.GetOverlappedResult(True) + firstchunk = overlapped.getbuffer() + assert lenfirstchunk == len(firstchunk) + except IOError as e: + if e.errno == win32.ERROR_BROKEN_PIPE: + raise EOFError + raise + buf.write(firstchunk) + if complete: + return buf navail, nleft = win32.PeekNamedPipe(self._handle) if maxsize is not None and lenfirstchunk + nleft > maxsize: return None - lastchunk, complete = win32.ReadFile(self._handle, nleft) - assert complete - buf.write(lastchunk) + if nleft > 0: + overlapped = win32.ReadFile(self._handle, nleft, overlapped=True) + res, complete = overlapped.GetOverlappedResult(True) + assert res == nleft + assert complete + buf.write(overlapped.getbuffer()) return buf - def _poll(self, timeout): + def _poll(self, timeout, sentinels=()): + # Fast non-blocking path navail, nleft = win32.PeekNamedPipe(self._handle) if navail > 0: return True elif timeout == 0.0: return False - # Setup a polling loop (translated straight from old - # pipe_connection.c) + # Blocking: use overlapped I/O if timeout < 0.0: - deadline = None + timeout = INFINITE else: - deadline = time.time() + timeout - delay = 0.001 - max_delay = 0.02 - while True: - time.sleep(delay) - navail, nleft = win32.PeekNamedPipe(self._handle) - if navail > 0: - return True - if deadline and time.time() > deadline: - return False - if delay < max_delay: - delay += 0.001 + timeout = int(timeout * 1000 + 0.5) + overlapped = win32.ReadFile(self._handle, 1, overlapped=True) + try: + handles = [overlapped.event] + handles += sentinels + res = win32.WaitForMultipleObjects(handles, False, timeout) + finally: + # Always cancel overlapped I/O in the same thread + # (because CancelIoEx() appears only in Vista) + overlapped.cancel() + if res == WAIT_TIMEOUT: + return False + idx = res - WAIT_OBJECT_0 + if idx == 0: + # I/O was successful, store received data + overlapped.GetOverlappedResult(True) + self._buffered += overlapped.getbuffer() + return True + assert 0 < idx < len(handles) + raise SentinelReady([handles[idx]]) class Connection(_ConnectionBase): @@ -357,11 +395,18 @@ class Connection(_ConnectionBase): break buf = buf[n:] - def _recv(self, size, read=_read): + def _recv(self, size, sentinels=(), read=_read): buf = io.BytesIO() + handle = self._handle + if sentinels: + handles = [handle] + sentinels remaining = size while remaining > 0: - chunk = read(self._handle, remaining) + if sentinels: + r = _select(handles, [], [])[0] + if handle not in r: + raise SentinelReady(r) + chunk = read(handle, remaining) n = len(chunk) if n == 0: if remaining == size: @@ -381,15 +426,17 @@ class Connection(_ConnectionBase): if n > 0: self._send(buf) - def _recv_bytes(self, maxsize=None): - buf = self._recv(4) + def _recv_bytes(self, maxsize=None, sentinels=()): + buf = self._recv(4, sentinels) size, = struct.unpack("=i", buf.getvalue()) if maxsize is not None and size > maxsize: return None - return self._recv(size) + return self._recv(size, sentinels) def _poll(self, timeout): - r = select.select([self._handle], [], [], timeout)[0] + if timeout < 0.0: + timeout = None + r = _select([self._handle], [], [], timeout)[0] return bool(r) @@ -495,23 +542,21 @@ else: obsize, ibsize = 0, BUFSIZE h1 = win32.CreateNamedPipe( - address, openmode, + address, openmode | win32.FILE_FLAG_OVERLAPPED, win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | win32.PIPE_WAIT, 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL ) h2 = win32.CreateFile( - address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + address, access, 0, win32.NULL, win32.OPEN_EXISTING, + win32.FILE_FLAG_OVERLAPPED, win32.NULL ) win32.SetNamedPipeHandleState( h2, win32.PIPE_READMODE_MESSAGE, None, None ) - try: - win32.ConnectNamedPipe(h1, win32.NULL) - except WindowsError as e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: - raise + overlapped = win32.ConnectNamedPipe(h1, overlapped=True) + overlapped.GetOverlappedResult(True) c1 = PipeConnection(h1, writable=duplex) c2 = PipeConnection(h2, readable=duplex) diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 3c359cb28b..a2c61ef27e 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -35,6 +35,7 @@ import os import sys import signal +import select from multiprocessing import util, process diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 3280a2533d..33243636a3 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -44,7 +44,7 @@ import weakref from queue import Empty, Full import _multiprocessing -from multiprocessing import Pipe +from multiprocessing.connection import Pipe, SentinelReady from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork from multiprocessing.forking import assert_spawning @@ -372,10 +372,10 @@ class SimpleQueue(object): def _make_methods(self): recv = self._reader.recv racquire, rrelease = self._rlock.acquire, self._rlock.release - def get(): + def get(*, sentinels=None): racquire() try: - return recv() + return recv(sentinels) finally: rrelease() self.get = get -- cgit v1.2.1 From b560670f913da1e5b77bfa4be55383e10b6a6d44 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 9 Jul 2011 01:03:00 +0200 Subject: Rebind locally the globals which can be looked up at shutdown (to avoid the warnings seen on a buildbot) --- Lib/multiprocessing/connection.py | 12 ++++++------ Lib/multiprocessing/util.py | 6 +++++- 2 files changed, 11 insertions(+), 7 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index ede2908eec..2661898f9f 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -290,8 +290,8 @@ if win32: """ _buffered = b'' - def _close(self): - win32.CloseHandle(self._handle) + def _close(self, _CloseHandle=win32.CloseHandle): + _CloseHandle(self._handle) def _send_bytes(self, buf): overlapped = win32.WriteFile(self._handle, buf, overlapped=True) @@ -376,13 +376,13 @@ class Connection(_ConnectionBase): """ if win32: - def _close(self): - win32.closesocket(self._handle) + def _close(self, _close=win32.closesocket): + _close(self._handle) _write = win32.send _read = win32.recv else: - def _close(self): - os.close(self._handle) + def _close(self, _close=os.close): + _close(self._handle) _write = os.write _read = os.read diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index b59ac9fc61..c48718026b 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -188,7 +188,11 @@ class Finalize(object): _finalizer_registry[self._key] = self - def __call__(self, wr=None): + def __call__(self, wr=None, + # Need to bind these locally because the globals can have + # been cleared at shutdown + _finalizer_registry=_finalizer_registry, + sub_debug=sub_debug): ''' Run the callback unless it has already been called or cancelled ''' -- cgit v1.2.1 From 7df83edf3f67ce19cf8ccb25087b2e76e5482451 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 9 Jul 2011 01:03:46 +0200 Subject: Add comment --- Lib/multiprocessing/connection.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 2661898f9f..ae2d135e03 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -149,6 +149,8 @@ class _ConnectionBase: self._readable = readable self._writable = writable + # XXX should we use util.Finalize instead of a __del__? + def __del__(self): if self._handle is not None: self._close() -- cgit v1.2.1 From 9a83222ed37f93f81f03fd7671df2df1ca2841e1 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 16 Jul 2011 01:51:58 +0200 Subject: Silence spurious "broken pipe" tracebacks when shutting down a ProcessPoolExecutor. --- Lib/multiprocessing/queues.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 33243636a3..4696ccc5dc 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -41,6 +41,7 @@ import collections import time import atexit import weakref +import errno from queue import Empty, Full import _multiprocessing @@ -67,6 +68,8 @@ class Queue(object): else: self._wlock = Lock() self._sem = BoundedSemaphore(maxsize) + # For use by concurrent.futures + self._ignore_epipe = False self._after_fork() @@ -178,7 +181,7 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send, - self._wlock, self._writer.close), + self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) self._thread.daemon = True @@ -229,7 +232,7 @@ class Queue(object): notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close): + def _feed(buffer, notempty, send, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -271,6 +274,8 @@ class Queue(object): except IndexError: pass except Exception as e: + if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: + return # Since this runs in a daemon thread the resources it uses # may be become unusable while the process is cleaning up. # We ignore errors which happen after the process has -- cgit v1.2.1 From 24af0bf30b37b2251d1582bdd19290687391e1ca Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 20 Jul 2011 02:01:39 +0200 Subject: Fix test_multiprocessing failure under Windows. (followup to dfaa3a149a92) --- Lib/multiprocessing/queues.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 4696ccc5dc..bb4c7d874f 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -78,11 +78,11 @@ class Queue(object): def __getstate__(self): assert_spawning(self) - return (self._maxsize, self._reader, self._writer, + return (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) def __setstate__(self, state): - (self._maxsize, self._reader, self._writer, + (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state self._after_fork() -- cgit v1.2.1 From 5ad70b684e0eac7852bfffe225be4731b4e6ac20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89ric=20Araujo?= Date: Thu, 28 Jul 2011 00:28:28 +0200 Subject: Remove indirection in threading (issue #10968). The public names (Thread, Condition, etc.) used to be factory functions returning instances of hidden classes (_Thread, _Condition, etc.), because (if Guido recalls correctly) this code pre-dates the ability to subclass extension types. It is now possible to inherit from Thread and other classes, without having to import the private underscored names like multiprocessing did. A doc update will follow: a patch is under discussion on the issue. --- Lib/multiprocessing/dummy/__init__.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py index c4933d9ec5..60add922bf 100644 --- a/Lib/multiprocessing/dummy/__init__.py +++ b/Lib/multiprocessing/dummy/__init__.py @@ -51,7 +51,7 @@ import itertools from multiprocessing import TimeoutError, cpu_count from multiprocessing.dummy.connection import Pipe from threading import Lock, RLock, Semaphore, BoundedSemaphore -from threading import Event +from threading import Event, Condition from queue import Queue # @@ -84,17 +84,6 @@ class DummyProcess(threading.Thread): # # -class Condition(threading._Condition): - # XXX - if sys.version_info < (3, 0): - notify_all = threading._Condition.notify_all.__func__ - else: - notify_all = threading._Condition.notify_all - -# -# -# - Process = DummyProcess current_process = threading.current_thread current_process()._children = weakref.WeakKeyDictionary() -- cgit v1.2.1 From aa253acaa212426f38c64ac2a1c7f06c75f70781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Fran=C3=A7ois=20Natali?= Date: Tue, 20 Sep 2011 19:27:39 +0200 Subject: Issue #12996: multiprocessing.connection: transmit the header in network byte order (endpoints machines can have different endianness). --- Lib/multiprocessing/connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index ae2d135e03..13d3d777f2 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -422,7 +422,7 @@ class Connection(_ConnectionBase): def _send_bytes(self, buf): # For wire compatibility with 3.2 and lower n = len(buf) - self._send(struct.pack("=i", len(buf))) + self._send(struct.pack("!i", n)) # The condition is necessary to avoid "broken pipe" errors # when sending a 0-length buffer if the other end closed the pipe. if n > 0: @@ -430,7 +430,7 @@ class Connection(_ConnectionBase): def _recv_bytes(self, maxsize=None, sentinels=()): buf = self._recv(4, sentinels) - size, = struct.unpack("=i", buf.getvalue()) + size, = struct.unpack("!i", buf.getvalue()) if maxsize is not None and size > maxsize: return None return self._recv(size, sentinels) -- cgit v1.2.1 From 108edaf29073feabc1c478fbba107be997da4f17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Fran=C3=A7ois=20Natali?= Date: Sat, 24 Sep 2011 20:04:29 +0200 Subject: Issue #12981: rewrite multiprocessing_{sendfd,recvfd} in Python. --- Lib/multiprocessing/reduction.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index b32c72590a..042a1368e0 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -39,6 +39,7 @@ import os import sys import socket import threading +import struct import _multiprocessing from multiprocessing import current_process @@ -51,7 +52,8 @@ from multiprocessing.connection import Client, Listener, Connection # # -if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): +if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and + hasattr(socket, 'SCM_RIGHTS'))): raise ImportError('pickling of connections not supported') # @@ -77,10 +79,23 @@ if sys.platform == 'win32': else: def send_handle(conn, handle, destination_pid): - _multiprocessing.sendfd(conn.fileno(), handle) + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, + struct.pack("@i", handle))]) def recv_handle(conn): - return _multiprocessing.recvfd(conn.fileno()) + size = struct.calcsize("@i") + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) + try: + cmsg_level, cmsg_type, cmsg_data = ancdata[0] + if (cmsg_level == socket.SOL_SOCKET and + cmsg_type == socket.SCM_RIGHTS): + return struct.unpack("@i", cmsg_data[:size])[0] + except (ValueError, IndexError, struct.error): + pass + raise RuntimeError('Invalid data received') + # # Support for a per-process server thread which caches pickled handles -- cgit v1.2.1 From f52a051081e30c3421e009d1369e4f6be081fd93 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 12 Oct 2011 02:54:14 +0200 Subject: PEP 3151 / issue #12555: reworking the OS and IO exception hierarchy. --- Lib/multiprocessing/connection.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 13d3d777f2..0c9695830e 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -321,7 +321,7 @@ if win32: firstchunk = overlapped.getbuffer() assert lenfirstchunk == len(firstchunk) except IOError as e: - if e.errno == win32.ERROR_BROKEN_PIPE: + if e.winerror == win32.ERROR_BROKEN_PIPE: raise EOFError raise buf.write(firstchunk) @@ -669,7 +669,7 @@ if sys.platform == 'win32': try: win32.ConnectNamedPipe(handle, win32.NULL) except WindowsError as e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: + if e.winerror != win32.ERROR_PIPE_CONNECTED: raise return PipeConnection(handle) @@ -692,8 +692,8 @@ if sys.platform == 'win32': 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL ) except WindowsError as e: - if e.args[0] not in (win32.ERROR_SEM_TIMEOUT, - win32.ERROR_PIPE_BUSY) or _check_timeout(t): + if e.winerror not in (win32.ERROR_SEM_TIMEOUT, + win32.ERROR_PIPE_BUSY) or _check_timeout(t): raise else: break -- cgit v1.2.1 From d5873fb201eca5d043139ba9b22a32b24e64b91d Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sun, 23 Oct 2011 23:49:42 +0200 Subject: Use InterruptedError instead of checking for EINTR --- Lib/multiprocessing/util.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index c48718026b..5c26683bf3 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -327,15 +327,12 @@ class ForkAwareLocal(threading.local): # Automatic retry after EINTR # -def _eintr_retry(func, _errors=(EnvironmentError, select.error)): +def _eintr_retry(func): @functools.wraps(func) def wrapped(*args, **kwargs): while True: try: return func(*args, **kwargs) - except _errors as e: - # select.error has no `errno` attribute - if e.args[0] == errno.EINTR: - continue - raise + except InterruptedError: + continue return wrapped -- cgit v1.2.1 From 538a7efe8a864f96015957cff8d8c211582b7b27 Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Fri, 11 Nov 2011 20:05:50 +0100 Subject: Remove unused or redundant imports in concurrent.futures and multiprocessing. --- Lib/multiprocessing/dummy/__init__.py | 2 -- Lib/multiprocessing/forking.py | 8 ++------ Lib/multiprocessing/heap.py | 1 - Lib/multiprocessing/managers.py | 6 +----- Lib/multiprocessing/queues.py | 1 - Lib/multiprocessing/reduction.py | 2 -- Lib/multiprocessing/sharedctypes.py | 1 - Lib/multiprocessing/synchronize.py | 5 +---- Lib/multiprocessing/util.py | 3 +-- 9 files changed, 5 insertions(+), 24 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py index 60add922bf..056acfcccc 100644 --- a/Lib/multiprocessing/dummy/__init__.py +++ b/Lib/multiprocessing/dummy/__init__.py @@ -46,9 +46,7 @@ import threading import sys import weakref import array -import itertools -from multiprocessing import TimeoutError, cpu_count from multiprocessing.dummy.connection import Pipe from threading import Lock, RLock, Semaphore, BoundedSemaphore from threading import Event, Condition diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index a2c61ef27e..47746cb179 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -35,7 +35,6 @@ import os import sys import signal -import select from multiprocessing import util, process @@ -101,7 +100,6 @@ else: # if sys.platform != 'win32': - import time import select exit = os._exit @@ -170,7 +168,7 @@ if sys.platform != 'win32': if self.returncode is None: try: os.kill(self.pid, signal.SIGTERM) - except OSError as e: + except OSError: if self.wait(timeout=0.1) is None: raise @@ -186,11 +184,9 @@ else: import _thread import msvcrt import _subprocess - import time - from pickle import dump, load, HIGHEST_PROTOCOL + from pickle import load, HIGHEST_PROTOCOL from _multiprocessing import win32 - from .util import Finalize def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index 0a25ef05c7..7366bd2b8b 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -34,7 +34,6 @@ import bisect import mmap -import tempfile import os import sys import threading diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 5a57288caa..f42d353f0d 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -39,19 +39,15 @@ __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] # Imports # -import os import sys -import weakref import threading import array import queue from traceback import format_exc -from pickle import PicklingError from multiprocessing import Process, current_process, active_children, Pool, util, connection from multiprocessing.process import AuthenticationString -from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler -from multiprocessing.util import Finalize, info +from multiprocessing.forking import exit, Popen, ForkingPickler # # Register some things for pickling diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index cf51307c28..c4f9cda499 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -39,7 +39,6 @@ import os import threading import collections import time -import atexit import weakref import errno diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 042a1368e0..dda4a4120b 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -41,7 +41,6 @@ import socket import threading import struct -import _multiprocessing from multiprocessing import current_process from multiprocessing.forking import Popen, duplicate, close, ForkingPickler from multiprocessing.util import register_after_fork, debug, sub_debug @@ -61,7 +60,6 @@ if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and # if sys.platform == 'win32': - import _subprocess from _multiprocessing import win32 def send_handle(conn, handle, destination_pid): diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index 1e694da49d..5826379530 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -32,7 +32,6 @@ # SUCH DAMAGE. # -import sys import ctypes import weakref diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 70ae82569c..e35bbff185 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -37,14 +37,11 @@ __all__ = [ ] import threading -import os import sys -from time import time as _time, sleep as _sleep - import _multiprocessing from multiprocessing.process import current_process -from multiprocessing.util import Finalize, register_after_fork, debug +from multiprocessing.util import register_after_fork, debug from multiprocessing.forking import assert_spawning, Popen # Try to import the mp.synchronize module cleanly, if it fails diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 5c26683bf3..0bbb87ed35 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -36,7 +36,6 @@ import functools import itertools import weakref import atexit -import select import threading # we want threading to install it's # cleanup function before multiprocessing does @@ -86,7 +85,7 @@ def get_logger(): Returns logger used by multiprocessing ''' global _logger - import logging, atexit + import logging logging._acquireLock() try: -- cgit v1.2.1 From 58a15909c1c82d6122b9a169be15a2bc98d0bf7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Fran=C3=A7ois=20Natali?= Date: Sat, 19 Nov 2011 09:59:43 +0100 Subject: Issue #13215: multiprocessing.Connection: don't hammer the remote end with retries in case of ECONNREFUSED. --- Lib/multiprocessing/connection.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 0c9695830e..c6c6113834 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -608,21 +608,7 @@ def SocketClient(address): ''' family = address_type(address) with socket.socket( getattr(socket, family) ) as s: - t = _init_timeout() - - while 1: - try: - s.connect(address) - except socket.error as e: - if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): - debug('failed to connect to address %s', address) - raise - time.sleep(0.01) - else: - break - else: - raise - + s.connect(address) fd = duplicate(s.fileno()) conn = Connection(fd) return conn -- cgit v1.2.1 From 56ec2a8ca7b4c9c27b6714cb3f82e13030e0b240 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 21 Dec 2011 11:03:24 +0100 Subject: Issue #12708: Add starmap() and starmap_async() methods (similar to itertools.starmap()) to multiprocessing.Pool. Patch by Hynek Schlawack. --- Lib/multiprocessing/managers.py | 3 ++- Lib/multiprocessing/pool.py | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index f42d353f0d..5cae4c1548 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -1066,11 +1066,12 @@ ArrayProxy = MakeProxyType('ArrayProxy', ( PoolProxy = MakeProxyType('PoolProxy', ( 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', - 'map', 'map_async', 'terminate' + 'map', 'map_async', 'starmap', 'starmap_async', 'terminate' )) PoolProxy._method_to_typeid_ = { 'apply_async': 'AsyncResult', 'map_async': 'AsyncResult', + 'starmap_async': 'AsyncResult', 'imap': 'Iterator', 'imap_unordered': 'Iterator' } diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 0c29e644ff..7039d1679e 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -64,6 +64,9 @@ job_counter = itertools.count() def mapstar(args): return list(map(*args)) +def starmapstar(args): + return list(itertools.starmap(args[0], args[1])) + # # Code run by worker processes # @@ -248,7 +251,25 @@ class Pool(object): in a list that is returned. ''' assert self._state == RUN - return self.map_async(func, iterable, chunksize).get() + return self._map_async(func, iterable, mapstar, chunksize).get() + + def starmap(self, func, iterable, chunksize=None): + ''' + Like `map()` method but the elements of the `iterable` are expected to + be iterables as well and will be unpacked as arguments. Hence + `func` and (a, b) becomes func(a, b). + ''' + assert self._state == RUN + return self._map_async(func, iterable, starmapstar, chunksize).get() + + def starmap_async(self, func, iterable, chunksize=None, callback=None, + error_callback=None): + ''' + Asynchronous version of `starmap()` method. + ''' + assert self._state == RUN + return self._map_async(func, iterable, starmapstar, chunksize, + callback, error_callback) def imap(self, func, iterable, chunksize=1): ''' @@ -302,6 +323,13 @@ class Pool(object): Asynchronous version of `map()` method. ''' assert self._state == RUN + return self._map_async(func, iterable, mapstar, chunksize) + + def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, + error_callback=None): + ''' + Helper function to implement map, starmap and their async counterparts. + ''' if not hasattr(iterable, '__len__'): iterable = list(iterable) @@ -315,7 +343,7 @@ class Pool(object): task_batches = Pool._get_tasks(func, iterable, chunksize) result = MapResult(self._cache, chunksize, len(iterable), callback, error_callback=error_callback) - self._taskqueue.put((((result._job, i, mapstar, (x,), {}) + self._taskqueue.put((((result._job, i, mapper, (x,), {}) for i, x in enumerate(task_batches)), None)) return result -- cgit v1.2.1 From 1e7e57edb5cd1f0547ef3903cf8f13dee532704f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Fran=C3=A7ois=20Natali?= Date: Sat, 4 Feb 2012 15:12:08 +0100 Subject: Issue #8184: Fix a potential file descriptor leak when a multiprocessing.Connection socket can't be bound. --- Lib/multiprocessing/connection.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index c6c6113834..615f55dd22 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -575,10 +575,14 @@ class SocketListener(object): ''' def __init__(self, address, family, backlog=1): self._socket = socket.socket(getattr(socket, family)) - self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self._socket.bind(address) - self._socket.listen(backlog) - self._address = self._socket.getsockname() + try: + self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._socket.bind(address) + self._socket.listen(backlog) + self._address = self._socket.getsockname() + except OSError: + self._socket.close() + raise self._family = family self._last_accepted = None -- cgit v1.2.1 From 818356540ccf1fb471e371ce68951e5638a9bdd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Fran=C3=A7ois=20Natali?= Date: Wed, 8 Feb 2012 21:15:58 +0100 Subject: Issue #8184: multiprocessing: On Windows, don't set SO_REUSEADDR on Connection sockets, and set FILE_FLAG_FIRST_PIPE_INSTANCE on named pipes, to make sure two listeners can't bind to the same socket/pipe (or any existing socket/pipe). --- Lib/multiprocessing/connection.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 615f55dd22..8807618ed3 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -544,7 +544,8 @@ else: obsize, ibsize = 0, BUFSIZE h1 = win32.CreateNamedPipe( - address, openmode | win32.FILE_FLAG_OVERLAPPED, + address, openmode | win32.FILE_FLAG_OVERLAPPED | + win32.FILE_FLAG_FIRST_PIPE_INSTANCE, win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | win32.PIPE_WAIT, 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL @@ -576,7 +577,10 @@ class SocketListener(object): def __init__(self, address, family, backlog=1): self._socket = socket.socket(getattr(socket, family)) try: - self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # SO_REUSEADDR has different semantics on Windows (issue #2550). + if os.name == 'posix': + self._socket.setsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR, 1) self._socket.bind(address) self._socket.listen(backlog) self._address = self._socket.getsockname() @@ -630,7 +634,8 @@ if sys.platform == 'win32': def __init__(self, address, backlog=None): self._address = address handle = win32.CreateNamedPipe( - address, win32.PIPE_ACCESS_DUPLEX, + address, win32.PIPE_ACCESS_DUPLEX | + win32.FILE_FLAG_FIRST_PIPE_INSTANCE, win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | win32.PIPE_WAIT, win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, -- cgit v1.2.1 From 0e0ccbf2431f5e426916ed2b9ae083c6244285a5 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 5 Mar 2012 19:28:37 +0100 Subject: Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows. Also, add a multiprocessing.connection.wait(rlist, timeout=None) function for polling multiple objects at once. Patch by sbt. Complete changelist from sbt's patch: * Adds a wait(rlist, timeout=None) function for polling multiple objects at once. On Unix this is just a wrapper for select(rlist, [], [], timeout=None). * Removes use of the SentinelReady exception and the sentinels argument to certain methods. concurrent.futures.process has been changed to use wait() instead of SentinelReady. * Fixes bugs concerning PipeConnection.poll() and messages of zero length. * Fixes PipeListener.accept() to call ConnectNamedPipe() with overlapped=True. * Fixes Queue.empty() and SimpleQueue.empty() so that they are threadsafe on Windows. * Now PipeConnection.poll() and wait() will not modify the pipe except possibly by consuming a zero length message. (Previously poll() could consume a partial message.) * All of multiprocesing's pipe related blocking functions/methods are now interruptible by SIGINT on Windows. --- Lib/multiprocessing/connection.py | 324 +++++++++++++++++++++++++------------- Lib/multiprocessing/queues.py | 9 +- 2 files changed, 217 insertions(+), 116 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 8807618ed3..ca0c9731a7 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -32,7 +32,7 @@ # SUCH DAMAGE. # -__all__ = [ 'Client', 'Listener', 'Pipe' ] +__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] import io import os @@ -58,8 +58,6 @@ except ImportError: raise win32 = None -_select = _eintr_retry(select.select) - # # # @@ -122,15 +120,6 @@ def address_type(address): else: raise ValueError('address type of %r unrecognized' % address) - -class SentinelReady(Exception): - """ - Raised when a sentinel is ready when polling. - """ - def __init__(self, *args): - Exception.__init__(self, *args) - self.sentinels = args[0] - # # Connection classes # @@ -268,11 +257,11 @@ class _ConnectionBase: (offset + size) // itemsize]) return size - def recv(self, sentinels=None): + def recv(self): """Receive a (picklable) object""" self._check_closed() self._check_readable() - buf = self._recv_bytes(sentinels=sentinels) + buf = self._recv_bytes() return pickle.loads(buf.getbuffer()) def poll(self, timeout=0.0): @@ -290,85 +279,80 @@ if win32: Overlapped I/O is used, so the handles must have been created with FILE_FLAG_OVERLAPPED. """ - _buffered = b'' + _got_empty_message = False def _close(self, _CloseHandle=win32.CloseHandle): _CloseHandle(self._handle) def _send_bytes(self, buf): - overlapped = win32.WriteFile(self._handle, buf, overlapped=True) - nwritten, complete = overlapped.GetOverlappedResult(True) - assert complete + ov, err = win32.WriteFile(self._handle, buf, overlapped=True) + try: + if err == win32.ERROR_IO_PENDING: + waitres = win32.WaitForMultipleObjects( + [ov.event], False, INFINITE) + assert waitres == WAIT_OBJECT_0 + except: + ov.cancel() + raise + finally: + nwritten, err = ov.GetOverlappedResult(True) + assert err == 0 assert nwritten == len(buf) - def _recv_bytes(self, maxsize=None, sentinels=()): - if sentinels: - self._poll(-1.0, sentinels) - buf = io.BytesIO() - firstchunk = self._buffered - if firstchunk: - lenfirstchunk = len(firstchunk) - buf.write(firstchunk) - self._buffered = b'' + def _recv_bytes(self, maxsize=None): + if self._got_empty_message: + self._got_empty_message = False + return io.BytesIO() else: - # A reasonable size for the first chunk transfer - bufsize = 128 - if maxsize is not None and maxsize < bufsize: - bufsize = maxsize + bsize = 128 if maxsize is None else min(maxsize, 128) try: - overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True) - lenfirstchunk, complete = overlapped.GetOverlappedResult(True) - firstchunk = overlapped.getbuffer() - assert lenfirstchunk == len(firstchunk) + ov, err = win32.ReadFile(self._handle, bsize, + overlapped=True) + try: + if err == win32.ERROR_IO_PENDING: + waitres = win32.WaitForMultipleObjects( + [ov.event], False, INFINITE) + assert waitres == WAIT_OBJECT_0 + except: + ov.cancel() + raise + finally: + nread, err = ov.GetOverlappedResult(True) + if err == 0: + f = io.BytesIO() + f.write(ov.getbuffer()) + return f + elif err == win32.ERROR_MORE_DATA: + return self._get_more_data(ov, maxsize) except IOError as e: if e.winerror == win32.ERROR_BROKEN_PIPE: raise EOFError - raise - buf.write(firstchunk) - if complete: - return buf - navail, nleft = win32.PeekNamedPipe(self._handle) - if maxsize is not None and lenfirstchunk + nleft > maxsize: - return None - if nleft > 0: - overlapped = win32.ReadFile(self._handle, nleft, overlapped=True) - res, complete = overlapped.GetOverlappedResult(True) - assert res == nleft - assert complete - buf.write(overlapped.getbuffer()) - return buf - - def _poll(self, timeout, sentinels=()): - # Fast non-blocking path - navail, nleft = win32.PeekNamedPipe(self._handle) - if navail > 0: - return True - elif timeout == 0.0: - return False - # Blocking: use overlapped I/O - if timeout < 0.0: - timeout = INFINITE - else: - timeout = int(timeout * 1000 + 0.5) - overlapped = win32.ReadFile(self._handle, 1, overlapped=True) - try: - handles = [overlapped.event] - handles += sentinels - res = win32.WaitForMultipleObjects(handles, False, timeout) - finally: - # Always cancel overlapped I/O in the same thread - # (because CancelIoEx() appears only in Vista) - overlapped.cancel() - if res == WAIT_TIMEOUT: - return False - idx = res - WAIT_OBJECT_0 - if idx == 0: - # I/O was successful, store received data - overlapped.GetOverlappedResult(True) - self._buffered += overlapped.getbuffer() + else: + raise + raise RuntimeError("shouldn't get here; expected KeyboardInterrupt") + + def _poll(self, timeout): + if (self._got_empty_message or + win32.PeekNamedPipe(self._handle)[0] != 0): return True - assert 0 < idx < len(handles) - raise SentinelReady([handles[idx]]) + if timeout < 0: + timeout = None + return bool(wait([self], timeout)) + + def _get_more_data(self, ov, maxsize): + buf = ov.getbuffer() + f = io.BytesIO() + f.write(buf) + left = win32.PeekNamedPipe(self._handle)[1] + assert left > 0 + if maxsize is not None and len(buf) + left > maxsize: + self._bad_message_length() + ov, err = win32.ReadFile(self._handle, left, overlapped=True) + rbytes, err = ov.GetOverlappedResult(True) + assert err == 0 + assert rbytes == left + f.write(ov.getbuffer()) + return f class Connection(_ConnectionBase): @@ -397,17 +381,11 @@ class Connection(_ConnectionBase): break buf = buf[n:] - def _recv(self, size, sentinels=(), read=_read): + def _recv(self, size, read=_read): buf = io.BytesIO() handle = self._handle - if sentinels: - handles = [handle] + sentinels remaining = size while remaining > 0: - if sentinels: - r = _select(handles, [], [])[0] - if handle not in r: - raise SentinelReady(r) chunk = read(handle, remaining) n = len(chunk) if n == 0: @@ -428,17 +406,17 @@ class Connection(_ConnectionBase): if n > 0: self._send(buf) - def _recv_bytes(self, maxsize=None, sentinels=()): - buf = self._recv(4, sentinels) + def _recv_bytes(self, maxsize=None): + buf = self._recv(4) size, = struct.unpack("!i", buf.getvalue()) if maxsize is not None and size > maxsize: return None - return self._recv(size, sentinels) + return self._recv(size) def _poll(self, timeout): if timeout < 0.0: timeout = None - r = _select([self._handle], [], [], timeout)[0] + r = wait([self._handle], timeout) return bool(r) @@ -559,7 +537,8 @@ else: ) overlapped = win32.ConnectNamedPipe(h1, overlapped=True) - overlapped.GetOverlappedResult(True) + _, err = overlapped.GetOverlappedResult(True) + assert err == 0 c1 = PipeConnection(h1, writable=duplex) c2 = PipeConnection(h2, readable=duplex) @@ -633,39 +612,40 @@ if sys.platform == 'win32': ''' def __init__(self, address, backlog=None): self._address = address - handle = win32.CreateNamedPipe( - address, win32.PIPE_ACCESS_DUPLEX | - win32.FILE_FLAG_FIRST_PIPE_INSTANCE, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, - win32.NMPWAIT_WAIT_FOREVER, win32.NULL - ) - self._handle_queue = [handle] - self._last_accepted = None + self._handle_queue = [self._new_handle(first=True)] + self._last_accepted = None sub_debug('listener created with address=%r', self._address) - self.close = Finalize( self, PipeListener._finalize_pipe_listener, args=(self._handle_queue, self._address), exitpriority=0 ) - def accept(self): - newhandle = win32.CreateNamedPipe( - self._address, win32.PIPE_ACCESS_DUPLEX, + def _new_handle(self, first=False): + flags = win32.PIPE_ACCESS_DUPLEX | win32.FILE_FLAG_OVERLAPPED + if first: + flags |= win32.FILE_FLAG_FIRST_PIPE_INSTANCE + return win32.CreateNamedPipe( + self._address, flags, win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | win32.PIPE_WAIT, win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, win32.NMPWAIT_WAIT_FOREVER, win32.NULL ) - self._handle_queue.append(newhandle) + + def accept(self): + self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) + ov = win32.ConnectNamedPipe(handle, overlapped=True) try: - win32.ConnectNamedPipe(handle, win32.NULL) - except WindowsError as e: - if e.winerror != win32.ERROR_PIPE_CONNECTED: - raise + res = win32.WaitForMultipleObjects([ov.event], False, INFINITE) + except: + ov.cancel() + win32.CloseHandle(handle) + raise + finally: + _, err = ov.GetOverlappedResult(True) + assert err == 0 return PipeConnection(handle) @staticmethod @@ -684,7 +664,8 @@ if sys.platform == 'win32': win32.WaitNamedPipe(address, 1000) h = win32.CreateFile( address, win32.GENERIC_READ | win32.GENERIC_WRITE, - 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + 0, win32.NULL, win32.OPEN_EXISTING, + win32.FILE_FLAG_OVERLAPPED, win32.NULL ) except WindowsError as e: if e.winerror not in (win32.ERROR_SEM_TIMEOUT, @@ -773,6 +754,125 @@ def XmlClient(*args, **kwds): import xmlrpc.client as xmlrpclib return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) +# +# Wait +# + +if sys.platform == 'win32': + + def _exhaustive_wait(handles, timeout): + # Return ALL handles which are currently signalled. (Only + # returning the first signalled might create starvation issues.) + L = list(handles) + ready = [] + while L: + res = win32.WaitForMultipleObjects(L, False, timeout) + if res == WAIT_TIMEOUT: + break + elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): + res -= WAIT_OBJECT_0 + elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): + res -= WAIT_ABANDONED_0 + else: + raise RuntimeError('Should not get here') + ready.append(L[res]) + L = L[res+1:] + timeout = 0 + return ready + + _ready_errors = {win32.ERROR_BROKEN_PIPE, win32.ERROR_NETNAME_DELETED} + + def wait(object_list, timeout=None): + ''' + Wait till an object in object_list is ready/readable. + + Returns list of those objects in object_list which are ready/readable. + ''' + if timeout is None: + timeout = INFINITE + elif timeout < 0: + timeout = 0 + else: + timeout = int(timeout * 1000 + 0.5) + + object_list = list(object_list) + waithandle_to_obj = {} + ov_list = [] + ready_objects = set() + ready_handles = set() + + try: + for o in object_list: + try: + fileno = getattr(o, 'fileno') + except AttributeError: + waithandle_to_obj[o.__index__()] = o + else: + # start an overlapped read of length zero + try: + ov, err = win32.ReadFile(fileno(), 0, True) + except OSError as e: + err = e.winerror + if err not in _ready_errors: + raise + if err == win32.ERROR_IO_PENDING: + ov_list.append(ov) + waithandle_to_obj[ov.event] = o + else: + # If o.fileno() is an overlapped pipe handle and + # err == 0 then there is a zero length message + # in the pipe, but it HAS NOT been consumed. + ready_objects.add(o) + timeout = 0 + + ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout) + finally: + # request that overlapped reads stop + for ov in ov_list: + ov.cancel() + + # wait for all overlapped reads to stop + for ov in ov_list: + try: + _, err = ov.GetOverlappedResult(True) + except OSError as e: + err = e.winerror + if err not in _ready_errors: + raise + if err != win32.ERROR_OPERATION_ABORTED: + o = waithandle_to_obj[ov.event] + ready_objects.add(o) + if err == 0: + # If o.fileno() is an overlapped pipe handle then + # a zero length message HAS been consumed. + if hasattr(o, '_got_empty_message'): + o._got_empty_message = True + + ready_objects.update(waithandle_to_obj[h] for h in ready_handles) + return [o for o in object_list if o in ready_objects] + +else: + + def wait(object_list, timeout=None): + ''' + Wait till an object in object_list is ready/readable. + + Returns list of those objects in object_list which are ready/readable. + ''' + if timeout is not None: + if timeout <= 0: + return select.select(object_list, [], [], 0)[0] + else: + deadline = time.time() + timeout + while True: + try: + return select.select(object_list, [], [], timeout)[0] + except OSError as e: + if e.errno != errno.EINTR: + raise + if timeout is not None: + timeout = deadline - time.time() + # Late import because of circular import from multiprocessing.forking import duplicate, close diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index c4f9cda499..262fd85733 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -44,7 +44,7 @@ import errno from queue import Empty, Full import _multiprocessing -from multiprocessing.connection import Pipe, SentinelReady +from multiprocessing.connection import Pipe from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork from multiprocessing.forking import assert_spawning @@ -360,6 +360,7 @@ class SimpleQueue(object): def __init__(self): self._reader, self._writer = Pipe(duplex=False) self._rlock = Lock() + self._poll = self._reader.poll if sys.platform == 'win32': self._wlock = None else: @@ -367,7 +368,7 @@ class SimpleQueue(object): self._make_methods() def empty(self): - return not self._reader.poll() + return not self._poll() def __getstate__(self): assert_spawning(self) @@ -380,10 +381,10 @@ class SimpleQueue(object): def _make_methods(self): recv = self._reader.recv racquire, rrelease = self._rlock.acquire, self._rlock.release - def get(*, sentinels=None): + def get(): racquire() try: - return recv(sentinels) + return recv() finally: rrelease() self.get = get -- cgit v1.2.1 From aca5a53034dfc62c1e55bb4448a1aae9bc3004a9 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 17 Mar 2012 00:23:04 +0100 Subject: Issue #14335: multiprocessing's custom Pickler subclass now inherits from the C-accelerated implementation. Patch by sbt. --- Lib/multiprocessing/forking.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index b7de5679b1..020508a118 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -55,18 +55,18 @@ def assert_spawning(self): # Try making some callable types picklable # -from pickle import _Pickler as Pickler +from pickle import Pickler +from copyreg import dispatch_table + class ForkingPickler(Pickler): - dispatch = Pickler.dispatch.copy() + _extra_reducers = {} + def __init__(self, *args): + Pickler.__init__(self, *args) + self.dispatch_table = dispatch_table.copy() + self.dispatch_table.update(self._extra_reducers) @classmethod def register(cls, type, reduce): - def dispatcher(self, obj): - rv = reduce(obj) - if isinstance(rv, str): - self.save_global(obj, rv) - else: - self.save_reduce(obj=obj, *rv) - cls.dispatch[type] = dispatcher + cls._extra_reducers[type] = reduce def _reduce_method(m): if m.__self__ is None: -- cgit v1.2.1 From a60a6ec82621a711f2c8d93e3ae2b0f99f932f03 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 7 Apr 2012 22:38:52 +0200 Subject: Issue #14522: Avoid duplicating socket handles in multiprocessing.connection. Patch by sbt. --- Lib/multiprocessing/connection.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 4e4544314e..90c1ea7abf 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -591,10 +591,7 @@ class SocketListener(object): def accept(self): s, self._last_accepted = self._socket.accept() - fd = duplicate(s.fileno()) - conn = Connection(fd) - s.close() - return conn + return Connection(s.detach()) def close(self): self._socket.close() @@ -609,9 +606,7 @@ def SocketClient(address): family = address_type(address) with socket.socket( getattr(socket, family) ) as s: s.connect(address) - fd = duplicate(s.fileno()) - conn = Connection(fd) - return conn + return Connection(s.detach()) # # Definitions for connections based on named pipes @@ -665,7 +660,7 @@ if sys.platform == 'win32': def _finalize_pipe_listener(queue, address): sub_debug('closing listener with address=%r', address) for handle in queue: - close(handle) + win32.CloseHandle(handle) def PipeClient(address): ''' @@ -885,7 +880,3 @@ else: raise if timeout is not None: timeout = deadline - time.time() - - -# Late import because of circular import -from multiprocessing.forking import duplicate, close -- cgit v1.2.1 From cba36f5380a6f1ca4aa9f07e5c2e3544b9261cdd Mon Sep 17 00:00:00 2001 From: Charles-Fran?ois Natali Date: Tue, 17 Apr 2012 18:45:57 +0200 Subject: Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt. --- Lib/multiprocessing/managers.py | 19 +++++++++++++++++++ Lib/multiprocessing/synchronize.py | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index eaf912c124..d1c9d4578e 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -48,6 +48,7 @@ from traceback import format_exc from multiprocessing import Process, current_process, active_children, Pool, util, connection from multiprocessing.process import AuthenticationString from multiprocessing.forking import exit, Popen, ForkingPickler +from time import time as _time # # Register some things for pickling @@ -996,6 +997,24 @@ class ConditionProxy(AcquirerProxy): return self._callmethod('notify') def notify_all(self): return self._callmethod('notify_all') + def wait_for(self, predicate, timeout=None): + result = predicate() + if result: + return result + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + waittime = None + while not result: + if endtime is not None: + waittime = endtime - _time() + if waittime <= 0: + break + self.wait(waittime) + result = predicate() + return result + class EventProxy(BaseProxy): _exposed_ = ('is_set', 'set', 'clear', 'wait') diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index e35bbff185..532ac5c1dd 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -43,6 +43,7 @@ import _multiprocessing from multiprocessing.process import current_process from multiprocessing.util import register_after_fork, debug from multiprocessing.forking import assert_spawning, Popen +from time import time as _time # Try to import the mp.synchronize module cleanly, if it fails # raise ImportError for platforms lacking a working sem_open implementation. @@ -290,6 +291,24 @@ class Condition(object): while self._wait_semaphore.acquire(False): pass + def wait_for(self, predicate, timeout=None): + result = predicate() + if result: + return result + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + waittime = None + while not result: + if endtime is not None: + waittime = endtime - _time() + if waittime <= 0: + break + self.wait(waittime) + result = predicate() + return result + # # Event # -- cgit v1.2.1 From 03acff1d9238fc0c2806393d79cc8b43368cd4f1 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 18 Apr 2012 20:51:15 +0200 Subject: Issue #11750: The Windows API functions scattered in the _subprocess and _multiprocessing.win32 modules now live in a single module "_winapi". Patch by sbt. --- Lib/multiprocessing/connection.py | 124 +++++++++++++++++++------------------- Lib/multiprocessing/forking.py | 31 +++++----- Lib/multiprocessing/heap.py | 6 +- Lib/multiprocessing/reduction.py | 6 +- 4 files changed, 83 insertions(+), 84 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 90c1ea7abf..3a61e5ee42 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -51,12 +51,12 @@ from multiprocessing import current_process, AuthenticationError, BufferTooShort from multiprocessing.util import ( get_temp_dir, Finalize, sub_debug, debug, _eintr_retry) try: - from _multiprocessing import win32 - from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE + import _winapi + from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE except ImportError: if sys.platform == 'win32': raise - win32 = None + _winapi = None # # @@ -282,7 +282,7 @@ class _ConnectionBase: return self._poll(timeout) -if win32: +if _winapi: class PipeConnection(_ConnectionBase): """ @@ -292,14 +292,14 @@ if win32: """ _got_empty_message = False - def _close(self, _CloseHandle=win32.CloseHandle): + def _close(self, _CloseHandle=_winapi.CloseHandle): _CloseHandle(self._handle) def _send_bytes(self, buf): - ov, err = win32.WriteFile(self._handle, buf, overlapped=True) + ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) try: - if err == win32.ERROR_IO_PENDING: - waitres = win32.WaitForMultipleObjects( + if err == _winapi.ERROR_IO_PENDING: + waitres = _winapi.WaitForMultipleObjects( [ov.event], False, INFINITE) assert waitres == WAIT_OBJECT_0 except: @@ -317,11 +317,11 @@ if win32: else: bsize = 128 if maxsize is None else min(maxsize, 128) try: - ov, err = win32.ReadFile(self._handle, bsize, - overlapped=True) + ov, err = _winapi.ReadFile(self._handle, bsize, + overlapped=True) try: - if err == win32.ERROR_IO_PENDING: - waitres = win32.WaitForMultipleObjects( + if err == _winapi.ERROR_IO_PENDING: + waitres = _winapi.WaitForMultipleObjects( [ov.event], False, INFINITE) assert waitres == WAIT_OBJECT_0 except: @@ -333,10 +333,10 @@ if win32: f = io.BytesIO() f.write(ov.getbuffer()) return f - elif err == win32.ERROR_MORE_DATA: + elif err == _winapi.ERROR_MORE_DATA: return self._get_more_data(ov, maxsize) except IOError as e: - if e.winerror == win32.ERROR_BROKEN_PIPE: + if e.winerror == _winapi.ERROR_BROKEN_PIPE: raise EOFError else: raise @@ -344,7 +344,7 @@ if win32: def _poll(self, timeout): if (self._got_empty_message or - win32.PeekNamedPipe(self._handle)[0] != 0): + _winapi.PeekNamedPipe(self._handle)[0] != 0): return True if timeout < 0: timeout = None @@ -354,11 +354,11 @@ if win32: buf = ov.getbuffer() f = io.BytesIO() f.write(buf) - left = win32.PeekNamedPipe(self._handle)[1] + left = _winapi.PeekNamedPipe(self._handle)[1] assert left > 0 if maxsize is not None and len(buf) + left > maxsize: self._bad_message_length() - ov, err = win32.ReadFile(self._handle, left, overlapped=True) + ov, err = _winapi.ReadFile(self._handle, left, overlapped=True) rbytes, err = ov.GetOverlappedResult(True) assert err == 0 assert rbytes == left @@ -372,11 +372,11 @@ class Connection(_ConnectionBase): a socket handle (Windows). """ - if win32: - def _close(self, _close=win32.closesocket): + if _winapi: + def _close(self, _close=_multiprocessing.closesocket): _close(self._handle) - _write = win32.send - _read = win32.recv + _write = _multiprocessing.send + _read = _multiprocessing.recv else: def _close(self, _close=os.close): _close(self._handle) @@ -526,30 +526,30 @@ else: ''' address = arbitrary_address('AF_PIPE') if duplex: - openmode = win32.PIPE_ACCESS_DUPLEX - access = win32.GENERIC_READ | win32.GENERIC_WRITE + openmode = _winapi.PIPE_ACCESS_DUPLEX + access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE obsize, ibsize = BUFSIZE, BUFSIZE else: - openmode = win32.PIPE_ACCESS_INBOUND - access = win32.GENERIC_WRITE + openmode = _winapi.PIPE_ACCESS_INBOUND + access = _winapi.GENERIC_WRITE obsize, ibsize = 0, BUFSIZE - h1 = win32.CreateNamedPipe( - address, openmode | win32.FILE_FLAG_OVERLAPPED | - win32.FILE_FLAG_FIRST_PIPE_INSTANCE, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL + h1 = _winapi.CreateNamedPipe( + address, openmode | _winapi.FILE_FLAG_OVERLAPPED | + _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE, + _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | + _winapi.PIPE_WAIT, + 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) - h2 = win32.CreateFile( - address, access, 0, win32.NULL, win32.OPEN_EXISTING, - win32.FILE_FLAG_OVERLAPPED, win32.NULL + h2 = _winapi.CreateFile( + address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, + _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL ) - win32.SetNamedPipeHandleState( - h2, win32.PIPE_READMODE_MESSAGE, None, None + _winapi.SetNamedPipeHandleState( + h2, _winapi.PIPE_READMODE_MESSAGE, None, None ) - overlapped = win32.ConnectNamedPipe(h1, overlapped=True) + overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True) _, err = overlapped.GetOverlappedResult(True) assert err == 0 @@ -630,26 +630,26 @@ if sys.platform == 'win32': ) def _new_handle(self, first=False): - flags = win32.PIPE_ACCESS_DUPLEX | win32.FILE_FLAG_OVERLAPPED + flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED if first: - flags |= win32.FILE_FLAG_FIRST_PIPE_INSTANCE - return win32.CreateNamedPipe( + flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE + return _winapi.CreateNamedPipe( self._address, flags, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, - win32.NMPWAIT_WAIT_FOREVER, win32.NULL + _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | + _winapi.PIPE_WAIT, + _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, + _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) def accept(self): self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) - ov = win32.ConnectNamedPipe(handle, overlapped=True) + ov = _winapi.ConnectNamedPipe(handle, overlapped=True) try: - res = win32.WaitForMultipleObjects([ov.event], False, INFINITE) + res = _winapi.WaitForMultipleObjects([ov.event], False, INFINITE) except: ov.cancel() - win32.CloseHandle(handle) + _winapi.CloseHandle(handle) raise finally: _, err = ov.GetOverlappedResult(True) @@ -660,7 +660,7 @@ if sys.platform == 'win32': def _finalize_pipe_listener(queue, address): sub_debug('closing listener with address=%r', address) for handle in queue: - win32.CloseHandle(handle) + _winapi.CloseHandle(handle) def PipeClient(address): ''' @@ -669,23 +669,23 @@ if sys.platform == 'win32': t = _init_timeout() while 1: try: - win32.WaitNamedPipe(address, 1000) - h = win32.CreateFile( - address, win32.GENERIC_READ | win32.GENERIC_WRITE, - 0, win32.NULL, win32.OPEN_EXISTING, - win32.FILE_FLAG_OVERLAPPED, win32.NULL + _winapi.WaitNamedPipe(address, 1000) + h = _winapi.CreateFile( + address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE, + 0, _winapi.NULL, _winapi.OPEN_EXISTING, + _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL ) except WindowsError as e: - if e.winerror not in (win32.ERROR_SEM_TIMEOUT, - win32.ERROR_PIPE_BUSY) or _check_timeout(t): + if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT, + _winapi.ERROR_PIPE_BUSY) or _check_timeout(t): raise else: break else: raise - win32.SetNamedPipeHandleState( - h, win32.PIPE_READMODE_MESSAGE, None, None + _winapi.SetNamedPipeHandleState( + h, _winapi.PIPE_READMODE_MESSAGE, None, None ) return PipeConnection(h) @@ -774,7 +774,7 @@ if sys.platform == 'win32': L = list(handles) ready = [] while L: - res = win32.WaitForMultipleObjects(L, False, timeout) + res = _winapi.WaitForMultipleObjects(L, False, timeout) if res == WAIT_TIMEOUT: break elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): @@ -788,7 +788,7 @@ if sys.platform == 'win32': timeout = 0 return ready - _ready_errors = {win32.ERROR_BROKEN_PIPE, win32.ERROR_NETNAME_DELETED} + _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED} def wait(object_list, timeout=None): ''' @@ -818,12 +818,12 @@ if sys.platform == 'win32': else: # start an overlapped read of length zero try: - ov, err = win32.ReadFile(fileno(), 0, True) + ov, err = _winapi.ReadFile(fileno(), 0, True) except OSError as e: err = e.winerror if err not in _ready_errors: raise - if err == win32.ERROR_IO_PENDING: + if err == _winapi.ERROR_IO_PENDING: ov_list.append(ov) waithandle_to_obj[ov.event] = o else: @@ -847,7 +847,7 @@ if sys.platform == 'win32': err = e.winerror if err not in _ready_errors: raise - if err != win32.ERROR_OPERATION_ABORTED: + if err != _winapi.ERROR_OPERATION_ABORTED: o = waithandle_to_obj[ov.event] ready_objects.add(o) if err == 0: diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 020508a118..0cbb741cd2 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -181,10 +181,9 @@ if sys.platform != 'win32': else: import _thread import msvcrt - import _subprocess + import _winapi from pickle import load, HIGHEST_PROTOCOL - from _multiprocessing import win32 def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) @@ -197,8 +196,8 @@ else: WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") - exit = win32.ExitProcess - close = win32.CloseHandle + exit = _winapi.ExitProcess + close = _winapi.CloseHandle # # _python_exe is the assumed path to the python executable. @@ -220,11 +219,11 @@ else: def duplicate(handle, target_process=None, inheritable=False): if target_process is None: - target_process = _subprocess.GetCurrentProcess() - return _subprocess.DuplicateHandle( - _subprocess.GetCurrentProcess(), handle, target_process, - 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS - ).Detach() + target_process = _winapi.GetCurrentProcess() + return _winapi.DuplicateHandle( + _winapi.GetCurrentProcess(), handle, target_process, + 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS + ) # # We define a Popen class similar to the one from subprocess, but @@ -248,10 +247,10 @@ else: # start process cmd = get_command_line() + [rhandle] cmd = ' '.join('"%s"' % x for x in cmd) - hp, ht, pid, tid = _subprocess.CreateProcess( + hp, ht, pid, tid = _winapi.CreateProcess( _python_exe, cmd, None, None, 1, 0, None, None, None ) - ht.Close() + _winapi.CloseHandle(ht) close(rhandle) # set attributes of self @@ -282,13 +281,13 @@ else: def wait(self, timeout=None): if self.returncode is None: if timeout is None: - msecs = _subprocess.INFINITE + msecs = _winapi.INFINITE else: msecs = max(0, int(timeout * 1000 + 0.5)) - res = _subprocess.WaitForSingleObject(int(self._handle), msecs) - if res == _subprocess.WAIT_OBJECT_0: - code = _subprocess.GetExitCodeProcess(self._handle) + res = _winapi.WaitForSingleObject(int(self._handle), msecs) + if res == _winapi.WAIT_OBJECT_0: + code = _winapi.GetExitCodeProcess(self._handle) if code == TERMINATE: code = -signal.SIGTERM self.returncode = code @@ -301,7 +300,7 @@ else: def terminate(self): if self.returncode is None: try: - _subprocess.TerminateProcess(int(self._handle), TERMINATE) + _winapi.TerminateProcess(int(self._handle), TERMINATE) except WindowsError: if self.wait(timeout=0.1) is None: raise diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index 7366bd2b8b..7e19434b68 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -51,7 +51,7 @@ __all__ = ['BufferWrapper'] if sys.platform == 'win32': - from _multiprocessing import win32 + import _winapi class Arena(object): @@ -61,7 +61,7 @@ if sys.platform == 'win32': self.size = size self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter)) self.buffer = mmap.mmap(-1, self.size, tagname=self.name) - assert win32.GetLastError() == 0, 'tagname already in use' + assert _winapi.GetLastError() == 0, 'tagname already in use' self._state = (self.size, self.name) def __getstate__(self): @@ -71,7 +71,7 @@ if sys.platform == 'win32': def __setstate__(self, state): self.size, self.name = self._state = state self.buffer = mmap.mmap(-1, self.size, tagname=self.name) - assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS + assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS else: diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index dda4a4120b..c80de5948a 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -60,11 +60,11 @@ if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and # if sys.platform == 'win32': - from _multiprocessing import win32 + import _winapi def send_handle(conn, handle, destination_pid): - process_handle = win32.OpenProcess( - win32.PROCESS_ALL_ACCESS, False, destination_pid + process_handle = _winapi.OpenProcess( + _winapi.PROCESS_ALL_ACCESS, False, destination_pid ) try: new_handle = duplicate(handle, process_handle) -- cgit v1.2.1 From c990773f14c91d90c61770817f4c7a5a8d31a17f Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 24 Apr 2012 22:56:57 +0200 Subject: Issue #4892: multiprocessing Connections can now be transferred over multiprocessing Connections. Patch by Richard Oudkerk (sbt). --- Lib/multiprocessing/__init__.py | 4 +- Lib/multiprocessing/connection.py | 24 ++- Lib/multiprocessing/forking.py | 19 --- Lib/multiprocessing/reduction.py | 315 +++++++++++++++++++++----------------- 4 files changed, 203 insertions(+), 159 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index e01244015a..28380e5579 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -161,7 +161,9 @@ def allow_connection_pickling(): ''' Install support for sending connections and sockets between processes ''' - from multiprocessing import reduction + # This is undocumented. In previous versions of multiprocessing + # its only effect was to make socket objects inheritable on Windows. + import multiprocessing.connection # # Definitions depending on native semaphores diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 3a61e5ee42..64d71bc361 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -50,6 +50,7 @@ import _multiprocessing from multiprocessing import current_process, AuthenticationError, BufferTooShort from multiprocessing.util import ( get_temp_dir, Finalize, sub_debug, debug, _eintr_retry) +from multiprocessing.forking import ForkingPickler try: import _winapi from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE @@ -227,8 +228,9 @@ class _ConnectionBase: """Send a (picklable) object""" self._check_closed() self._check_writable() - buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL) - self._send_bytes(memoryview(buf)) + buf = io.BytesIO() + ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj) + self._send_bytes(buf.getbuffer()) def recv_bytes(self, maxlength=None): """ @@ -880,3 +882,21 @@ else: raise if timeout is not None: timeout = deadline - time.time() + +# +# Make connection and socket objects sharable if possible +# + +if sys.platform == 'win32': + from . import reduction + ForkingPickler.register(socket.socket, reduction.reduce_socket) + ForkingPickler.register(Connection, reduction.reduce_connection) + ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection) +else: + try: + from . import reduction + except ImportError: + pass + else: + ForkingPickler.register(socket.socket, reduction.reduce_socket) + ForkingPickler.register(Connection, reduction.reduce_connection) diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 0cbb741cd2..15fdb0e08b 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -407,25 +407,6 @@ else: return d - # - # Make (Pipe)Connection picklable - # - - # Late import because of circular import - from .connection import Connection, PipeConnection - - def reduce_connection(conn): - if not Popen.thread_is_spawning(): - raise RuntimeError( - 'By default %s objects can only be shared between processes\n' - 'using inheritance' % type(conn).__name__ - ) - return type(conn), (Popen.duplicate_for_child(conn.fileno()), - conn.readable, conn.writable) - - ForkingPickler.register(Connection, reduce_connection) - ForkingPickler.register(PipeConnection, reduce_connection) - # # Prepare current process # diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index c80de5948a..ce38fe367e 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -33,7 +33,7 @@ # SUCH DAMAGE. # -__all__ = [] +__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle'] import os import sys @@ -42,9 +42,8 @@ import threading import struct from multiprocessing import current_process -from multiprocessing.forking import Popen, duplicate, close, ForkingPickler from multiprocessing.util import register_after_fork, debug, sub_debug -from multiprocessing.connection import Client, Listener, Connection +from multiprocessing.util import is_exiting, sub_warning # @@ -60,22 +59,91 @@ if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and # if sys.platform == 'win32': + # Windows + __all__ += ['reduce_pipe_connection'] import _winapi def send_handle(conn, handle, destination_pid): - process_handle = _winapi.OpenProcess( - _winapi.PROCESS_ALL_ACCESS, False, destination_pid - ) - try: - new_handle = duplicate(handle, process_handle) - conn.send(new_handle) - finally: - close(process_handle) + dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) + conn.send(dh) def recv_handle(conn): - return conn.recv() + return conn.recv().detach() + + class DupHandle(object): + def __init__(self, handle, access, pid=None): + # duplicate handle for process with given pid + if pid is None: + pid = os.getpid() + proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) + try: + self._handle = _winapi.DuplicateHandle( + _winapi.GetCurrentProcess(), + handle, proc, access, False, 0) + finally: + _winapi.CloseHandle(proc) + self._access = access + self._pid = pid + + def detach(self): + # retrieve handle from process which currently owns it + if self._pid == os.getpid(): + return self._handle + proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, + self._pid) + try: + return _winapi.DuplicateHandle( + proc, self._handle, _winapi.GetCurrentProcess(), + self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) + finally: + _winapi.CloseHandle(proc) + + class DupSocket(object): + def __init__(self, sock): + new_sock = sock.dup() + def send(conn, pid): + share = new_sock.share(pid) + conn.send_bytes(share) + self._id = resource_sharer.register(send, new_sock.close) + + def detach(self): + conn = resource_sharer.get_connection(self._id) + try: + share = conn.recv_bytes() + return socket.fromshare(share) + finally: + conn.close() + + def reduce_socket(s): + return rebuild_socket, (DupSocket(s),) + + def rebuild_socket(ds): + return ds.detach() + + def reduce_connection(conn): + handle = conn.fileno() + with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: + ds = DupSocket(s) + return rebuild_connection, (ds, conn.readable, conn.writable) + + def rebuild_connection(ds, readable, writable): + from .connection import Connection + sock = ds.detach() + return Connection(sock.detach(), readable, writable) + + def reduce_pipe_connection(conn): + access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | + (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) + dh = DupHandle(conn.fileno(), access) + return rebuild_pipe_connection, (dh, conn.readable, conn.writable) + + def rebuild_pipe_connection(dh, readable, writable): + from .connection import PipeConnection + handle = dh.detach() + return PipeConnection(handle, readable, writable) else: + # Unix def send_handle(conn, handle, destination_pid): with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, @@ -94,136 +162,109 @@ else: pass raise RuntimeError('Invalid data received') + class DupFd(object): + def __init__(self, fd): + new_fd = os.dup(fd) + def send(conn, pid): + send_handle(conn, new_fd, pid) + def close(): + os.close(new_fd) + self._id = resource_sharer.register(send, close) + + def detach(self): + conn = resource_sharer.get_connection(self._id) + try: + return recv_handle(conn) + finally: + conn.close() -# -# Support for a per-process server thread which caches pickled handles -# - -_cache = set() - -def _reset(obj): - global _lock, _listener, _cache - for h in _cache: - close(h) - _cache.clear() - _lock = threading.Lock() - _listener = None - -_reset(None) -register_after_fork(_reset, _reset) - -def _get_listener(): - global _listener - - if _listener is None: - _lock.acquire() - try: - if _listener is None: - debug('starting listener and thread for sending handles') - _listener = Listener(authkey=current_process().authkey) - t = threading.Thread(target=_serve) - t.daemon = True - t.start() - finally: - _lock.release() - - return _listener - -def _serve(): - from .util import is_exiting, sub_warning - - while 1: - try: - conn = _listener.accept() - handle_wanted, destination_pid = conn.recv() - _cache.remove(handle_wanted) - send_handle(conn, handle_wanted, destination_pid) - close(handle_wanted) - conn.close() - except: - if not is_exiting(): - import traceback - sub_warning( - 'thread for sharing handles raised exception :\n' + - '-'*79 + '\n' + traceback.format_exc() + '-'*79 - ) - -# -# Functions to be used for pickling/unpickling objects with handles -# - -def reduce_handle(handle): - if Popen.thread_is_spawning(): - return (None, Popen.duplicate_for_child(handle), True) - dup_handle = duplicate(handle) - _cache.add(dup_handle) - sub_debug('reducing handle %d', handle) - return (_get_listener().address, dup_handle, False) - -def rebuild_handle(pickled_data): - address, handle, inherited = pickled_data - if inherited: - return handle - sub_debug('rebuilding handle %d', handle) - conn = Client(address, authkey=current_process().authkey) - conn.send((handle, os.getpid())) - new_handle = recv_handle(conn) - conn.close() - return new_handle - -# -# Register `Connection` with `ForkingPickler` -# - -def reduce_connection(conn): - rh = reduce_handle(conn.fileno()) - return rebuild_connection, (rh, conn.readable, conn.writable) - -def rebuild_connection(reduced_handle, readable, writable): - handle = rebuild_handle(reduced_handle) - return Connection( - handle, readable=readable, writable=writable - ) - -ForkingPickler.register(Connection, reduce_connection) - -# -# Register `socket.socket` with `ForkingPickler` -# - -def fromfd(fd, family, type_, proto=0): - s = socket.fromfd(fd, family, type_, proto) - if s.__class__ is not socket.socket: - s = socket.socket(_sock=s) - return s + def reduce_socket(s): + df = DupFd(s.fileno()) + return rebuild_socket, (df, s.family, s.type, s.proto) -def reduce_socket(s): - reduced_handle = reduce_handle(s.fileno()) - return rebuild_socket, (reduced_handle, s.family, s.type, s.proto) + def rebuild_socket(df, family, type, proto): + fd = df.detach() + s = socket.fromfd(fd, family, type, proto) + os.close(fd) + return s -def rebuild_socket(reduced_handle, family, type_, proto): - fd = rebuild_handle(reduced_handle) - _sock = fromfd(fd, family, type_, proto) - close(fd) - return _sock + def reduce_connection(conn): + df = DupFd(conn.fileno()) + return rebuild_connection, (df, conn.readable, conn.writable) -ForkingPickler.register(socket.socket, reduce_socket) + def rebuild_connection(df, readable, writable): + from .connection import Connection + fd = df.detach() + return Connection(fd, readable, writable) # -# Register `_multiprocessing.PipeConnection` with `ForkingPickler` +# Server which shares registered resources with clients # -if sys.platform == 'win32': - from multiprocessing.connection import PipeConnection - - def reduce_pipe_connection(conn): - rh = reduce_handle(conn.fileno()) - return rebuild_pipe_connection, (rh, conn.readable, conn.writable) - - def rebuild_pipe_connection(reduced_handle, readable, writable): - handle = rebuild_handle(reduced_handle) - return PipeConnection( - handle, readable=readable, writable=writable - ) - - ForkingPickler.register(PipeConnection, reduce_pipe_connection) +class ResourceSharer(object): + def __init__(self): + self._key = 0 + self._cache = {} + self._old_locks = [] + self._lock = threading.Lock() + self._listener = None + self._address = None + register_after_fork(self, ResourceSharer._afterfork) + + def register(self, send, close): + with self._lock: + if self._address is None: + self._start() + self._key += 1 + self._cache[self._key] = (send, close) + return (self._address, self._key) + + @staticmethod + def get_connection(ident): + from .connection import Client + address, key = ident + c = Client(address, authkey=current_process().authkey) + c.send((key, os.getpid())) + return c + + def _afterfork(self): + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + # If self._lock was locked at the time of the fork, it may be broken + # -- see issue 6721. Replace it without letting it be gc'ed. + self._old_locks.append(self._lock) + self._lock = threading.Lock() + if self._listener is not None: + self._listener.close() + self._listener = None + self._address = None + + def _start(self): + from .connection import Listener + assert self._listener is None + debug('starting listener and thread for sending handles') + self._listener = Listener(authkey=current_process().authkey) + self._address = self._listener.address + t = threading.Thread(target=self._serve) + t.daemon = True + t.start() + + def _serve(self): + while 1: + try: + conn = self._listener.accept() + key, destination_pid = conn.recv() + send, close = self._cache.pop(key) + send(conn, destination_pid) + close() + conn.close() + except: + if not is_exiting(): + import traceback + sub_warning( + 'thread for sharing handles raised exception :\n' + + '-'*79 + '\n' + traceback.format_exc() + '-'*79 + ) + +resource_sharer = ResourceSharer() -- cgit v1.2.1 From 4cfa10600b09a04b4773cbf75cc823278e19ada2 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 27 Apr 2012 23:51:03 +0200 Subject: Issue #14666: stop multiprocessing's resource-sharing thread after the tests are done. Also, block delivery of signals to that thread. Patch by Richard Oudkerk. This will hopefully fix sporadic freezes on the FreeBSD 9.0 buildbot. --- Lib/multiprocessing/reduction.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index ce38fe367e..cef445b4d8 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -40,6 +40,7 @@ import sys import socket import threading import struct +import signal from multiprocessing import current_process from multiprocessing.util import register_after_fork, debug, sub_debug @@ -209,6 +210,7 @@ class ResourceSharer(object): self._lock = threading.Lock() self._listener = None self._address = None + self._thread = None register_after_fork(self, ResourceSharer._afterfork) def register(self, send, close): @@ -227,6 +229,24 @@ class ResourceSharer(object): c.send((key, os.getpid())) return c + def stop(self, timeout=None): + from .connection import Client + with self._lock: + if self._address is not None: + c = Client(self._address, authkey=current_process().authkey) + c.send(None) + c.close() + self._thread.join(timeout) + if self._thread.is_alive(): + sub_warn('ResourceSharer thread did not stop when asked') + self._listener.close() + self._thread = None + self._address = None + self._listener = None + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + def _afterfork(self): for key, (send, close) in self._cache.items(): close() @@ -239,6 +259,7 @@ class ResourceSharer(object): self._listener.close() self._listener = None self._address = None + self._thread = None def _start(self): from .connection import Listener @@ -249,12 +270,18 @@ class ResourceSharer(object): t = threading.Thread(target=self._serve) t.daemon = True t.start() + self._thread = t def _serve(self): + if hasattr(signal, 'pthread_sigmask'): + signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) while 1: try: conn = self._listener.accept() - key, destination_pid = conn.recv() + msg = conn.recv() + if msg is None: + break + key, destination_pid = msg send, close = self._cache.pop(key) send(conn, destination_pid) close() -- cgit v1.2.1 From 4ea15375a56d089ca1eb0f4ac8a7e3c3c6c7d6b8 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Mon, 30 Apr 2012 12:13:55 +0100 Subject: Mark multiprocessing files with "Licensed to PSF under a Contributor Agreement" instead of BSD licence. --- Lib/multiprocessing/__init__.py | 27 +-------------------------- Lib/multiprocessing/connection.py | 27 +-------------------------- Lib/multiprocessing/forking.py | 27 +-------------------------- Lib/multiprocessing/heap.py | 27 +-------------------------- Lib/multiprocessing/managers.py | 27 +-------------------------- Lib/multiprocessing/pool.py | 27 +-------------------------- Lib/multiprocessing/process.py | 27 +-------------------------- Lib/multiprocessing/queues.py | 27 +-------------------------- Lib/multiprocessing/reduction.py | 27 +-------------------------- Lib/multiprocessing/sharedctypes.py | 27 +-------------------------- Lib/multiprocessing/synchronize.py | 27 +-------------------------- Lib/multiprocessing/util.py | 27 +-------------------------- 12 files changed, 12 insertions(+), 312 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index 28380e5579..0bf3d9c657 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -13,32 +13,7 @@ # # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __version__ = '0.70a1' diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 64d71bc361..046a7fc600 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -4,32 +4,7 @@ # multiprocessing/connection.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 15fdb0e08b..ca03e955ff 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -4,32 +4,7 @@ # multiprocessing/forking.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # import os diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index 7e19434b68..311e402d35 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -4,32 +4,7 @@ # multiprocessing/heap.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # import bisect diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index d1c9d4578e..f47402a3ca 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -5,32 +5,7 @@ # multiprocessing/managers.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 7039d1679e..66d7cc7d99 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -4,32 +4,7 @@ # multiprocessing/pool.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = ['Pool'] diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index b599f11606..dc81ae6149 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -4,32 +4,7 @@ # multiprocessing/process.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = ['Process', 'current_process', 'active_children'] diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 262fd85733..37271fb4eb 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -4,32 +4,7 @@ # multiprocessing/queues.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = ['Queue', 'SimpleQueue', 'JoinableQueue'] diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index cef445b4d8..84d2fe9ab2 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -5,32 +5,7 @@ # multiprocessing/reduction.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle'] diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index 5826379530..e473749981 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -4,32 +4,7 @@ # multiprocessing/sharedctypes.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # import ctypes diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 532ac5c1dd..2c413a9b61 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -4,32 +4,7 @@ # multiprocessing/synchronize.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = [ diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 0bbb87ed35..da99063e57 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -4,32 +4,7 @@ # multiprocessing/util.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # import functools -- cgit v1.2.1 From b0b00711e061fd259f637d77fa8b39e09bf328df Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Thu, 3 May 2012 18:29:02 +0100 Subject: Fix dangling warning for test_multiprocessing --- Lib/multiprocessing/managers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index f47402a3ca..dc8166a997 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -548,7 +548,10 @@ class BaseManager(object): ''' Join the manager process (if it has been spawned) ''' - self._process.join(timeout) + if self._process is not None: + self._process.join(timeout) + if not self._process.is_alive(): + self._process = None def _debug_info(self): ''' -- cgit v1.2.1 From cbdd52492c4a8072e513c6acc266fd8309990c4a Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Sat, 5 May 2012 19:45:37 +0100 Subject: Fix for Issue 14725 for 3.3 branch. --- Lib/multiprocessing/connection.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 046a7fc600..acf43b1fef 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -621,16 +621,24 @@ if sys.platform == 'win32': def accept(self): self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) - ov = _winapi.ConnectNamedPipe(handle, overlapped=True) try: - res = _winapi.WaitForMultipleObjects([ov.event], False, INFINITE) - except: - ov.cancel() - _winapi.CloseHandle(handle) - raise - finally: - _, err = ov.GetOverlappedResult(True) - assert err == 0 + ov = _winapi.ConnectNamedPipe(handle, overlapped=True) + except OSError as e: + if e.winerror != _winapi.ERROR_NO_DATA: + raise + # ERROR_NO_DATA can occur if a client has already connected, + # written data and then disconnected -- see Issue 14725. + else: + try: + res = _winapi.WaitForMultipleObjects( + [ov.event], False, INFINITE) + except: + ov.cancel() + _winapi.CloseHandle(handle) + raise + finally: + _, err = ov.GetOverlappedResult(True) + assert err == 0 return PipeConnection(handle) @staticmethod -- cgit v1.2.1 From f3b92e6600bec551e6c59376423c712c9aca1754 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Sun, 6 May 2012 16:45:02 +0100 Subject: Make AcquirerProxy.acquire() support timeout argument --- Lib/multiprocessing/managers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index dc8166a997..36bcf8f07a 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -957,8 +957,9 @@ class IteratorProxy(BaseProxy): class AcquirerProxy(BaseProxy): _exposed_ = ('acquire', 'release') - def acquire(self, blocking=True): - return self._callmethod('acquire', (blocking,)) + def acquire(self, blocking=True, timeout=None): + args = (blocking,) if timeout is None else (blocking, timeout) + return self._callmethod('acquire', args) def release(self): return self._callmethod('release') def __enter__(self): -- cgit v1.2.1 From 541312e9800470310edca4695ae7be96d7ee96ea Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Thu, 10 May 2012 16:11:12 +0100 Subject: Issue #14753: Make multiprocessing treat negative timeouts as it did in 3.2 In Python 3.2 and earlier, Process.join() and Connection.poll() treated negative timeouts as zero timeouts. Earlier versions from the 3.3 line of development treat them as infinite timeouts. The patch reverts to the old behaviour. --- Lib/multiprocessing/connection.py | 7 +------ Lib/multiprocessing/forking.py | 9 +++------ Lib/multiprocessing/util.py | 15 --------------- 3 files changed, 4 insertions(+), 27 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index acf43b1fef..56f375d237 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -23,8 +23,7 @@ import itertools import _multiprocessing from multiprocessing import current_process, AuthenticationError, BufferTooShort -from multiprocessing.util import ( - get_temp_dir, Finalize, sub_debug, debug, _eintr_retry) +from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug from multiprocessing.forking import ForkingPickler try: import _winapi @@ -323,8 +322,6 @@ if _winapi: if (self._got_empty_message or _winapi.PeekNamedPipe(self._handle)[0] != 0): return True - if timeout < 0: - timeout = None return bool(wait([self], timeout)) def _get_more_data(self, ov, maxsize): @@ -402,8 +399,6 @@ class Connection(_ConnectionBase): return self._recv(size) def _poll(self, timeout): - if timeout < 0.0: - timeout = None r = wait([self._handle], timeout) return bool(r) diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index ca03e955ff..2729afe2bb 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -75,12 +75,9 @@ else: # if sys.platform != 'win32': - import select - exit = os._exit duplicate = os.dup close = os.close - _select = util._eintr_retry(select.select) # # We define a Popen class similar to the one from subprocess, but @@ -130,10 +127,10 @@ if sys.platform != 'win32': def wait(self, timeout=None): if self.returncode is None: if timeout is not None: - r = _select([self.sentinel], [], [], timeout)[0] - if not r: + from .connection import wait + if not wait([self.sentinel], timeout): return None - # This shouldn't block if select() returned successfully. + # This shouldn't block if wait() returned successfully. return self.poll(os.WNOHANG if timeout == 0.0 else 0) return self.returncode diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index da99063e57..9b6dac2006 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -295,18 +295,3 @@ class ForkAwareLocal(threading.local): register_after_fork(self, lambda obj : obj.__dict__.clear()) def __reduce__(self): return type(self), () - - -# -# Automatic retry after EINTR -# - -def _eintr_retry(func): - @functools.wraps(func) - def wrapped(*args, **kwargs): - while True: - try: - return func(*args, **kwargs) - except InterruptedError: - continue - return wrapped -- cgit v1.2.1 From 29abc0eadf771e875c648d907600e9b0ebaf5498 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Fri, 18 May 2012 14:28:02 +0100 Subject: #12098: Make multiprocessing's child processes inherit sys.flags on Windows Initial patch by Sergey Mezentsev. --- Lib/multiprocessing/forking.py | 3 ++- Lib/multiprocessing/util.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 2729afe2bb..eadc321e77 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -324,7 +324,8 @@ else: return [sys.executable, '--multiprocessing-fork'] else: prog = 'from multiprocessing.forking import main; main()' - return [_python_exe, '-c', prog, '--multiprocessing-fork'] + opts = util._args_from_interpreter_flags() + return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork'] def main(): diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 9b6dac2006..7c71081056 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -7,6 +7,7 @@ # Licensed to PSF under a Contributor Agreement. # +import sys import functools import itertools import weakref @@ -295,3 +296,33 @@ class ForkAwareLocal(threading.local): register_after_fork(self, lambda obj : obj.__dict__.clear()) def __reduce__(self): return type(self), () + +# +# Get options for python to produce the same sys.flags +# + +def _args_from_interpreter_flags(): + """Return a list of command-line arguments reproducing the current + settings in sys.flags and sys.warnoptions.""" + flag_opt_map = { + 'debug': 'd', + # 'inspect': 'i', + # 'interactive': 'i', + 'optimize': 'O', + 'dont_write_bytecode': 'B', + 'no_user_site': 's', + 'no_site': 'S', + 'ignore_environment': 'E', + 'verbose': 'v', + 'bytes_warning': 'b', + 'quiet': 'q', + 'hash_randomization': 'R', + } + args = [] + for flag, opt in flag_opt_map.items(): + v = getattr(sys.flags, flag) + if v > 0: + args.append('-' + opt * v) + for opt in sys.warnoptions: + args.append('-W' + opt) + return args -- cgit v1.2.1 From 3a5d148189e26c927f979f51f4a0c84ac81799a5 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 18 May 2012 18:33:07 +0200 Subject: Move private function _args_from_interpreter_flags() to subprocess.py, so that it can be imported when threads are disabled. (followup to issue #12098) --- Lib/multiprocessing/util.py | 30 +----------------------------- 1 file changed, 1 insertion(+), 29 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 7c71081056..cbb01c925b 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -14,6 +14,7 @@ import weakref import atexit import threading # we want threading to install it's # cleanup function before multiprocessing does +from subprocess import _args_from_interpreter_flags from multiprocessing.process import current_process, active_children @@ -297,32 +298,3 @@ class ForkAwareLocal(threading.local): def __reduce__(self): return type(self), () -# -# Get options for python to produce the same sys.flags -# - -def _args_from_interpreter_flags(): - """Return a list of command-line arguments reproducing the current - settings in sys.flags and sys.warnoptions.""" - flag_opt_map = { - 'debug': 'd', - # 'inspect': 'i', - # 'interactive': 'i', - 'optimize': 'O', - 'dont_write_bytecode': 'B', - 'no_user_site': 's', - 'no_site': 'S', - 'ignore_environment': 'E', - 'verbose': 'v', - 'bytes_warning': 'b', - 'quiet': 'q', - 'hash_randomization': 'R', - } - args = [] - for flag, opt in flag_opt_map.items(): - v = getattr(sys.flags, flag) - if v > 0: - args.append('-' + opt * v) - for opt in sys.warnoptions: - args.append('-W' + opt) - return args -- cgit v1.2.1 From b89a4af14f2afb120dfaa2476815c38a025ec5ef Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 18 May 2012 18:33:32 +0200 Subject: Fix whitespace --- Lib/multiprocessing/util.py | 1 - 1 file changed, 1 deletion(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index cbb01c925b..f0d7e11b1c 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -297,4 +297,3 @@ class ForkAwareLocal(threading.local): register_after_fork(self, lambda obj : obj.__dict__.clear()) def __reduce__(self): return type(self), () - -- cgit v1.2.1 From 2893b3e09e07b498311af9657caf94f3110564f6 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Fri, 25 May 2012 13:26:53 +0100 Subject: Issue #12091: simplify ApplyResult and MapResult with threading.Event Patch by Charles-Fran?ois Natali --- Lib/multiprocessing/pool.py | 39 +++++++++------------------------------ 1 file changed, 9 insertions(+), 30 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 66d7cc7d99..149e32a0e1 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -526,32 +526,26 @@ class Pool(object): class ApplyResult(object): def __init__(self, cache, callback, error_callback): - self._cond = threading.Condition(threading.Lock()) + self._event = threading.Event() self._job = next(job_counter) self._cache = cache - self._ready = False self._callback = callback self._error_callback = error_callback cache[self._job] = self def ready(self): - return self._ready + return self._event.is_set() def successful(self): - assert self._ready + assert self.ready() return self._success def wait(self, timeout=None): - self._cond.acquire() - try: - if not self._ready: - self._cond.wait(timeout) - finally: - self._cond.release() + self._event.wait(timeout) def get(self, timeout=None): self.wait(timeout) - if not self._ready: + if not self.ready(): raise TimeoutError if self._success: return self._value @@ -564,12 +558,7 @@ class ApplyResult(object): self._callback(self._value) if self._error_callback and not self._success: self._error_callback(self._value) - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() + self._event.set() del self._cache[self._job] # @@ -586,7 +575,7 @@ class MapResult(ApplyResult): self._chunksize = chunksize if chunksize <= 0: self._number_left = 0 - self._ready = True + self._event.set() else: self._number_left = length//chunksize + bool(length % chunksize) @@ -599,24 +588,14 @@ class MapResult(ApplyResult): if self._callback: self._callback(self._value) del self._cache[self._job] - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() + self._event.set() else: self._success = False self._value = result if self._error_callback: self._error_callback(self._value) del self._cache[self._job] - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() + self._event.set() # # Class whose instances are returned by `Pool.imap()` -- cgit v1.2.1 From cc6e79acf4d6346efa0d059a391406d1eee76ad2 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Fri, 25 May 2012 13:54:53 +0100 Subject: Issue #14548: Make multiprocessing finalizers check pid before running This protects from possibilty of gc running just after fork. --- Lib/multiprocessing/util.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index f0d7e11b1c..16ded51221 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -9,6 +9,7 @@ import sys import functools +import os import itertools import weakref import atexit @@ -161,6 +162,7 @@ class Finalize(object): self._args = args self._kwargs = kwargs or {} self._key = (exitpriority, next(_finalizer_counter)) + self._pid = os.getpid() _finalizer_registry[self._key] = self @@ -177,9 +179,13 @@ class Finalize(object): except KeyError: sub_debug('finalizer no longer registered') else: - sub_debug('finalizer calling %s with args %s and kwargs %s', - self._callback, self._args, self._kwargs) - res = self._callback(*self._args, **self._kwargs) + if self._pid != os.getpid(): + sub_debug('finalizer ignored because different process') + res = None + else: + sub_debug('finalizer calling %s with args %s and kwargs %s', + self._callback, self._args, self._kwargs) + res = self._callback(*self._args, **self._kwargs) self._weakref = self._callback = self._args = \ self._kwargs = self._key = None return res -- cgit v1.2.1 From ef0d432bcb938873aa4957417adb082166924012 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Sat, 26 May 2012 22:09:59 +0100 Subject: Make multiprocessing's shared memory use memoryview instead of raw pointer --- Lib/multiprocessing/heap.py | 11 +++-------- Lib/multiprocessing/sharedctypes.py | 3 ++- 2 files changed, 5 insertions(+), 9 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index 311e402d35..4e93c12f81 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -205,7 +205,7 @@ class Heap(object): self._lock.release() # -# Class representing a chunk of an mmap -- can be inherited +# Class representing a chunk of an mmap -- can be inherited by child process # class BufferWrapper(object): @@ -218,11 +218,6 @@ class BufferWrapper(object): self._state = (block, size) Finalize(self, BufferWrapper._heap.free, args=(block,)) - def get_address(self): + def create_memoryview(self): (arena, start, stop), size = self._state - address, length = _multiprocessing.address_of_buffer(arena.buffer) - assert size <= length - return address + start - - def get_size(self): - return self._state[1] + return memoryview(arena.buffer)[start:start+size] diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index e473749981..6dc160bf75 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -132,7 +132,8 @@ def rebuild_ctype(type_, wrapper, length): if length is not None: type_ = type_ * length ForkingPickler.register(type_, reduce_ctype) - obj = type_.from_address(wrapper.get_address()) + buf = wrapper.create_memoryview() + obj = type_.from_buffer(buf) obj._wrapper = wrapper return obj -- cgit v1.2.1 From 26731e96801de1e2ef4616ffd1c6a998eb1aec41 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Tue, 29 May 2012 12:01:45 +0100 Subject: Remove __getslice__, __setslice__, __delslice__ methods from proxies Proxy classes in multiprocessing do not need these methods in Python 3.x. --- Lib/multiprocessing/managers.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 36bcf8f07a..6a7dccb007 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -1035,12 +1035,11 @@ class ValueProxy(BaseProxy): BaseListProxy = MakeProxyType('BaseListProxy', ( - '__add__', '__contains__', '__delitem__', '__delslice__', - '__getitem__', '__getslice__', '__len__', '__mul__', - '__reversed__', '__rmul__', '__setitem__', '__setslice__', + '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', + '__mul__', '__reversed__', '__rmul__', '__setitem__', 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 'reverse', 'sort', '__imul__' - )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 + )) class ListProxy(BaseListProxy): def __iadd__(self, value): self._callmethod('extend', (value,)) @@ -1058,8 +1057,8 @@ DictProxy = MakeProxyType('DictProxy', ( ArrayProxy = MakeProxyType('ArrayProxy', ( - '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__' - )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 + '__len__', '__getitem__', '__setitem__' + )) PoolProxy = MakeProxyType('PoolProxy', ( -- cgit v1.2.1 From 8d4b1ca672f75f0587a3a8a6973fd2438e485eae Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Tue, 29 May 2012 12:01:47 +0100 Subject: Use Python 3.x-style keyword only arg in Array() Previously a Python 2.x compatible hack was used for multiprocessing.sharedctypes.Array(). Also the documented signature was wrong. --- Lib/multiprocessing/__init__.py | 8 ++++---- Lib/multiprocessing/sharedctypes.py | 7 ++----- 2 files changed, 6 insertions(+), 9 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index 0bf3d9c657..02460f0241 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -228,19 +228,19 @@ def RawArray(typecode_or_type, size_or_initializer): from multiprocessing.sharedctypes import RawArray return RawArray(typecode_or_type, size_or_initializer) -def Value(typecode_or_type, *args, **kwds): +def Value(typecode_or_type, *args, lock=True): ''' Returns a synchronized shared object ''' from multiprocessing.sharedctypes import Value - return Value(typecode_or_type, *args, **kwds) + return Value(typecode_or_type, *args, lock=lock) -def Array(typecode_or_type, size_or_initializer, **kwds): +def Array(typecode_or_type, size_or_initializer, *, lock=True): ''' Returns a synchronized shared array ''' from multiprocessing.sharedctypes import Array - return Array(typecode_or_type, size_or_initializer, **kwds) + return Array(typecode_or_type, size_or_initializer, lock=lock) # # diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index 6dc160bf75..a358ed4f12 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -63,7 +63,7 @@ def RawArray(typecode_or_type, size_or_initializer): result.__init__(*size_or_initializer) return result -def Value(typecode_or_type, *args, lock=None): +def Value(typecode_or_type, *args, lock=True): ''' Return a synchronization wrapper for a Value ''' @@ -76,13 +76,10 @@ def Value(typecode_or_type, *args, lock=None): raise AttributeError("'%r' has no method 'acquire'" % lock) return synchronized(obj, lock) -def Array(typecode_or_type, size_or_initializer, **kwds): +def Array(typecode_or_type, size_or_initializer, *, lock=True): ''' Return a synchronization wrapper for a RawArray ''' - lock = kwds.pop('lock', None) - if kwds: - raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys())) obj = RawArray(typecode_or_type, size_or_initializer) if lock is False: return obj -- cgit v1.2.1 From 449b23c41551526824ef0e30f94788c9b21da7ba Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Mon, 4 Jun 2012 18:58:59 +0100 Subject: Make Finalize reserve a reference to os.getpid in case called at shutdown --- Lib/multiprocessing/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 16ded51221..48abe383fa 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -170,7 +170,7 @@ class Finalize(object): # Need to bind these locally because the globals can have # been cleared at shutdown _finalizer_registry=_finalizer_registry, - sub_debug=sub_debug): + sub_debug=sub_debug, getpid=os.getpid): ''' Run the callback unless it has already been called or cancelled ''' @@ -179,7 +179,7 @@ class Finalize(object): except KeyError: sub_debug('finalizer no longer registered') else: - if self._pid != os.getpid(): + if self._pid != getpid(): sub_debug('finalizer ignored because different process') res = None else: -- cgit v1.2.1 From 0f44bc68eeb1327c8ff7f91b3532d93d7ed53b26 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Mon, 4 Jun 2012 18:59:07 +0100 Subject: Fix potential NameError in multiprocessing.Condition.wait() --- Lib/multiprocessing/synchronize.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 2c413a9b61..4502a97e99 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -216,7 +216,7 @@ class Condition(object): try: # wait for notification or timeout - ret = self._wait_semaphore.acquire(True, timeout) + return self._wait_semaphore.acquire(True, timeout) finally: # indicate that this thread has woken self._woken_count.release() @@ -224,7 +224,6 @@ class Condition(object): # reacquire lock for i in range(count): self._lock.acquire() - return ret def notify(self): assert self._lock._semlock._is_mine(), 'lock is not owned' -- cgit v1.2.1 From 7befc604342c38556e303830707c752979eba620 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Mon, 4 Jun 2012 18:59:10 +0100 Subject: Prevent handle leak if CreateProcess() fails in multiprocessing --- Lib/multiprocessing/forking.py | 52 ++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 25 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index eadc321e77..3a474cd7d9 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -209,6 +209,9 @@ else: _tls = _thread._local() def __init__(self, process_obj): + cmd = ' '.join('"%s"' % x for x in get_command_line()) + prep_data = get_preparation_data(process_obj._name) + # create pipe for communication with child rfd, wfd = os.pipe() @@ -216,31 +219,30 @@ else: rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True) os.close(rfd) - # start process - cmd = get_command_line() + [rhandle] - cmd = ' '.join('"%s"' % x for x in cmd) - hp, ht, pid, tid = _winapi.CreateProcess( - _python_exe, cmd, None, None, 1, 0, None, None, None - ) - _winapi.CloseHandle(ht) - close(rhandle) - - # set attributes of self - self.pid = pid - self.returncode = None - self._handle = hp - self.sentinel = int(hp) - - # send information to child - prep_data = get_preparation_data(process_obj._name) - to_child = os.fdopen(wfd, 'wb') - Popen._tls.process_handle = int(hp) - try: - dump(prep_data, to_child, HIGHEST_PROTOCOL) - dump(process_obj, to_child, HIGHEST_PROTOCOL) - finally: - del Popen._tls.process_handle - to_child.close() + with open(wfd, 'wb', closefd=True) as to_child: + # start process + try: + hp, ht, pid, tid = _winapi.CreateProcess( + _python_exe, cmd + (' %s' % rhandle), + None, None, 1, 0, None, None, None + ) + _winapi.CloseHandle(ht) + finally: + close(rhandle) + + # set attributes of self + self.pid = pid + self.returncode = None + self._handle = hp + self.sentinel = int(hp) + + # send information to child + Popen._tls.process_handle = int(hp) + try: + dump(prep_data, to_child, HIGHEST_PROTOCOL) + dump(process_obj, to_child, HIGHEST_PROTOCOL) + finally: + del Popen._tls.process_handle @staticmethod def thread_is_spawning(): -- cgit v1.2.1 From b6145bbf60eff64fceda9bccc929351d4c57b5d5 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Mon, 11 Jun 2012 17:56:08 +0100 Subject: Issue #3518: Remove references to non-existent BaseManager.from_address() method --- Lib/multiprocessing/managers.py | 4 ---- 1 file changed, 4 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 6a7dccb007..7059095400 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -455,10 +455,6 @@ class BaseManager(object): self._serializer = serializer self._Listener, self._Client = listener_client[serializer] - def __reduce__(self): - return type(self).from_address, \ - (self._address, self._authkey, self._serializer) - def get_server(self): ''' Return server object with serve_forever() method and address attribute -- cgit v1.2.1 From 226745e198ee5a91f8f3a5ca1618cdc0afd14c81 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Thu, 14 Jun 2012 15:30:10 +0100 Subject: Issue #13841: Make child processes exit using sys.exit() on Windows --- Lib/multiprocessing/forking.py | 6 ++-- Lib/multiprocessing/managers.py | 67 ++++++++++++++++++----------------------- Lib/multiprocessing/util.py | 27 +++++++++-------- 3 files changed, 46 insertions(+), 54 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 3a474cd7d9..4baf5486c0 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -13,7 +13,7 @@ import signal from multiprocessing import util, process -__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler'] +__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler'] # # Check that the current thread is spawning a child process @@ -75,7 +75,6 @@ else: # if sys.platform != 'win32': - exit = os._exit duplicate = os.dup close = os.close @@ -168,7 +167,6 @@ else: WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") - exit = _winapi.ExitProcess close = _winapi.CloseHandle # @@ -349,7 +347,7 @@ else: from_parent.close() exitcode = self._bootstrap() - exit(exitcode) + sys.exit(exitcode) def get_preparation_data(name): diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 7059095400..817d2321d8 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -22,7 +22,7 @@ import queue from traceback import format_exc from multiprocessing import Process, current_process, active_children, Pool, util, connection from multiprocessing.process import AuthenticationString -from multiprocessing.forking import exit, Popen, ForkingPickler +from multiprocessing.forking import Popen, ForkingPickler from time import time as _time # @@ -140,28 +140,38 @@ class Server(object): self.id_to_obj = {'0': (None, ())} self.id_to_refcount = {} self.mutex = threading.RLock() - self.stop = 0 def serve_forever(self): ''' Run the server forever ''' + self.stop_event = threading.Event() current_process()._manager_server = self try: + accepter = threading.Thread(target=self.accepter) + accepter.daemon = True + accepter.start() try: - while 1: - try: - c = self.listener.accept() - except (OSError, IOError): - continue - t = threading.Thread(target=self.handle_request, args=(c,)) - t.daemon = True - t.start() + while not self.stop_event.is_set(): + self.stop_event.wait(1) except (KeyboardInterrupt, SystemExit): pass finally: - self.stop = 999 - self.listener.close() + if sys.stdout != sys.__stdout__: + util.debug('resetting stdout, stderr') + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + sys.exit(0) + + def accepter(self): + while True: + try: + c = self.listener.accept() + except (OSError, IOError): + continue + t = threading.Thread(target=self.handle_request, args=(c,)) + t.daemon = True + t.start() def handle_request(self, c): ''' @@ -208,7 +218,7 @@ class Server(object): send = conn.send id_to_obj = self.id_to_obj - while not self.stop: + while not self.stop_event.is_set(): try: methodname = obj = None @@ -318,32 +328,13 @@ class Server(object): Shutdown this process ''' try: - try: - util.debug('manager received shutdown message') - c.send(('#RETURN', None)) - - if sys.stdout != sys.__stdout__: - util.debug('resetting stdout, stderr') - sys.stdout = sys.__stdout__ - sys.stderr = sys.__stderr__ - - util._run_finalizers(0) - - for p in active_children(): - util.debug('terminating a child process of manager') - p.terminate() - - for p in active_children(): - util.debug('terminating a child process of manager') - p.join() - - util._run_finalizers() - util.info('manager exiting with exitcode 0') - except: - import traceback - traceback.print_exc() + util.debug('manager received shutdown message') + c.send(('#RETURN', None)) + except: + import traceback + traceback.print_exc() finally: - exit(0) + self.stop_event.set() def create(self, c, typeid, *args, **kwds): ''' diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 48abe383fa..8a6aede162 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -269,21 +269,24 @@ _exiting = False def _exit_function(): global _exiting - info('process shutting down') - debug('running all "atexit" finalizers with priority >= 0') - _run_finalizers(0) + if not _exiting: + _exiting = True - for p in active_children(): - if p._daemonic: - info('calling terminate() for daemon %s', p.name) - p._popen.terminate() + info('process shutting down') + debug('running all "atexit" finalizers with priority >= 0') + _run_finalizers(0) - for p in active_children(): - info('calling join() for process %s', p.name) - p.join() + for p in active_children(): + if p._daemonic: + info('calling terminate() for daemon %s', p.name) + p._popen.terminate() - debug('running the remaining "atexit" finalizers') - _run_finalizers() + for p in active_children(): + info('calling join() for process %s', p.name) + p.join() + + debug('running the remaining "atexit" finalizers') + _run_finalizers() atexit.register(_exit_function) -- cgit v1.2.1 From 0d62535e2243cb367380358f16276cfe498eff46 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Fri, 15 Jun 2012 18:26:07 +0100 Subject: Issue #14059: Implement multiprocessing.Barrier --- Lib/multiprocessing/__init__.py | 11 ++++++++-- Lib/multiprocessing/dummy/__init__.py | 4 ++-- Lib/multiprocessing/managers.py | 21 ++++++++++++++++++ Lib/multiprocessing/synchronize.py | 40 +++++++++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 4 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index 02460f0241..1f3e67c9b8 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -23,8 +23,8 @@ __all__ = [ 'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger', 'allow_connection_pickling', 'BufferTooShort', 'TimeoutError', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', - 'Event', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool', 'Value', 'Array', - 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING', + 'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool', + 'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING', ] __author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)' @@ -186,6 +186,13 @@ def Event(): from multiprocessing.synchronize import Event return Event() +def Barrier(parties, action=None, timeout=None): + ''' + Returns a barrier object + ''' + from multiprocessing.synchronize import Barrier + return Barrier(parties, action, timeout) + def Queue(maxsize=0): ''' Returns a queue object diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py index 9bf8f6bf6b..e31fc61572 100644 --- a/Lib/multiprocessing/dummy/__init__.py +++ b/Lib/multiprocessing/dummy/__init__.py @@ -35,7 +35,7 @@ __all__ = [ 'Process', 'current_process', 'active_children', 'freeze_support', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', - 'Event', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue' + 'Event', 'Barrier', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue' ] # @@ -49,7 +49,7 @@ import array from multiprocessing.dummy.connection import Pipe from threading import Lock, RLock, Semaphore, BoundedSemaphore -from threading import Event, Condition +from threading import Event, Condition, Barrier from queue import Queue # diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 817d2321d8..cded4f39d5 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -993,6 +993,26 @@ class EventProxy(BaseProxy): def wait(self, timeout=None): return self._callmethod('wait', (timeout,)) + +class BarrierProxy(BaseProxy): + _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + def abort(self): + return self._callmethod('abort') + def reset(self): + return self._callmethod('reset') + @property + def parties(self): + return self._callmethod('__getattribute__', ('parties',)) + @property + def n_waiting(self): + return self._callmethod('__getattribute__', ('n_waiting',)) + @property + def broken(self): + return self._callmethod('__getattribute__', ('broken',)) + + class NamespaceProxy(BaseProxy): _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') def __getattr__(self, key): @@ -1084,6 +1104,7 @@ SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy) SyncManager.register('Condition', threading.Condition, ConditionProxy) +SyncManager.register('Barrier', threading.Barrier, BarrierProxy) SyncManager.register('Pool', Pool, PoolProxy) SyncManager.register('list', list, ListProxy) SyncManager.register('dict', dict, DictProxy) diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 4502a97e99..22eabe55b8 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -333,3 +333,43 @@ class Event(object): return False finally: self._cond.release() + +# +# Barrier +# + +class Barrier(threading.Barrier): + + def __init__(self, parties, action=None, timeout=None): + import struct + from multiprocessing.heap import BufferWrapper + wrapper = BufferWrapper(struct.calcsize('i') * 2) + cond = Condition() + self.__setstate__((parties, action, timeout, cond, wrapper)) + self._state = 0 + self._count = 0 + + def __setstate__(self, state): + (self._parties, self._action, self._timeout, + self._cond, self._wrapper) = state + self._array = self._wrapper.create_memoryview().cast('i') + + def __getstate__(self): + return (self._parties, self._action, self._timeout, + self._cond, self._wrapper) + + @property + def _state(self): + return self._array[0] + + @_state.setter + def _state(self, value): + self._array[0] = value + + @property + def _count(self): + return self._array[1] + + @_count.setter + def _count(self, value): + self._array[1] = value -- cgit v1.2.1 From a280214f8c3b722cb082c7265448ea2d92eefb46 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Fri, 15 Jun 2012 20:08:29 +0100 Subject: Increase timeout used when waiting for manager to shutdown cleanly before resorting to terminate() --- Lib/multiprocessing/managers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index cded4f39d5..f6611af9a6 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -582,7 +582,7 @@ class BaseManager(object): except Exception: pass - process.join(timeout=0.2) + process.join(timeout=1.0) if process.is_alive(): util.info('manager still alive') if hasattr(process, 'terminate'): -- cgit v1.2.1 From 9d0779e4f5aeea0aeaab24cbebede4ea985b56f3 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Mon, 18 Jun 2012 17:47:52 +0100 Subject: Issue #15064: Implement context manager protocol for multiprocessing types --- Lib/multiprocessing/connection.py | 18 +++++++++++++++++- Lib/multiprocessing/dummy/connection.py | 12 ++++++++++++ Lib/multiprocessing/pool.py | 6 ++++++ 3 files changed, 35 insertions(+), 1 deletion(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 56f375d237..e5694e3587 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -257,6 +257,12 @@ class _ConnectionBase: self._check_readable() return self._poll(timeout) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() + if _winapi: @@ -436,6 +442,8 @@ class Listener(object): Returns a `Connection` object. ''' + if self._listener is None: + raise IOError('listener is closed') c = self._listener.accept() if self._authkey: deliver_challenge(c, self._authkey) @@ -446,11 +454,19 @@ class Listener(object): ''' Close the bound socket or named pipe of `self`. ''' - return self._listener.close() + if self._listener is not None: + self._listener.close() + self._listener = None address = property(lambda self: self._listener._address) last_accepted = property(lambda self: self._listener._last_accepted) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() + def Client(address, family=None, authkey=None): ''' diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py index af105794f1..874ec8e432 100644 --- a/Lib/multiprocessing/dummy/connection.py +++ b/Lib/multiprocessing/dummy/connection.py @@ -53,6 +53,12 @@ class Listener(object): address = property(lambda self: self._backlog_queue) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() + def Client(address): _in, _out = Queue(), Queue() @@ -85,3 +91,9 @@ class Connection(object): def close(self): pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 59e547a99c..9e07e32de6 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -522,6 +522,12 @@ class Pool(object): debug('cleaning up worker %d' % p.pid) p.join() + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() + # # Class whose instances are returned by `Pool.apply_async()` # -- cgit v1.2.1 From 6ea60aa205dbcb9a3edaf74801972cc73ec40a43 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Mon, 18 Jun 2012 21:29:30 +0100 Subject: Issue #15064: Make BaseManager.__enter__() start server if necessary. --- Lib/multiprocessing/managers.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index f6611af9a6..1ab147e29e 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -561,6 +561,9 @@ class BaseManager(object): conn.close() def __enter__(self): + if self._state.value == State.INITIAL: + self.start() + assert self._state.value == State.STARTED return self def __exit__(self, exc_type, exc_val, exc_tb): -- cgit v1.2.1 From d5ae6ab8f3a3cc8ee1444cbdc6518cc424d7e0d0 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Fri, 27 Jul 2012 14:19:00 +0100 Subject: Issue #6056: Make multiprocessing use setblocking(True) on the sockets it uses. Original patch by J Derek Wilson. --- Lib/multiprocessing/connection.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index e5694e3587..fbbd5d91d3 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -497,6 +497,8 @@ if sys.platform != 'win32': ''' if duplex: s1, s2 = socket.socketpair() + s1.setblocking(True) + s2.setblocking(True) c1 = Connection(s1.detach()) c2 = Connection(s2.detach()) else: @@ -561,6 +563,7 @@ class SocketListener(object): if os.name == 'posix': self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._socket.setblocking(True) self._socket.bind(address) self._socket.listen(backlog) self._address = self._socket.getsockname() @@ -579,6 +582,7 @@ class SocketListener(object): def accept(self): s, self._last_accepted = self._socket.accept() + s.setblocking(True) return Connection(s.detach()) def close(self): @@ -593,6 +597,7 @@ def SocketClient(address): ''' family = address_type(address) with socket.socket( getattr(socket, family) ) as s: + s.setblocking(True) s.connect(address) return Connection(s.detach()) -- cgit v1.2.1 From 0f8b610f08df2df74cb7db465faf7e3d837c6bd3 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Wed, 1 Aug 2012 17:44:18 +0100 Subject: Issue #15525: Increase timeout when TerminateProcess() fails --- Lib/multiprocessing/forking.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 4baf5486c0..3beb816593 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -273,8 +273,8 @@ else: if self.returncode is None: try: _winapi.TerminateProcess(int(self._handle), TERMINATE) - except WindowsError: - if self.wait(timeout=0.1) is None: + except OSError: + if self.wait(timeout=1.0) is None: raise # -- cgit v1.2.1 From 20bd3d67ae76a18614734d2e3b4a36b8c081649d Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Thu, 16 Aug 2012 16:48:55 +0100 Subject: Issue #14669: Fix pickling of connections and sockets on MacOSX by sending/receiving an acknowledgment after file descriptor transfer. TestPicklingConnection has been reenabled for MacOSX. --- Lib/multiprocessing/reduction.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 84d2fe9ab2..656fa8ff6b 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -120,16 +120,24 @@ if sys.platform == 'win32': else: # Unix + + # On MacOSX we should acknowledge receipt of fds -- see Issue14669 + ACKNOWLEDGE = sys.platform == 'darwin' + def send_handle(conn, handle, destination_pid): with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack("@i", handle))]) + if ACKNOWLEDGE and conn.recv_bytes() != b'ACK': + raise RuntimeError('did not receive acknowledgement of fd') def recv_handle(conn): size = struct.calcsize("@i") with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) try: + if ACKNOWLEDGE: + conn.send_bytes(b'ACK') cmsg_level, cmsg_type, cmsg_data = ancdata[0] if (cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS): -- cgit v1.2.1 From 760cb577780b1af1bf3667f757249fde9e7d5125 Mon Sep 17 00:00:00 2001 From: Alexander Belopolsky Date: Sun, 9 Sep 2012 13:20:58 -0400 Subject: Issue #15881: Fixed atexit hook in multiprocessing. --- Lib/multiprocessing/util.py | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 8a6aede162..bc2d65639e 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -235,6 +235,12 @@ def _run_finalizers(minpriority=None): Finalizers with highest priority are called first; finalizers with the same priority will be called in reverse order of creation. ''' + if _finalizer_registry is None: + # This function may be called after this module's globals are + # destroyed. See the _exit_function function in this module for more + # notes. + return + if minpriority is None: f = lambda p : p[0][0] is not None else: @@ -266,7 +272,13 @@ def is_exiting(): _exiting = False -def _exit_function(): +def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, + active_children=active_children, + current_process=current_process): + # We hold on to references to functions in the arglist due to the + # situation described below, where this function is called after this + # module's globals are destroyed. + global _exiting if not _exiting: @@ -276,14 +288,25 @@ def _exit_function(): debug('running all "atexit" finalizers with priority >= 0') _run_finalizers(0) - for p in active_children(): - if p._daemonic: - info('calling terminate() for daemon %s', p.name) - p._popen.terminate() - - for p in active_children(): - info('calling join() for process %s', p.name) - p.join() + if current_process() is not None: + # We check if the current process is None here because if + # it's None, any call to ``active_children()`` will throw an + # AttributeError (active_children winds up trying to get + # attributes from util._current_process). This happens in a + # variety of shutdown circumstances that are not well-understood + # because module-scope variables are not apparently supposed to + # be destroyed until after this function is called. However, + # they are indeed destroyed before this function is called. See + # issues #9775 and #15881. Also related: #4106, #9205, and #9207. + + for p in active_children(): + if p._daemonic: + info('calling terminate() for daemon %s', p.name) + p._popen.terminate() + + for p in active_children(): + info('calling join() for process %s', p.name) + p.join() debug('running the remaining "atexit" finalizers') _run_finalizers() -- cgit v1.2.1 From aee340b1832d04c951c2397cbdf3340d0c9d371d Mon Sep 17 00:00:00 2001 From: Alexander Belopolsky Date: Sun, 9 Sep 2012 13:25:06 -0400 Subject: Fixed whitespace --- Lib/multiprocessing/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index bc2d65639e..3bb954670f 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -240,7 +240,7 @@ def _run_finalizers(minpriority=None): # destroyed. See the _exit_function function in this module for more # notes. return - + if minpriority is None: f = lambda p : p[0][0] is not None else: -- cgit v1.2.1 From 5a2ecf2e6eb27484e3fc32525ead22f28babdf9b Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Thu, 13 Sep 2012 17:27:15 +0100 Subject: Issue #15881: Clarify comment in exit function --- Lib/multiprocessing/util.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 3bb954670f..7495813c9a 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -290,14 +290,17 @@ def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, if current_process() is not None: # We check if the current process is None here because if - # it's None, any call to ``active_children()`` will throw an - # AttributeError (active_children winds up trying to get - # attributes from util._current_process). This happens in a - # variety of shutdown circumstances that are not well-understood - # because module-scope variables are not apparently supposed to - # be destroyed until after this function is called. However, - # they are indeed destroyed before this function is called. See - # issues #9775 and #15881. Also related: #4106, #9205, and #9207. + # it's None, any call to ``active_children()`` will throw + # an AttributeError (active_children winds up trying to + # get attributes from util._current_process). One + # situation where this can happen is if someone has + # manipulated sys.modules, causing this module to be + # garbage collected. The destructor for the module type + # then replaces all values in the module dict with None. + # For instance, after setuptools runs a test it replaces + # sys.modules with a copy created earlier. See issues + # #9775 and #15881. Also related: #4106, #9205, and + # #9207. for p in active_children(): if p._daemonic: -- cgit v1.2.1 From fdf06d04a28428ee85f4c55185e6adcbfe31dc37 Mon Sep 17 00:00:00 2001 From: Benjamin Peterson Date: Tue, 25 Sep 2012 12:45:42 -0400 Subject: raise a ValueError instead of an AssertionError when pool is an invalid state --- Lib/multiprocessing/pool.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 9e07e32de6..ec57939c45 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -225,7 +225,6 @@ class Pool(object): Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ''' - assert self._state == RUN return self._map_async(func, iterable, mapstar, chunksize).get() def starmap(self, func, iterable, chunksize=None): @@ -234,7 +233,6 @@ class Pool(object): be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). ''' - assert self._state == RUN return self._map_async(func, iterable, starmapstar, chunksize).get() def starmap_async(self, func, iterable, chunksize=None, callback=None, @@ -242,7 +240,6 @@ class Pool(object): ''' Asynchronous version of `starmap()` method. ''' - assert self._state == RUN return self._map_async(func, iterable, starmapstar, chunksize, callback, error_callback) @@ -250,7 +247,8 @@ class Pool(object): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") if chunksize == 1: result = IMapIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) @@ -268,7 +266,8 @@ class Pool(object): ''' Like `imap()` method but ordering of results is arbitrary. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") if chunksize == 1: result = IMapUnorderedIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) @@ -287,7 +286,8 @@ class Pool(object): ''' Asynchronous version of `apply()` method. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") result = ApplyResult(self._cache, callback, error_callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result @@ -297,7 +297,6 @@ class Pool(object): ''' Asynchronous version of `map()` method. ''' - assert self._state == RUN return self._map_async(func, iterable, mapstar, chunksize) def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, @@ -305,6 +304,8 @@ class Pool(object): ''' Helper function to implement map, starmap and their async counterparts. ''' + if self._state != RUN: + raise ValueError("Pool not running") if not hasattr(iterable, '__len__'): iterable = list(iterable) -- cgit v1.2.1 From 2945c88477899ec41e5bf3521471b0fe31e188fd Mon Sep 17 00:00:00 2001 From: Hynek Schlawack Date: Sat, 27 Oct 2012 12:53:02 +0200 Subject: #16307: Fix multiprocessing.Pool.map_async not calling its callbacks Patch by Janne Karila. --- Lib/multiprocessing/pool.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index ec57939c45..7f73b441c2 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -297,7 +297,8 @@ class Pool(object): ''' Asynchronous version of `map()` method. ''' - return self._map_async(func, iterable, mapstar, chunksize) + return self._map_async(func, iterable, mapstar, chunksize, callback, + error_callback) def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, error_callback=None): -- cgit v1.2.1 From 60aed913ddb0168f45b780fdc4828e68d7875a1e Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Thu, 15 Nov 2012 18:16:35 +0000 Subject: Issue #16481: multiprocessing no longer leaks process handles on Windows. --- Lib/multiprocessing/forking.py | 1 + 1 file changed, 1 insertion(+) (limited to 'Lib/multiprocessing') diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index af6580dc5d..c5501a2f75 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -233,6 +233,7 @@ else: self.returncode = None self._handle = hp self.sentinel = int(hp) + util.Finalize(self, _winapi.CloseHandle, (self.sentinel,)) # send information to child Popen._tls.process_handle = int(hp) -- cgit v1.2.1