diff options
Diffstat (limited to 'Lib/multiprocessing/connection.py')
-rw-r--r-- | Lib/multiprocessing/connection.py | 616 |
1 files changed, 524 insertions, 92 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 8bb0a3b0d8..64d71bc361 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -32,21 +32,32 @@ # SUCH DAMAGE. # -__all__ = [ 'Client', 'Listener', 'Pipe' ] +__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] +import io import os import sys +import pickle +import select import socket +import struct import errno import time import tempfile import itertools import _multiprocessing -from multiprocessing import current_process, AuthenticationError -from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug -from multiprocessing.forking import duplicate, close - +from multiprocessing import current_process, AuthenticationError, BufferTooShort +from multiprocessing.util import ( + get_temp_dir, Finalize, sub_debug, debug, _eintr_retry) +from multiprocessing.forking import ForkingPickler +try: + import _winapi + from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE +except ImportError: + if sys.platform == 'win32': + raise + _winapi = None # # @@ -122,6 +133,307 @@ def address_type(address): raise ValueError('address type of %r unrecognized' % address) # +# Connection classes +# + +class _ConnectionBase: + _handle = None + + def __init__(self, handle, readable=True, writable=True): + handle = handle.__index__() + if handle < 0: + raise ValueError("invalid handle") + if not readable and not writable: + raise ValueError( + "at least one of `readable` and `writable` must be True") + self._handle = handle + self._readable = readable + self._writable = writable + + # XXX should we use util.Finalize instead of a __del__? + + def __del__(self): + if self._handle is not None: + self._close() + + def _check_closed(self): + if self._handle is None: + raise IOError("handle is closed") + + def _check_readable(self): + if not self._readable: + raise IOError("connection is write-only") + + def _check_writable(self): + if not self._writable: + raise IOError("connection is read-only") + + def _bad_message_length(self): + if self._writable: + self._readable = False + else: + self.close() + raise IOError("bad message length") + + @property + def closed(self): + """True if the connection is closed""" + return self._handle is None + + @property + def readable(self): + """True if the connection is readable""" + return self._readable + + @property + def writable(self): + """True if the connection is writable""" + return self._writable + + def fileno(self): + """File descriptor or handle of the connection""" + self._check_closed() + return self._handle + + def close(self): + """Close the connection""" + if self._handle is not None: + try: + self._close() + finally: + self._handle = None + + def send_bytes(self, buf, offset=0, size=None): + """Send the bytes data from a bytes-like object""" + self._check_closed() + self._check_writable() + m = memoryview(buf) + # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) + if m.itemsize > 1: + m = memoryview(bytes(m)) + n = len(m) + if offset < 0: + raise ValueError("offset is negative") + if n < offset: + raise ValueError("buffer length < offset") + if size is None: + size = n - offset + elif size < 0: + raise ValueError("size is negative") + elif offset + size > n: + raise ValueError("buffer length < offset + size") + self._send_bytes(m[offset:offset + size]) + + def send(self, obj): + """Send a (picklable) object""" + self._check_closed() + self._check_writable() + buf = io.BytesIO() + ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj) + self._send_bytes(buf.getbuffer()) + + def recv_bytes(self, maxlength=None): + """ + Receive bytes data as a bytes object. + """ + self._check_closed() + self._check_readable() + if maxlength is not None and maxlength < 0: + raise ValueError("negative maxlength") + buf = self._recv_bytes(maxlength) + if buf is None: + self._bad_message_length() + return buf.getvalue() + + def recv_bytes_into(self, buf, offset=0): + """ + Receive bytes data into a writeable buffer-like object. + Return the number of bytes read. + """ + self._check_closed() + self._check_readable() + with memoryview(buf) as m: + # Get bytesize of arbitrary buffer + itemsize = m.itemsize + bytesize = itemsize * len(m) + if offset < 0: + raise ValueError("negative offset") + elif offset > bytesize: + raise ValueError("offset too large") + result = self._recv_bytes() + size = result.tell() + if bytesize < offset + size: + raise BufferTooShort(result.getvalue()) + # Message can fit in dest + result.seek(0) + result.readinto(m[offset // itemsize : + (offset + size) // itemsize]) + return size + + def recv(self): + """Receive a (picklable) object""" + self._check_closed() + self._check_readable() + buf = self._recv_bytes() + return pickle.loads(buf.getbuffer()) + + def poll(self, timeout=0.0): + """Whether there is any input available to be read""" + self._check_closed() + self._check_readable() + return self._poll(timeout) + + +if _winapi: + + class PipeConnection(_ConnectionBase): + """ + Connection class based on a Windows named pipe. + Overlapped I/O is used, so the handles must have been created + with FILE_FLAG_OVERLAPPED. + """ + _got_empty_message = False + + def _close(self, _CloseHandle=_winapi.CloseHandle): + _CloseHandle(self._handle) + + def _send_bytes(self, buf): + ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) + try: + if err == _winapi.ERROR_IO_PENDING: + waitres = _winapi.WaitForMultipleObjects( + [ov.event], False, INFINITE) + assert waitres == WAIT_OBJECT_0 + except: + ov.cancel() + raise + finally: + nwritten, err = ov.GetOverlappedResult(True) + assert err == 0 + assert nwritten == len(buf) + + def _recv_bytes(self, maxsize=None): + if self._got_empty_message: + self._got_empty_message = False + return io.BytesIO() + else: + bsize = 128 if maxsize is None else min(maxsize, 128) + try: + ov, err = _winapi.ReadFile(self._handle, bsize, + overlapped=True) + try: + if err == _winapi.ERROR_IO_PENDING: + waitres = _winapi.WaitForMultipleObjects( + [ov.event], False, INFINITE) + assert waitres == WAIT_OBJECT_0 + except: + ov.cancel() + raise + finally: + nread, err = ov.GetOverlappedResult(True) + if err == 0: + f = io.BytesIO() + f.write(ov.getbuffer()) + return f + elif err == _winapi.ERROR_MORE_DATA: + return self._get_more_data(ov, maxsize) + except IOError as e: + if e.winerror == _winapi.ERROR_BROKEN_PIPE: + raise EOFError + else: + raise + raise RuntimeError("shouldn't get here; expected KeyboardInterrupt") + + def _poll(self, timeout): + if (self._got_empty_message or + _winapi.PeekNamedPipe(self._handle)[0] != 0): + return True + if timeout < 0: + timeout = None + return bool(wait([self], timeout)) + + def _get_more_data(self, ov, maxsize): + buf = ov.getbuffer() + f = io.BytesIO() + f.write(buf) + left = _winapi.PeekNamedPipe(self._handle)[1] + assert left > 0 + if maxsize is not None and len(buf) + left > maxsize: + self._bad_message_length() + ov, err = _winapi.ReadFile(self._handle, left, overlapped=True) + rbytes, err = ov.GetOverlappedResult(True) + assert err == 0 + assert rbytes == left + f.write(ov.getbuffer()) + return f + + +class Connection(_ConnectionBase): + """ + Connection class based on an arbitrary file descriptor (Unix only), or + a socket handle (Windows). + """ + + if _winapi: + def _close(self, _close=_multiprocessing.closesocket): + _close(self._handle) + _write = _multiprocessing.send + _read = _multiprocessing.recv + else: + def _close(self, _close=os.close): + _close(self._handle) + _write = os.write + _read = os.read + + def _send(self, buf, write=_write): + remaining = len(buf) + while True: + n = write(self._handle, buf) + remaining -= n + if remaining == 0: + break + buf = buf[n:] + + def _recv(self, size, read=_read): + buf = io.BytesIO() + handle = self._handle + remaining = size + while remaining > 0: + chunk = read(handle, remaining) + n = len(chunk) + if n == 0: + if remaining == size: + raise EOFError + else: + raise IOError("got end of file during message") + buf.write(chunk) + remaining -= n + return buf + + def _send_bytes(self, buf): + # For wire compatibility with 3.2 and lower + n = len(buf) + self._send(struct.pack("!i", n)) + # The condition is necessary to avoid "broken pipe" errors + # when sending a 0-length buffer if the other end closed the pipe. + if n > 0: + self._send(buf) + + def _recv_bytes(self, maxsize=None): + buf = self._recv(4) + size, = struct.unpack("!i", buf.getvalue()) + if maxsize is not None and size > maxsize: + return None + return self._recv(size) + + def _poll(self, timeout): + if timeout < 0.0: + timeout = None + r = wait([self._handle], timeout) + return bool(r) + + +# # Public functions # @@ -199,56 +511,52 @@ if sys.platform != 'win32': ''' if duplex: s1, s2 = socket.socketpair() - c1 = _multiprocessing.Connection(os.dup(s1.fileno())) - c2 = _multiprocessing.Connection(os.dup(s2.fileno())) - s1.close() - s2.close() + c1 = Connection(s1.detach()) + c2 = Connection(s2.detach()) else: fd1, fd2 = os.pipe() - c1 = _multiprocessing.Connection(fd1, writable=False) - c2 = _multiprocessing.Connection(fd2, readable=False) + c1 = Connection(fd1, writable=False) + c2 = Connection(fd2, readable=False) return c1, c2 else: - from _multiprocessing import win32 - def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' address = arbitrary_address('AF_PIPE') if duplex: - openmode = win32.PIPE_ACCESS_DUPLEX - access = win32.GENERIC_READ | win32.GENERIC_WRITE + openmode = _winapi.PIPE_ACCESS_DUPLEX + access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE obsize, ibsize = BUFSIZE, BUFSIZE else: - openmode = win32.PIPE_ACCESS_INBOUND - access = win32.GENERIC_WRITE + openmode = _winapi.PIPE_ACCESS_INBOUND + access = _winapi.GENERIC_WRITE obsize, ibsize = 0, BUFSIZE - h1 = win32.CreateNamedPipe( - address, openmode, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL + h1 = _winapi.CreateNamedPipe( + address, openmode | _winapi.FILE_FLAG_OVERLAPPED | + _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE, + _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | + _winapi.PIPE_WAIT, + 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) - h2 = win32.CreateFile( - address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + h2 = _winapi.CreateFile( + address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, + _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL ) - win32.SetNamedPipeHandleState( - h2, win32.PIPE_READMODE_MESSAGE, None, None + _winapi.SetNamedPipeHandleState( + h2, _winapi.PIPE_READMODE_MESSAGE, None, None ) - try: - win32.ConnectNamedPipe(h1, win32.NULL) - except WindowsError as e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: - raise + overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True) + _, err = overlapped.GetOverlappedResult(True) + assert err == 0 - c1 = _multiprocessing.PipeConnection(h1, writable=duplex) - c2 = _multiprocessing.PipeConnection(h2, readable=duplex) + c1 = PipeConnection(h1, writable=duplex) + c2 = PipeConnection(h2, readable=duplex) return c1, c2 @@ -263,11 +571,14 @@ class SocketListener(object): def __init__(self, address, family, backlog=1): self._socket = socket.socket(getattr(socket, family)) try: - self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # SO_REUSEADDR has different semantics on Windows (issue #2550). + if os.name == 'posix': + self._socket.setsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR, 1) self._socket.bind(address) self._socket.listen(backlog) self._address = self._socket.getsockname() - except socket.error: + except OSError: self._socket.close() raise self._family = family @@ -282,10 +593,7 @@ class SocketListener(object): def accept(self): s, self._last_accepted = self._socket.accept() - fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) - s.close() - return conn + return Connection(s.detach()) def close(self): self._socket.close() @@ -299,24 +607,8 @@ def SocketClient(address): ''' family = address_type(address) with socket.socket( getattr(socket, family) ) as s: - t = _init_timeout() - - while 1: - try: - s.connect(address) - except socket.error as e: - if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): - debug('failed to connect to address %s', address) - raise - time.sleep(0.01) - else: - break - else: - raise - - fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) - return conn + s.connect(address) + return Connection(s.detach()) # # Definitions for connections based on named pipes @@ -330,45 +622,47 @@ if sys.platform == 'win32': ''' def __init__(self, address, backlog=None): self._address = address - handle = win32.CreateNamedPipe( - address, win32.PIPE_ACCESS_DUPLEX, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, - win32.NMPWAIT_WAIT_FOREVER, win32.NULL - ) - self._handle_queue = [handle] - self._last_accepted = None + self._handle_queue = [self._new_handle(first=True)] + self._last_accepted = None sub_debug('listener created with address=%r', self._address) - self.close = Finalize( self, PipeListener._finalize_pipe_listener, args=(self._handle_queue, self._address), exitpriority=0 ) - def accept(self): - newhandle = win32.CreateNamedPipe( - self._address, win32.PIPE_ACCESS_DUPLEX, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, - win32.NMPWAIT_WAIT_FOREVER, win32.NULL + def _new_handle(self, first=False): + flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED + if first: + flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE + return _winapi.CreateNamedPipe( + self._address, flags, + _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | + _winapi.PIPE_WAIT, + _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, + _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) - self._handle_queue.append(newhandle) + + def accept(self): + self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) + ov = _winapi.ConnectNamedPipe(handle, overlapped=True) try: - win32.ConnectNamedPipe(handle, win32.NULL) - except WindowsError as e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: - raise - return _multiprocessing.PipeConnection(handle) + res = _winapi.WaitForMultipleObjects([ov.event], False, INFINITE) + except: + ov.cancel() + _winapi.CloseHandle(handle) + raise + finally: + _, err = ov.GetOverlappedResult(True) + assert err == 0 + return PipeConnection(handle) @staticmethod def _finalize_pipe_listener(queue, address): sub_debug('closing listener with address=%r', address) for handle in queue: - close(handle) + _winapi.CloseHandle(handle) def PipeClient(address): ''' @@ -377,24 +671,25 @@ if sys.platform == 'win32': t = _init_timeout() while 1: try: - win32.WaitNamedPipe(address, 1000) - h = win32.CreateFile( - address, win32.GENERIC_READ | win32.GENERIC_WRITE, - 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + _winapi.WaitNamedPipe(address, 1000) + h = _winapi.CreateFile( + address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE, + 0, _winapi.NULL, _winapi.OPEN_EXISTING, + _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL ) except WindowsError as e: - if e.args[0] not in (win32.ERROR_SEM_TIMEOUT, - win32.ERROR_PIPE_BUSY) or _check_timeout(t): + if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT, + _winapi.ERROR_PIPE_BUSY) or _check_timeout(t): raise else: break else: raise - win32.SetNamedPipeHandleState( - h, win32.PIPE_READMODE_MESSAGE, None, None + _winapi.SetNamedPipeHandleState( + h, _winapi.PIPE_READMODE_MESSAGE, None, None ) - return _multiprocessing.PipeConnection(h) + return PipeConnection(h) # # Authentication stuff @@ -451,10 +746,10 @@ class ConnectionWrapper(object): return self._loads(s) def _xml_dumps(obj): - return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') + return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') def _xml_loads(s): - (obj,), method = xmlrpclib.loads(s.decode('utf8')) + (obj,), method = xmlrpclib.loads(s.decode('utf-8')) return obj class XmlListener(Listener): @@ -468,3 +763,140 @@ def XmlClient(*args, **kwds): global xmlrpclib import xmlrpc.client as xmlrpclib return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) + +# +# Wait +# + +if sys.platform == 'win32': + + def _exhaustive_wait(handles, timeout): + # Return ALL handles which are currently signalled. (Only + # returning the first signalled might create starvation issues.) + L = list(handles) + ready = [] + while L: + res = _winapi.WaitForMultipleObjects(L, False, timeout) + if res == WAIT_TIMEOUT: + break + elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): + res -= WAIT_OBJECT_0 + elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): + res -= WAIT_ABANDONED_0 + else: + raise RuntimeError('Should not get here') + ready.append(L[res]) + L = L[res+1:] + timeout = 0 + return ready + + _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED} + + def wait(object_list, timeout=None): + ''' + Wait till an object in object_list is ready/readable. + + Returns list of those objects in object_list which are ready/readable. + ''' + if timeout is None: + timeout = INFINITE + elif timeout < 0: + timeout = 0 + else: + timeout = int(timeout * 1000 + 0.5) + + object_list = list(object_list) + waithandle_to_obj = {} + ov_list = [] + ready_objects = set() + ready_handles = set() + + try: + for o in object_list: + try: + fileno = getattr(o, 'fileno') + except AttributeError: + waithandle_to_obj[o.__index__()] = o + else: + # start an overlapped read of length zero + try: + ov, err = _winapi.ReadFile(fileno(), 0, True) + except OSError as e: + err = e.winerror + if err not in _ready_errors: + raise + if err == _winapi.ERROR_IO_PENDING: + ov_list.append(ov) + waithandle_to_obj[ov.event] = o + else: + # If o.fileno() is an overlapped pipe handle and + # err == 0 then there is a zero length message + # in the pipe, but it HAS NOT been consumed. + ready_objects.add(o) + timeout = 0 + + ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout) + finally: + # request that overlapped reads stop + for ov in ov_list: + ov.cancel() + + # wait for all overlapped reads to stop + for ov in ov_list: + try: + _, err = ov.GetOverlappedResult(True) + except OSError as e: + err = e.winerror + if err not in _ready_errors: + raise + if err != _winapi.ERROR_OPERATION_ABORTED: + o = waithandle_to_obj[ov.event] + ready_objects.add(o) + if err == 0: + # If o.fileno() is an overlapped pipe handle then + # a zero length message HAS been consumed. + if hasattr(o, '_got_empty_message'): + o._got_empty_message = True + + ready_objects.update(waithandle_to_obj[h] for h in ready_handles) + return [o for o in object_list if o in ready_objects] + +else: + + def wait(object_list, timeout=None): + ''' + Wait till an object in object_list is ready/readable. + + Returns list of those objects in object_list which are ready/readable. + ''' + if timeout is not None: + if timeout <= 0: + return select.select(object_list, [], [], 0)[0] + else: + deadline = time.time() + timeout + while True: + try: + return select.select(object_list, [], [], timeout)[0] + except OSError as e: + if e.errno != errno.EINTR: + raise + if timeout is not None: + timeout = deadline - time.time() + +# +# Make connection and socket objects sharable if possible +# + +if sys.platform == 'win32': + from . import reduction + ForkingPickler.register(socket.socket, reduction.reduce_socket) + ForkingPickler.register(Connection, reduction.reduce_connection) + ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection) +else: + try: + from . import reduction + except ImportError: + pass + else: + ForkingPickler.register(socket.socket, reduction.reduce_socket) + ForkingPickler.register(Connection, reduction.reduce_connection) |