Diffstat (limited to 'trollius/unix_events.py')
-rw-r--r--  trollius/unix_events.py  975
1 files changed, 975 insertions, 0 deletions
diff --git a/trollius/unix_events.py b/trollius/unix_events.py
new file mode 100644
index 0000000..d797441
--- /dev/null
+++ b/trollius/unix_events.py
@@ -0,0 +1,975 @@
+"""Selector event loop for Unix with signal handling."""
+from __future__ import absolute_import
+
+import errno
+import os
+import signal
+import socket
+import stat
+import subprocess
+import sys
+import threading
+if not hasattr(os, 'set_blocking') or not hasattr(os, 'set_inheritable'):
+    import fcntl
+
+
+from . import base_events
+from . import base_subprocess
+from . import compat
+from . import constants
+from . import events
+from . import selector_events
+from . import selectors
+from . import transports
+from .compat import flatten_bytes
+from .coroutines import coroutine, From, Return
+from .log import logger
+from .py33_exceptions import (
+    reraise, wrap_error,
+    BlockingIOError, BrokenPipeError, ConnectionResetError,
+    InterruptedError, ChildProcessError)
+
+
+__all__ = ['SelectorEventLoop',
+           'AbstractChildWatcher', 'SafeChildWatcher',
+           'FastChildWatcher', 'DefaultEventLoopPolicy',
+           ]
+
+if sys.platform == 'win32': # pragma: no cover
+    raise ImportError('Signals are not really supported on Windows')
+
+
+if compat.PY33:
+    def _sighandler_noop(signum, frame):
+        """Dummy signal handler."""
+        pass
+
+
+class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
+    """Unix event loop.
+
+    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
+    """
+
+    def __init__(self, selector=None):
+        super(_UnixSelectorEventLoop, self).__init__(selector)
+        self._signal_handlers = {}
+
+    def _socketpair(self):
+        return socket.socketpair()
+
+    def close(self):
+        super(_UnixSelectorEventLoop, self).close()
+        for sig in list(self._signal_handlers):
+            self.remove_signal_handler(sig)
+
+    # On Python <= 3.2, the C signal handler of Python writes a null byte into
+    # the wakeup file descriptor. We cannot retrieve the signal numbers from
+    # the file descriptor.
+    if compat.PY33:
+        def _process_self_data(self, data):
+            for signum in data:
+                if not signum:
+                    # ignore null bytes written by _write_to_self()
+                    continue
+                self._handle_signal(signum)
+
+    def add_signal_handler(self, sig, callback, *args):
+        """Add a handler for a signal. UNIX only.
+
+        Raise ValueError if the signal number is invalid or uncatchable.
+        Raise RuntimeError if there is a problem setting up the handler.
+        """
+        self._check_signal(sig)
+        try:
+            # set_wakeup_fd() raises ValueError if this is not the
+            # main thread. By calling it early we ensure that an
+            # event loop running in another thread cannot add a signal
+            # handler.
+            signal.set_wakeup_fd(self._csock.fileno())
+        except (ValueError, OSError) as exc:
+            raise RuntimeError(str(exc))
+
+        handle = events.Handle(callback, args, self)
+        self._signal_handlers[sig] = handle
+
+        try:
+            if compat.PY33:
+                # On Python 3.3 and newer, the C signal handler writes the
+                # signal number into the wakeup file descriptor and then calls
+                # Py_AddPendingCall() to schedule the Python signal handler.
+                #
+                # Register a dummy signal handler to ask Python to write the
+                # signal number into the wakeup file descriptor.
+                # _process_self_data() will read signal numbers from this file
+                # descriptor to handle signals.
+                signal.signal(sig, _sighandler_noop)
+            else:
+                # On Python 3.2 and older, the C signal handler first calls
+                # Py_AddPendingCall() to schedule the Python signal handler,
+                # and then writes a null byte into the wakeup file descriptor.
+                signal.signal(sig, self._handle_signal)
+
+            # Set SA_RESTART to limit EINTR occurrences.
+            signal.siginterrupt(sig, False)
+        except (RuntimeError, OSError) as exc:
+            # On Python 2, signal.signal(signal.SIGKILL, signal.SIG_IGN) raises
+            # RuntimeError(22, 'Invalid argument'). On Python 3,
+            # OSError(22, 'Invalid argument') is raised instead.
+            exc_type, exc_value, tb = sys.exc_info()
+
+            del self._signal_handlers[sig]
+            if not self._signal_handlers:
+                try:
+                    signal.set_wakeup_fd(-1)
+                except (ValueError, OSError) as nexc:
+                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)
+
+            if isinstance(exc, RuntimeError) or exc.errno == errno.EINVAL:
+                raise RuntimeError('sig {0} cannot be caught'.format(sig))
+            else:
+                reraise(exc_type, exc_value, tb)
+
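+    # Illustrative sketch (hypothetical, not part of this module): the
+    # wakeup-fd mechanism used above can be reproduced with the plain signal
+    # module; rsock and wsock are made-up names.
+    #
+    #   rsock, wsock = socket.socketpair()
+    #   signal.set_wakeup_fd(wsock.fileno())
+    #   signal.signal(signal.SIGUSR1, lambda signum, frame: None)
+    #   # On Python 3.3+, receiving SIGUSR1 makes rsock readable and the byte
+    #   # read from it is the signal number, which is what
+    #   # _process_self_data() dispatches on.
+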
+    def _handle_signal(self, sig, frame=None):
+        """Internal helper that is the actual signal handler."""
+        handle = self._signal_handlers.get(sig)
+        if handle is None:
+            return  # Assume it's some race condition.
+        if handle._cancelled:
+            self.remove_signal_handler(sig)  # Remove it properly.
+        else:
+            self._add_callback_signalsafe(handle)
+
+    def remove_signal_handler(self, sig):
+        """Remove a handler for a signal. UNIX only.
+
+        Return True if a signal handler was removed, False if not.
+        """
+        self._check_signal(sig)
+        try:
+            del self._signal_handlers[sig]
+        except KeyError:
+            return False
+
+        if sig == signal.SIGINT:
+            handler = signal.default_int_handler
+        else:
+            handler = signal.SIG_DFL
+
+        try:
+            signal.signal(sig, handler)
+        except OSError as exc:
+            if exc.errno == errno.EINVAL:
+                raise RuntimeError('sig {0} cannot be caught'.format(sig))
+            else:
+                raise
+
+        if not self._signal_handlers:
+            try:
+                signal.set_wakeup_fd(-1)
+            except (ValueError, OSError) as exc:
+                logger.info('set_wakeup_fd(-1) failed: %s', exc)
+
+        return True
+
+    def _check_signal(self, sig):
+        """Internal helper to validate a signal.
+
+        Raise ValueError if the signal number is invalid or uncatchable.
+        Raise RuntimeError if there is a problem setting up the handler.
+        """
+        if not isinstance(sig, int):
+            raise TypeError('sig must be an int, not {0!r}'.format(sig))
+
+        if not (1 <= sig < signal.NSIG):
+            raise ValueError(
+                'sig {0} out of range(1, {1})'.format(sig, signal.NSIG))
+
+    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
+                                  extra=None):
+        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
+
+    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
+                                   extra=None):
+        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
+
+    @coroutine
+    def _make_subprocess_transport(self, protocol, args, shell,
+                                   stdin, stdout, stderr, bufsize,
+                                   extra=None, **kwargs):
+        with events.get_child_watcher() as watcher:
+            transp = _UnixSubprocessTransport(self, protocol, args, shell,
+                                              stdin, stdout, stderr, bufsize,
+                                              extra=extra, **kwargs)
+            yield From(transp._post_init())
+            watcher.add_child_handler(transp.get_pid(),
+                                      self._child_watcher_callback, transp)
+
+        raise Return(transp)
+
+    def _child_watcher_callback(self, pid, returncode, transp):
+        self.call_soon_threadsafe(transp._process_exited, returncode)
+
+    @coroutine
+    def create_unix_connection(self, protocol_factory, path,
+                               ssl=None, sock=None,
+                               server_hostname=None):
+        assert server_hostname is None or isinstance(server_hostname, str)
+        if ssl:
+            if server_hostname is None:
+                raise ValueError(
+                    'you have to pass server_hostname when using ssl')
+        else:
+            if server_hostname is not None:
+                raise ValueError('server_hostname is only meaningful with ssl')
+
+        if path is not None:
+            if sock is not None:
+                raise ValueError(
+                    'path and sock can not be specified at the same time')
+
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
+            try:
+                sock.setblocking(False)
+                yield From(self.sock_connect(sock, path))
+            except:
+                sock.close()
+                raise
+
+        else:
+            if sock is None:
+                raise ValueError('no path and sock were specified')
+            sock.setblocking(False)
+
+        transport, protocol = yield From(self._create_connection_transport(
+            sock, protocol_factory, ssl, server_hostname))
+        raise Return(transport, protocol)
+
+    @coroutine
+    def create_unix_server(self, protocol_factory, path=None,
+                           sock=None, backlog=100, ssl=None):
+        if isinstance(ssl, bool):
+            raise TypeError('ssl argument must be an SSLContext or None')
+
+        if path is not None:
+            if sock is not None:
+                raise ValueError(
+                    'path and sock can not be specified at the same time')
+
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+            try:
+                sock.bind(path)
+            except socket.error as exc:
+                sock.close()
+                if exc.errno == errno.EADDRINUSE:
+                    # Improve the error message by including the exact
+                    # address that is already in use.
+                    msg = 'Address {0!r} is already in use'.format(path)
+                    raise OSError(errno.EADDRINUSE, msg)
+                else:
+                    raise
+            except:
+                sock.close()
+                raise
+        else:
+            if sock is None:
+                raise ValueError(
+                    'path was not specified, and no sock specified')
+
+            if sock.family != socket.AF_UNIX:
+                raise ValueError(
+                    'A UNIX Domain Socket was expected, got {0!r}'.format(sock))
+
+        server = base_events.Server(self, [sock])
+        sock.listen(backlog)
+        sock.setblocking(False)
+        self._start_serving(protocol_factory, sock, ssl, server)
+        return server
+
+
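+# Illustrative sketch (hypothetical, not part of this module): connecting to
+# a UNIX domain socket with the loop above, in trollius' From/Return style.
+# The socket path and EchoClientProtocol are made-up names.
+#
+#   @coroutine
+#   def connect(loop):
+#       transport, protocol = yield From(
+#           loop.create_unix_connection(EchoClientProtocol, '/tmp/echo.sock'))
+#       raise Return(transport, protocol)
+
+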
+if hasattr(os, 'set_blocking'):
+    # Python 3.5 and newer
+    def _set_nonblocking(fd):
+        os.set_blocking(fd, False)
+else:
+    def _set_nonblocking(fd):
+        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+        flags = flags | os.O_NONBLOCK
+        fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+
+
+class _UnixReadPipeTransport(transports.ReadTransport):
+
+    max_size = 256 * 1024  # max bytes we read in one event loop iteration
+
+    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+        super(_UnixReadPipeTransport, self).__init__(extra)
+        self._extra['pipe'] = pipe
+        self._loop = loop
+        self._pipe = pipe
+        self._fileno = pipe.fileno()
+        mode = os.fstat(self._fileno).st_mode
+        if not (stat.S_ISFIFO(mode) or
+                stat.S_ISSOCK(mode) or
+                stat.S_ISCHR(mode)):
+            raise ValueError("Pipe transport is for pipes/sockets only.")
+        _set_nonblocking(self._fileno)
+        self._protocol = protocol
+        self._closing = False
+        self._loop.add_reader(self._fileno, self._read_ready)
+        self._loop.call_soon(self._protocol.connection_made, self)
+        if waiter is not None:
+            # wait until protocol.connection_made() has been called
+            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
+
+    def __repr__(self):
+        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
+        if self._pipe is not None:
+            polling = selector_events._test_selector_event(
+                self._loop._selector,
+                self._fileno, selectors.EVENT_READ)
+            if polling:
+                info.append('polling')
+            else:
+                info.append('idle')
+        else:
+            info.append('closed')
+        return '<%s>' % ' '.join(info)
+
+    def _read_ready(self):
+        try:
+            data = wrap_error(os.read, self._fileno, self.max_size)
+        except (BlockingIOError, InterruptedError):
+            pass
+        except OSError as exc:
+            self._fatal_error(exc, 'Fatal read error on pipe transport')
+        else:
+            if data:
+                self._protocol.data_received(data)
+            else:
+                if self._loop.get_debug():
+                    logger.info("%r was closed by peer", self)
+                self._closing = True
+                self._loop.remove_reader(self._fileno)
+                self._loop.call_soon(self._protocol.eof_received)
+                self._loop.call_soon(self._call_connection_lost, None)
+
+    def pause_reading(self):
+        self._loop.remove_reader(self._fileno)
+
+    def resume_reading(self):
+        self._loop.add_reader(self._fileno, self._read_ready)
+
+    def close(self):
+        if not self._closing:
+            self._close(None)
+
+    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+        # should be called by exception handler only
+        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
+            if self._loop.get_debug():
+                logger.debug("%r: %s", self, message, exc_info=True)
+        else:
+            self._loop.call_exception_handler({
+                'message': message,
+                'exception': exc,
+                'transport': self,
+                'protocol': self._protocol,
+            })
+        self._close(exc)
+
+    def _close(self, exc):
+        self._closing = True
+        self._loop.remove_reader(self._fileno)
+        self._loop.call_soon(self._call_connection_lost, exc)
+
+    def _call_connection_lost(self, exc):
+        try:
+            self._protocol.connection_lost(exc)
+        finally:
+            self._pipe.close()
+            self._pipe = None
+            self._protocol = None
+            self._loop = None
+
+
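+# Illustrative sketch (hypothetical, assuming the loop exposes
+# connect_read_pipe() as in asyncio; MyReadProtocol is a made-up name):
+#
+#   rfd, wfd = os.pipe()
+#   @coroutine
+#   def hook_read_end(loop):
+#       transport, protocol = yield From(loop.connect_read_pipe(
+#           MyReadProtocol, os.fdopen(rfd, 'rb', 0)))
+#       raise Return(transport)
+
+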
+class _UnixWritePipeTransport(transports._FlowControlMixin,
+                              transports.WriteTransport):
+
+    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+        super(_UnixWritePipeTransport, self).__init__(extra, loop)
+        self._extra['pipe'] = pipe
+        self._pipe = pipe
+        self._fileno = pipe.fileno()
+        mode = os.fstat(self._fileno).st_mode
+        is_socket = stat.S_ISSOCK(mode)
+        if not (is_socket or
+                stat.S_ISFIFO(mode) or
+                stat.S_ISCHR(mode)):
+            raise ValueError("Pipe transport is only for "
+                             "pipes, sockets and character devices")
+        _set_nonblocking(self._fileno)
+        self._protocol = protocol
+        self._buffer = []
+        self._conn_lost = 0
+        self._closing = False  # Set when close() or write_eof() called.
+
+        # On AIX, the reader trick only works for sockets.
+        # On other platforms it works for pipes and sockets.
+        # (Exception: OS X 10.4? Issue #19294.)
+        if is_socket or not sys.platform.startswith("aix"):
+            self._loop.add_reader(self._fileno, self._read_ready)
+
+        self._loop.call_soon(self._protocol.connection_made, self)
+        if waiter is not None:
+            # wait until protocol.connection_made() has been called
+            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
+
+    def __repr__(self):
+        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
+        if self._pipe is not None:
+            polling = selector_events._test_selector_event(
+                self._loop._selector,
+                self._fileno, selectors.EVENT_WRITE)
+            if polling:
+                info.append('polling')
+            else:
+                info.append('idle')
+
+            bufsize = self.get_write_buffer_size()
+            info.append('bufsize=%s' % bufsize)
+        else:
+            info.append('closed')
+        return '<%s>' % ' '.join(info)
+
+    def get_write_buffer_size(self):
+        return sum(len(data) for data in self._buffer)
+
+    def _read_ready(self):
+        # Pipe was closed by peer.
+        if self._loop.get_debug():
+            logger.info("%r was closed by peer", self)
+        if self._buffer:
+            self._close(BrokenPipeError())
+        else:
+            self._close()
+
+    def write(self, data):
+        data = flatten_bytes(data)
+        if not data:
+            return
+
+        if self._conn_lost or self._closing:
+            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+                logger.warning('pipe closed by peer or '
+                               'os.write(pipe, data) raised exception.')
+            self._conn_lost += 1
+            return
+
+        if not self._buffer:
+            # Attempt to send it right away first.
+            try:
+                n = wrap_error(os.write, self._fileno, data)
+            except (BlockingIOError, InterruptedError):
+                n = 0
+            except Exception as exc:
+                self._conn_lost += 1
+                self._fatal_error(exc, 'Fatal write error on pipe transport')
+                return
+            if n == len(data):
+                return
+            elif n > 0:
+                data = data[n:]
+            self._loop.add_writer(self._fileno, self._write_ready)
+
+        self._buffer.append(data)
+        self._maybe_pause_protocol()
+
+    def _write_ready(self):
+        data = b''.join(self._buffer)
+        assert data, 'Data should not be empty'
+
+        del self._buffer[:]
+        try:
+            n = wrap_error(os.write, self._fileno, data)
+        except (BlockingIOError, InterruptedError):
+            self._buffer.append(data)
+        except Exception as exc:
+            self._conn_lost += 1
+            # Remove the writer here; _fatal_error() won't do it
+            # because _buffer is empty.
+            self._loop.remove_writer(self._fileno)
+            self._fatal_error(exc, 'Fatal write error on pipe transport')
+        else:
+            if n == len(data):
+                self._loop.remove_writer(self._fileno)
+                self._maybe_resume_protocol()  # May append to buffer.
+                if not self._buffer and self._closing:
+                    self._loop.remove_reader(self._fileno)
+                    self._call_connection_lost(None)
+                return
+            elif n > 0:
+                data = data[n:]
+
+        self._buffer.append(data)  # Try again later.
+
+    def can_write_eof(self):
+        return True
+
+    # TODO: Make the relationships between write_eof(), close(),
+    # abort(), _fatal_error() and _close() more straightforward.
+
+    def write_eof(self):
+        if self._closing:
+            return
+        assert self._pipe
+        self._closing = True
+        if not self._buffer:
+            self._loop.remove_reader(self._fileno)
+            self._loop.call_soon(self._call_connection_lost, None)
+
+    def close(self):
+        if not self._closing:
+            # write_eof() is all we need to close the write pipe
+            self.write_eof()
+
+    def abort(self):
+        self._close(None)
+
+    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+        # should be called by exception handler only
+        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+            if self._loop.get_debug():
+                logger.debug("%r: %s", self, message, exc_info=True)
+        else:
+            self._loop.call_exception_handler({
+                'message': message,
+                'exception': exc,
+                'transport': self,
+                'protocol': self._protocol,
+            })
+        self._close(exc)
+
+    def _close(self, exc=None):
+        self._closing = True
+        if self._buffer:
+            self._loop.remove_writer(self._fileno)
+        del self._buffer[:]
+        self._loop.remove_reader(self._fileno)
+        self._loop.call_soon(self._call_connection_lost, exc)
+
+    def _call_connection_lost(self, exc):
+        try:
+            self._protocol.connection_lost(exc)
+        finally:
+            self._pipe.close()
+            self._pipe = None
+            self._protocol = None
+            self._loop = None
+
+
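+# Illustrative sketch (hypothetical, assuming the loop exposes
+# connect_write_pipe() as in asyncio; MyWriteProtocol is a made-up name):
+#
+#   @coroutine
+#   def hook_write_end(loop, wfd):
+#       transport, protocol = yield From(loop.connect_write_pipe(
+#           MyWriteProtocol, os.fdopen(wfd, 'wb', 0)))
+#       transport.write(b'hello')
+#       raise Return(transport)
+
+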
+if hasattr(os, 'set_inheritable'):
+    # Python 3.4 and newer
+    _set_inheritable = os.set_inheritable
+else:
+    def _set_inheritable(fd, inheritable):
+        cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
+
+        old = fcntl.fcntl(fd, fcntl.F_GETFD)
+        if not inheritable:
+            fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
+        else:
+            fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
+
+
+class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
+
+    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+        stdin_w = None
+        if stdin == subprocess.PIPE:
+            # Use a socket pair for stdin, since not all platforms
+            # support selecting read events on the write end of a
+            # socket (which we use in order to detect closing of the
+            # other end). Notably this is needed on AIX, and works
+            # just fine on other platforms.
+            stdin, stdin_w = self._loop._socketpair()
+
+            # Mark the write end of the stdin pipe as non-inheritable
+            _set_inheritable(stdin_w.fileno(), False)
+        self._proc = subprocess.Popen(
+            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
+            universal_newlines=False, bufsize=bufsize, **kwargs)
+        if stdin_w is not None:
+            # Retrieve the file descriptor from stdin_w; stdin_w should no
+            # longer "own" it, so that closing self._proc.stdin closes the
+            # underlying file descriptor immediately.
+            stdin.close()
+            if hasattr(stdin_w, 'detach'):
+                stdin_fd = stdin_w.detach()
+                self._proc.stdin = os.fdopen(stdin_fd, 'wb', bufsize)
+            else:
+                stdin_dup = os.dup(stdin_w.fileno())
+                stdin_w.close()
+                self._proc.stdin = os.fdopen(stdin_dup, 'wb', bufsize)
+
+
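+# Illustrative sketch (hypothetical, assuming the loop exposes
+# subprocess_exec() as in asyncio; MySubprocessProtocol is a made-up name):
+#
+#   @coroutine
+#   def run_cat(loop):
+#       transport, protocol = yield From(loop.subprocess_exec(
+#           MySubprocessProtocol, 'cat', stdin=subprocess.PIPE))
+#       raise Return(transport)
+
+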
+class AbstractChildWatcher(object):
+    """Abstract base class for monitoring child processes.
+
+    Objects derived from this class monitor a collection of subprocesses and
+    report their termination or interruption by a signal.
+
+    New callbacks are registered with .add_child_handler(). Starting a new
+    process must be done within a 'with' block to allow the watcher to suspend
+    its activity until the new process is fully registered (this is needed to
+    prevent a race condition in some implementations).
+
+    Example:
+        with watcher:
+            proc = subprocess.Popen("sleep 1")
+            watcher.add_child_handler(proc.pid, callback)
+
+    Notes:
+        Implementations of this class must be thread-safe.
+
+        Since child watcher objects may catch the SIGCHLD signal and call
+        waitpid(-1), there should be only one active object per process.
+    """
+
+    def add_child_handler(self, pid, callback, *args):
+        """Register a new child handler.
+
+        Arrange for callback(pid, returncode, *args) to be called when
+        process 'pid' terminates. Specifying another callback for the same
+        process replaces the previous handler.
+
+        Note: callback() must be thread-safe.
+        """
+        raise NotImplementedError()
+
+    def remove_child_handler(self, pid):
+        """Removes the handler for process 'pid'.
+
+        The function returns True if the handler was successfully removed,
+        False if there was nothing to remove."""
+
+        raise NotImplementedError()
+
+    def attach_loop(self, loop):
+        """Attach the watcher to an event loop.
+
+        If the watcher was previously attached to an event loop, then it is
+        first detached before attaching to the new loop.
+
+        Note: loop may be None.
+        """
+        raise NotImplementedError()
+
+    def close(self):
+        """Close the watcher.
+
+        This must be called to make sure that any underlying resource is freed.
+        """
+        raise NotImplementedError()
+
+    def __enter__(self):
+        """Enter the watcher's context and allow starting new processes
+
+        This function must return self"""
+        raise NotImplementedError()
+
+    def __exit__(self, a, b, c):
+        """Exit the watcher's context"""
+        raise NotImplementedError()
+
+
+class BaseChildWatcher(AbstractChildWatcher):
+
+    def __init__(self):
+        self._loop = None
+
+    def close(self):
+        self.attach_loop(None)
+
+    def _do_waitpid(self, expected_pid):
+        raise NotImplementedError()
+
+    def _do_waitpid_all(self):
+        raise NotImplementedError()
+
+    def attach_loop(self, loop):
+        assert loop is None or isinstance(loop, events.AbstractEventLoop)
+
+        if self._loop is not None:
+            self._loop.remove_signal_handler(signal.SIGCHLD)
+
+        self._loop = loop
+        if loop is not None:
+            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
+
+            # Prevent a race condition in case a child terminated
+            # during the switch.
+            self._do_waitpid_all()
+
+    def _sig_chld(self):
+        try:
+            self._do_waitpid_all()
+        except Exception as exc:
+            # self._loop should always be available here
+            # as '_sig_chld' is added as a signal handler
+            # in 'attach_loop'
+            self._loop.call_exception_handler({
+                'message': 'Unknown exception in SIGCHLD handler',
+                'exception': exc,
+            })
+
+    def _compute_returncode(self, status):
+        if os.WIFSIGNALED(status):
+            # The child process died because of a signal.
+            return -os.WTERMSIG(status)
+        elif os.WIFEXITED(status):
+            # The child process exited (e.g. sys.exit()).
+            return os.WEXITSTATUS(status)
+        else:
+            # The child exited, but we don't understand its status.
+            # This shouldn't happen, but if it does, let's just
+            # return that status; perhaps that helps debug it.
+            return status
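+
+    # For example: a child killed by SIGKILL (signal 9) gives
+    # WIFSIGNALED(status) == True and a computed return code of -9, while a
+    # child calling sys.exit(3) gives WIFEXITED(status) == True and 3.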
+
+
+class SafeChildWatcher(BaseChildWatcher):
+    """'Safe' child watcher implementation.
+
+    This implementation avoids disrupting other code spawning processes by
+    explicitly polling each process in the SIGCHLD handler instead of calling
+    os.waitpid(-1).
+
+    This is a safe solution, but it has significant overhead when handling a
+    large number of children (O(n) each time SIGCHLD is raised).
+    """
+
+    def __init__(self):
+        super(SafeChildWatcher, self).__init__()
+        self._callbacks = {}
+
+    def close(self):
+        self._callbacks.clear()
+        super(SafeChildWatcher, self).close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, a, b, c):
+        pass
+
+    def add_child_handler(self, pid, callback, *args):
+        self._callbacks[pid] = callback, args
+
+        # Prevent a race condition in case the child is already terminated.
+        self._do_waitpid(pid)
+
+    def remove_child_handler(self, pid):
+        try:
+            del self._callbacks[pid]
+            return True
+        except KeyError:
+            return False
+
+    def _do_waitpid_all(self):
+
+        for pid in list(self._callbacks):
+            self._do_waitpid(pid)
+
+    def _do_waitpid(self, expected_pid):
+        assert expected_pid > 0
+
+        try:
+            pid, status = os.waitpid(expected_pid, os.WNOHANG)
+        except ChildProcessError:
+            # The child process is already reaped
+            # (may happen if waitpid() is called elsewhere).
+            pid = expected_pid
+            returncode = 255
+            logger.warning(
+                "Unknown child process pid %d, will report returncode 255",
+                pid)
+        else:
+            if pid == 0:
+                # The child process is still alive.
+                return
+
+            returncode = self._compute_returncode(status)
+            if self._loop.get_debug():
+                logger.debug('process %s exited with returncode %s',
+                             expected_pid, returncode)
+
+        try:
+            callback, args = self._callbacks.pop(pid)
+        except KeyError:  # pragma: no cover
+            # May happen if .remove_child_handler() is called
+            # after os.waitpid() returns.
+            if self._loop.get_debug():
+                logger.warning("Child watcher got an unexpected pid: %r",
+                               pid, exc_info=True)
+        else:
+            callback(pid, returncode, *args)
+
+
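+# Illustrative sketch (hypothetical, not part of this module): wiring a
+# SafeChildWatcher to a running loop and watching one child; on_exit is a
+# made-up callback.
+#
+#   def on_exit(pid, returncode):
+#       logger.info('child %s exited with %s', pid, returncode)
+#
+#   watcher = SafeChildWatcher()
+#   watcher.attach_loop(loop)
+#   with watcher:
+#       proc = subprocess.Popen(['sleep', '1'])
+#       watcher.add_child_handler(proc.pid, on_exit)
+
+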
+class FastChildWatcher(BaseChildWatcher):
+    """'Fast' child watcher implementation.
+
+    This implementation reaps every terminated process by calling
+    os.waitpid(-1) directly, possibly breaking other code spawning processes
+    and waiting for their termination.
+
+    There is no noticeable overhead when handling a large number of children
+    (O(1) each time a child terminates).
+    """
+    def __init__(self):
+        super(FastChildWatcher, self).__init__()
+        self._callbacks = {}
+        self._lock = threading.Lock()
+        self._zombies = {}
+        self._forks = 0
+
+    def close(self):
+        self._callbacks.clear()
+        self._zombies.clear()
+        super(FastChildWatcher, self).close()
+
+    def __enter__(self):
+        with self._lock:
+            self._forks += 1
+
+            return self
+
+    def __exit__(self, a, b, c):
+        with self._lock:
+            self._forks -= 1
+
+            if self._forks or not self._zombies:
+                return
+
+            collateral_victims = str(self._zombies)
+            self._zombies.clear()
+
+        logger.warning(
+            "Caught subprocesses termination from unknown pids: %s",
+            collateral_victims)
+
+    def add_child_handler(self, pid, callback, *args):
+        assert self._forks, "Must use the context manager"
+        with self._lock:
+            try:
+                returncode = self._zombies.pop(pid)
+            except KeyError:
+                # The child is running.
+                self._callbacks[pid] = callback, args
+                return
+
+        # The child is dead already. We can fire the callback.
+        callback(pid, returncode, *args)
+
+    def remove_child_handler(self, pid):
+        try:
+            del self._callbacks[pid]
+            return True
+        except KeyError:
+            return False
+
+    def _do_waitpid_all(self):
+        # Because of signal coalescing, we must keep calling waitpid() as
+        # long as we're able to reap a child.
+        while True:
+            try:
+                pid, status = wrap_error(os.waitpid, -1, os.WNOHANG)
+            except ChildProcessError:
+                # No more child processes exist.
+                return
+            else:
+                if pid == 0:
+                    # A child process is still alive.
+                    return
+
+                returncode = self._compute_returncode(status)
+
+            with self._lock:
+                try:
+                    callback, args = self._callbacks.pop(pid)
+                except KeyError:
+                    # unknown child
+                    if self._forks:
+                        # It may not be registered yet.
+                        self._zombies[pid] = returncode
+                        if self._loop.get_debug():
+                            logger.debug('unknown process %s exited '
+                                         'with returncode %s',
+                                         pid, returncode)
+                        continue
+                    callback = None
+                else:
+                    if self._loop.get_debug():
+                        logger.debug('process %s exited with returncode %s',
+                                     pid, returncode)
+
+            if callback is None:
+                logger.warning(
+                    "Caught subprocess termination from unknown pid: "
+                    "%d -> %d", pid, returncode)
+            else:
+                callback(pid, returncode, *args)
+
+
+class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
+    """UNIX event loop policy with a watcher for child processes."""
+    _loop_factory = _UnixSelectorEventLoop
+
+    def __init__(self):
+        super(_UnixDefaultEventLoopPolicy, self).__init__()
+        self._watcher = None
+
+    def _init_watcher(self):
+        with events._lock:
+            if self._watcher is None:  # pragma: no branch
+                self._watcher = SafeChildWatcher()
+                if isinstance(threading.current_thread(),
+                              threading._MainThread):
+                    self._watcher.attach_loop(self._local._loop)
+
+    def set_event_loop(self, loop):
+        """Set the event loop.
+
+        As a side effect, if a child watcher was set before, then calling
+        .set_event_loop() from the main thread will call .attach_loop(loop) on
+        the child watcher.
+        """
+
+        super(_UnixDefaultEventLoopPolicy, self).set_event_loop(loop)
+
+        if self._watcher is not None and \
+           isinstance(threading.current_thread(), threading._MainThread):
+            self._watcher.attach_loop(loop)
+
+    def get_child_watcher(self):
+        """Get the watcher for child processes.
+
+        If not yet set, a SafeChildWatcher object is automatically created.
+        """
+        if self._watcher is None:
+            self._init_watcher()
+
+        return self._watcher
+
+    def set_child_watcher(self, watcher):
+        """Set the watcher for child processes."""
+
+        assert watcher is None or isinstance(watcher, AbstractChildWatcher)
+
+        if self._watcher is not None:
+            self._watcher.close()
+
+        self._watcher = watcher
+
+SelectorEventLoop = _UnixSelectorEventLoop
+DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
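+
+
+# Illustrative sketch (hypothetical script, not part of this module): on a
+# POSIX platform trollius selects these classes through its default policy.
+#
+#   import trollius
+#   loop = trollius.get_event_loop()        # a SelectorEventLoop
+#   watcher = trollius.get_child_watcher()  # a SafeChildWatcher by default
+#   loop.add_signal_handler(signal.SIGTERM, loop.stop)
+#   loop.run_forever()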