diff options
Diffstat (limited to 'Lib/multiprocessing/reduction.py')
-rw-r--r-- | Lib/multiprocessing/reduction.py | 407 |
1 files changed, 236 insertions, 171 deletions
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 6e5e5bc9de..656fa8ff6b 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -5,53 +5,29 @@ # multiprocessing/reduction.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # -__all__ = [] +__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle'] import os import sys import socket import threading +import struct +import signal -import _multiprocessing from multiprocessing import current_process -from multiprocessing.forking import Popen, duplicate, close, ForkingPickler from multiprocessing.util import register_after_fork, debug, sub_debug -from multiprocessing.connection import Client, Listener +from multiprocessing.util import is_exiting, sub_warning # # # -if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): +if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and + hasattr(socket, 'SCM_RIGHTS'))): raise ImportError('pickling of connections not supported') # @@ -59,157 +35,246 @@ if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): # if sys.platform == 'win32': - import _subprocess - from _multiprocessing import win32 - - def send_handle(conn, handle, destination_pid): - process_handle = win32.OpenProcess( - win32.PROCESS_ALL_ACCESS, False, destination_pid - ) - try: - new_handle = duplicate(handle, process_handle) - conn.send(new_handle) - finally: - close(process_handle) - - def recv_handle(conn): - return conn.recv() + # Windows + __all__ += ['reduce_pipe_connection'] + import _winapi -else: def send_handle(conn, handle, destination_pid): - _multiprocessing.sendfd(conn.fileno(), handle) + dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) + conn.send(dh) def recv_handle(conn): - return _multiprocessing.recvfd(conn.fileno()) - -# -# Support for a per-process server thread which caches pickled handles -# - -_cache = set() - -def _reset(obj): - global _lock, _listener, _cache - for h in _cache: - close(h) - _cache.clear() - _lock = threading.Lock() - _listener = None - -_reset(None) -register_after_fork(_reset, _reset) - -def _get_listener(): - global _listener - - if _listener is None: - _lock.acquire() - try: - if _listener is None: - debug('starting listener and thread for sending handles') - _listener = Listener(authkey=current_process().authkey) - t = threading.Thread(target=_serve) - t.daemon = True - t.start() - finally: - _lock.release() - - return _listener - -def _serve(): - from .util import is_exiting, sub_warning - - while 1: - try: - conn = _listener.accept() - handle_wanted, destination_pid = conn.recv() - _cache.remove(handle_wanted) - send_handle(conn, handle_wanted, destination_pid) - close(handle_wanted) - conn.close() - except: - if not is_exiting(): - import traceback - sub_warning( - 'thread for sharing handles raised exception :\n' + - '-'*79 + '\n' + traceback.format_exc() + '-'*79 - ) - -# -# Functions to be used for pickling/unpickling objects with handles -# - -def reduce_handle(handle): - if Popen.thread_is_spawning(): - return (None, Popen.duplicate_for_child(handle), True) - dup_handle = duplicate(handle) - _cache.add(dup_handle) - sub_debug('reducing handle %d', handle) - return (_get_listener().address, dup_handle, False) - -def rebuild_handle(pickled_data): - address, handle, inherited = pickled_data - if inherited: - return handle - sub_debug('rebuilding handle %d', handle) - conn = Client(address, authkey=current_process().authkey) - conn.send((handle, os.getpid())) - new_handle = recv_handle(conn) - conn.close() - return new_handle + return conn.recv().detach() + + class DupHandle(object): + def __init__(self, handle, access, pid=None): + # duplicate handle for process with given pid + if pid is None: + pid = os.getpid() + proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) + try: + self._handle = _winapi.DuplicateHandle( + _winapi.GetCurrentProcess(), + handle, proc, access, False, 0) + finally: + _winapi.CloseHandle(proc) + self._access = access + self._pid = pid + + def detach(self): + # retrieve handle from process which currently owns it + if self._pid == os.getpid(): + return self._handle + proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, + self._pid) + try: + return _winapi.DuplicateHandle( + proc, self._handle, _winapi.GetCurrentProcess(), + self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) + finally: + _winapi.CloseHandle(proc) + + class DupSocket(object): + def __init__(self, sock): + new_sock = sock.dup() + def send(conn, pid): + share = new_sock.share(pid) + conn.send_bytes(share) + self._id = resource_sharer.register(send, new_sock.close) + + def detach(self): + conn = resource_sharer.get_connection(self._id) + try: + share = conn.recv_bytes() + return socket.fromshare(share) + finally: + conn.close() + + def reduce_socket(s): + return rebuild_socket, (DupSocket(s),) + + def rebuild_socket(ds): + return ds.detach() + + def reduce_connection(conn): + handle = conn.fileno() + with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: + ds = DupSocket(s) + return rebuild_connection, (ds, conn.readable, conn.writable) + + def rebuild_connection(ds, readable, writable): + from .connection import Connection + sock = ds.detach() + return Connection(sock.detach(), readable, writable) -# -# Register `_multiprocessing.Connection` with `ForkingPickler` -# - -def reduce_connection(conn): - rh = reduce_handle(conn.fileno()) - return rebuild_connection, (rh, conn.readable, conn.writable) - -def rebuild_connection(reduced_handle, readable, writable): - handle = rebuild_handle(reduced_handle) - return _multiprocessing.Connection( - handle, readable=readable, writable=writable - ) - -ForkingPickler.register(_multiprocessing.Connection, reduce_connection) + def reduce_pipe_connection(conn): + access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | + (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) + dh = DupHandle(conn.fileno(), access) + return rebuild_pipe_connection, (dh, conn.readable, conn.writable) -# -# Register `socket.socket` with `ForkingPickler` -# + def rebuild_pipe_connection(dh, readable, writable): + from .connection import PipeConnection + handle = dh.detach() + return PipeConnection(handle, readable, writable) -def fromfd(fd, family, type_, proto=0): - s = socket.fromfd(fd, family, type_, proto) - if s.__class__ is not socket.socket: - s = socket.socket(_sock=s) - return s +else: + # Unix -def reduce_socket(s): - reduced_handle = reduce_handle(s.fileno()) - return rebuild_socket, (reduced_handle, s.family, s.type, s.proto) + # On MacOSX we should acknowledge receipt of fds -- see Issue14669 + ACKNOWLEDGE = sys.platform == 'darwin' -def rebuild_socket(reduced_handle, family, type_, proto): - fd = rebuild_handle(reduced_handle) - _sock = fromfd(fd, family, type_, proto) - close(fd) - return _sock + def send_handle(conn, handle, destination_pid): + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, + struct.pack("@i", handle))]) + if ACKNOWLEDGE and conn.recv_bytes() != b'ACK': + raise RuntimeError('did not receive acknowledgement of fd') -ForkingPickler.register(socket.socket, reduce_socket) + def recv_handle(conn): + size = struct.calcsize("@i") + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) + try: + if ACKNOWLEDGE: + conn.send_bytes(b'ACK') + cmsg_level, cmsg_type, cmsg_data = ancdata[0] + if (cmsg_level == socket.SOL_SOCKET and + cmsg_type == socket.SCM_RIGHTS): + return struct.unpack("@i", cmsg_data[:size])[0] + except (ValueError, IndexError, struct.error): + pass + raise RuntimeError('Invalid data received') + + class DupFd(object): + def __init__(self, fd): + new_fd = os.dup(fd) + def send(conn, pid): + send_handle(conn, new_fd, pid) + def close(): + os.close(new_fd) + self._id = resource_sharer.register(send, close) + + def detach(self): + conn = resource_sharer.get_connection(self._id) + try: + return recv_handle(conn) + finally: + conn.close() + + def reduce_socket(s): + df = DupFd(s.fileno()) + return rebuild_socket, (df, s.family, s.type, s.proto) + + def rebuild_socket(df, family, type, proto): + fd = df.detach() + s = socket.fromfd(fd, family, type, proto) + os.close(fd) + return s + + def reduce_connection(conn): + df = DupFd(conn.fileno()) + return rebuild_connection, (df, conn.readable, conn.writable) + + def rebuild_connection(df, readable, writable): + from .connection import Connection + fd = df.detach() + return Connection(fd, readable, writable) # -# Register `_multiprocessing.PipeConnection` with `ForkingPickler` +# Server which shares registered resources with clients # -if sys.platform == 'win32': - - def reduce_pipe_connection(conn): - rh = reduce_handle(conn.fileno()) - return rebuild_pipe_connection, (rh, conn.readable, conn.writable) - - def rebuild_pipe_connection(reduced_handle, readable, writable): - handle = rebuild_handle(reduced_handle) - return _multiprocessing.PipeConnection( - handle, readable=readable, writable=writable - ) - - ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection) +class ResourceSharer(object): + def __init__(self): + self._key = 0 + self._cache = {} + self._old_locks = [] + self._lock = threading.Lock() + self._listener = None + self._address = None + self._thread = None + register_after_fork(self, ResourceSharer._afterfork) + + def register(self, send, close): + with self._lock: + if self._address is None: + self._start() + self._key += 1 + self._cache[self._key] = (send, close) + return (self._address, self._key) + + @staticmethod + def get_connection(ident): + from .connection import Client + address, key = ident + c = Client(address, authkey=current_process().authkey) + c.send((key, os.getpid())) + return c + + def stop(self, timeout=None): + from .connection import Client + with self._lock: + if self._address is not None: + c = Client(self._address, authkey=current_process().authkey) + c.send(None) + c.close() + self._thread.join(timeout) + if self._thread.is_alive(): + sub_warn('ResourceSharer thread did not stop when asked') + self._listener.close() + self._thread = None + self._address = None + self._listener = None + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + + def _afterfork(self): + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + # If self._lock was locked at the time of the fork, it may be broken + # -- see issue 6721. Replace it without letting it be gc'ed. + self._old_locks.append(self._lock) + self._lock = threading.Lock() + if self._listener is not None: + self._listener.close() + self._listener = None + self._address = None + self._thread = None + + def _start(self): + from .connection import Listener + assert self._listener is None + debug('starting listener and thread for sending handles') + self._listener = Listener(authkey=current_process().authkey) + self._address = self._listener.address + t = threading.Thread(target=self._serve) + t.daemon = True + t.start() + self._thread = t + + def _serve(self): + if hasattr(signal, 'pthread_sigmask'): + signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) + while 1: + try: + conn = self._listener.accept() + msg = conn.recv() + if msg is None: + break + key, destination_pid = msg + send, close = self._cache.pop(key) + send(conn, destination_pid) + close() + conn.close() + except: + if not is_exiting(): + import traceback + sub_warning( + 'thread for sharing handles raised exception :\n' + + '-'*79 + '\n' + traceback.format_exc() + '-'*79 + ) + +resource_sharer = ResourceSharer() |