summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--asyncio/base_events.py97
-rw-r--r--asyncio/base_subprocess.py128
-rw-r--r--asyncio/futures.py6
-rw-r--r--asyncio/proactor_events.py16
-rw-r--r--asyncio/protocols.py5
-rw-r--r--asyncio/queues.py8
-rw-r--r--asyncio/selector_events.py115
-rw-r--r--asyncio/sslproto.py34
-rw-r--r--asyncio/subprocess.py40
-rw-r--r--asyncio/tasks.py2
-rw-r--r--asyncio/test_utils.py4
-rw-r--r--asyncio/unix_events.py67
-rw-r--r--asyncio/windows_events.py128
-rw-r--r--asyncio/windows_utils.py8
-rw-r--r--examples/shell.py4
-rw-r--r--examples/subprocess_attach_write_pipe.py2
-rw-r--r--overlapped.c7
-rw-r--r--tests/test_events.py76
-rw-r--r--tests/test_proactor_events.py6
-rw-r--r--tests/test_selector_events.py31
-rw-r--r--tests/test_sslproto.py46
-rw-r--r--tests/test_subprocess.py106
-rw-r--r--tests/test_unix_events.py29
-rw-r--r--tests/test_windows_events.py13
-rw-r--r--tox.ini4
25 files changed, 695 insertions, 287 deletions
diff --git a/asyncio/base_events.py b/asyncio/base_events.py
index e43441e..eb867cd 100644
--- a/asyncio/base_events.py
+++ b/asyncio/base_events.py
@@ -26,6 +26,7 @@ import threading
import time
import traceback
import sys
+import warnings
from . import coroutines
from . import events
@@ -74,7 +75,11 @@ class _StopError(BaseException):
def _check_resolved_address(sock, address):
# Ensure that the address is already resolved to avoid the trap of hanging
# the entire event loop when the address requires doing a DNS lookup.
+ #
+ # getaddrinfo() is slow (around 10 us per call): this function should only
+ # be called in debug mode
family = sock.family
+
if family == socket.AF_INET:
host, port = address
elif family == socket.AF_INET6:
@@ -82,22 +87,34 @@ def _check_resolved_address(sock, address):
else:
return
- type_mask = 0
- if hasattr(socket, 'SOCK_NONBLOCK'):
- type_mask |= socket.SOCK_NONBLOCK
- if hasattr(socket, 'SOCK_CLOEXEC'):
- type_mask |= socket.SOCK_CLOEXEC
- # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
- # already resolved.
- try:
- socket.getaddrinfo(host, port,
- family=family,
- type=(sock.type & ~type_mask),
- proto=sock.proto,
- flags=socket.AI_NUMERICHOST)
- except socket.gaierror as err:
- raise ValueError("address must be resolved (IP address), got %r: %s"
- % (address, err))
+ # On Windows, socket.inet_pton() is only available since Python 3.4
+ if hasattr(socket, 'inet_pton'):
+ # getaddrinfo() is slow and has known issue: prefer inet_pton()
+ # if available
+ try:
+ socket.inet_pton(family, host)
+ except OSError as exc:
+ raise ValueError("address must be resolved (IP address), "
+ "got host %r: %s"
+ % (host, exc))
+ else:
+ # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
+ # already resolved.
+ type_mask = 0
+ if hasattr(socket, 'SOCK_NONBLOCK'):
+ type_mask |= socket.SOCK_NONBLOCK
+ if hasattr(socket, 'SOCK_CLOEXEC'):
+ type_mask |= socket.SOCK_CLOEXEC
+ try:
+ socket.getaddrinfo(host, port,
+ family=family,
+ type=(sock.type & ~type_mask),
+ proto=sock.proto,
+ flags=socket.AI_NUMERICHOST)
+ except socket.gaierror as err:
+ raise ValueError("address must be resolved (IP address), "
+ "got host %r: %s"
+ % (host, err))
def _raise_stop_error(*args):
raise _StopError
@@ -171,7 +188,7 @@ class BaseEventLoop(events.AbstractEventLoop):
self._internal_fds = 0
# Identifier of the thread running the event loop, or None if the
# event loop is not running
- self._owner = None
+ self._thread_id = None
self._clock_resolution = time.get_clock_info('monotonic').resolution
self._exception_handler = None
self._debug = (not sys.flags.ignore_environment
@@ -179,6 +196,7 @@ class BaseEventLoop(events.AbstractEventLoop):
# In debug mode, if the execution of a callback or a step of a task
# exceed this duration in seconds, the slow callback/task is logged.
self.slow_callback_duration = 0.1
+ self._current_handle = None
def __repr__(self):
return ('<%s running=%s closed=%s debug=%s>'
@@ -251,7 +269,7 @@ class BaseEventLoop(events.AbstractEventLoop):
self._check_closed()
if self.is_running():
raise RuntimeError('Event loop is running.')
- self._owner = threading.get_ident()
+ self._thread_id = threading.get_ident()
try:
while True:
try:
@@ -259,7 +277,7 @@ class BaseEventLoop(events.AbstractEventLoop):
except _StopError:
break
finally:
- self._owner = None
+ self._thread_id = None
def run_until_complete(self, future):
"""Run until the Future is done.
@@ -332,9 +350,19 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Returns True if the event loop was closed."""
return self._closed
+ # On Python 3.3 and older, objects with a destructor part of a reference
+ # cycle are never destroyed. It's not more the case on Python 3.4 thanks
+ # to the PEP 442.
+ if sys.version_info >= (3, 4):
+ def __del__(self):
+ if not self.is_closed():
+ warnings.warn("unclosed event loop %r" % self, ResourceWarning)
+ if not self.is_running():
+ self.close()
+
def is_running(self):
"""Returns True if the event loop is running."""
- return (self._owner is not None)
+ return (self._thread_id is not None)
def time(self):
"""Return the time according to the event loop's clock.
@@ -421,10 +449,10 @@ class BaseEventLoop(events.AbstractEventLoop):
Should only be called when (self._debug == True). The caller is
responsible for checking this condition for performance reasons.
"""
- if self._owner is None:
+ if self._thread_id is None:
return
thread_id = threading.get_ident()
- if thread_id != self._owner:
+ if thread_id != self._thread_id:
raise RuntimeError(
"Non-thread-safe operation invoked on an event loop other "
"than the current one")
@@ -955,6 +983,11 @@ class BaseEventLoop(events.AbstractEventLoop):
else:
exc_info = False
+ if ('source_traceback' not in context
+ and self._current_handle is not None
+ and self._current_handle._source_traceback):
+ context['handle_traceback'] = self._current_handle._source_traceback
+
log_lines = [message]
for key in sorted(context):
if key in {'message', 'exception'}:
@@ -964,6 +997,10 @@ class BaseEventLoop(events.AbstractEventLoop):
tb = ''.join(traceback.format_list(value))
value = 'Object created at (most recent call last):\n'
value += tb.rstrip()
+ elif key == 'handle_traceback':
+ tb = ''.join(traceback.format_list(value))
+ value = 'Handle created at (most recent call last):\n'
+ value += tb.rstrip()
else:
value = repr(value)
log_lines.append('{}: {}'.format(key, value))
@@ -1121,12 +1158,16 @@ class BaseEventLoop(events.AbstractEventLoop):
if handle._cancelled:
continue
if self._debug:
- t0 = self.time()
- handle._run()
- dt = self.time() - t0
- if dt >= self.slow_callback_duration:
- logger.warning('Executing %s took %.3f seconds',
- _format_handle(handle), dt)
+ try:
+ self._current_handle = handle
+ t0 = self.time()
+ handle._run()
+ dt = self.time() - t0
+ if dt >= self.slow_callback_duration:
+ logger.warning('Executing %s took %.3f seconds',
+ _format_handle(handle), dt)
+ finally:
+ self._current_handle = None
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.
diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py
index f5e7dfe..f56873f 100644
--- a/asyncio/base_subprocess.py
+++ b/asyncio/base_subprocess.py
@@ -1,6 +1,9 @@
import collections
import subprocess
+import sys
+import warnings
+from . import futures
from . import protocols
from . import transports
from .coroutines import coroutine
@@ -11,26 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def __init__(self, loop, protocol, args, shell,
stdin, stdout, stderr, bufsize,
- extra=None, **kwargs):
+ waiter=None, extra=None, **kwargs):
super().__init__(extra)
+ self._closed = False
self._protocol = protocol
self._loop = loop
+ self._proc = None
self._pid = None
-
+ self._returncode = None
+ self._exit_waiters = []
+ self._pending_calls = collections.deque()
self._pipes = {}
+ self._finished = False
+
if stdin == subprocess.PIPE:
self._pipes[0] = None
if stdout == subprocess.PIPE:
self._pipes[1] = None
if stderr == subprocess.PIPE:
self._pipes[2] = None
- self._pending_calls = collections.deque()
- self._finished = False
- self._returncode = None
+
+ # Create the child process: set the _proc attribute
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, **kwargs)
self._pid = self._proc.pid
self._extra['subprocess'] = self._proc
+
if self._loop.get_debug():
if isinstance(args, (bytes, str)):
program = args
@@ -39,10 +48,17 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
logger.debug('process %r created: pid %s',
program, self._pid)
+ self._loop.create_task(self._connect_pipes(waiter))
+
def __repr__(self):
- info = [self.__class__.__name__, 'pid=%s' % self._pid]
+ info = [self.__class__.__name__]
+ if self._closed:
+ info.append('closed')
+ info.append('pid=%s' % self._pid)
if self._returncode is not None:
info.append('returncode=%s' % self._returncode)
+ else:
+ info.append('running')
stdin = self._pipes.get(0)
if stdin is not None:
@@ -70,12 +86,39 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
raise NotImplementedError
def close(self):
+ if self._closed:
+ return
+ self._closed = True
+
for proto in self._pipes.values():
if proto is None:
continue
proto.pipe.close()
- if self._returncode is None:
- self.terminate()
+
+ if (self._proc is not None
+ # the child process finished?
+ and self._returncode is None
+ # the child process finished but the transport was not notified yet?
+ and self._proc.poll() is None
+ ):
+ if self._loop.get_debug():
+ logger.warning('Close running child process: kill %r', self)
+
+ try:
+ self._proc.kill()
+ except ProcessLookupError:
+ pass
+
+ # Don't clear the _proc reference yet: _post_init() may still run
+
+ # On Python 3.3 and older, objects with a destructor part of a reference
+ # cycle are never destroyed. It's not more the case on Python 3.4 thanks
+ # to the PEP 442.
+ if sys.version_info >= (3, 4):
+ def __del__(self):
+ if not self._closed:
+ warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ self.close()
def get_pid(self):
return self._pid
@@ -89,55 +132,40 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
else:
return None
+ def _check_proc(self):
+ if self._proc is None:
+ raise ProcessLookupError()
+
def send_signal(self, signal):
+ self._check_proc()
self._proc.send_signal(signal)
def terminate(self):
+ self._check_proc()
self._proc.terminate()
def kill(self):
+ self._check_proc()
self._proc.kill()
- def _kill_wait(self):
- """Close pipes, kill the subprocess and read its return status.
-
- Function called when an exception is raised during the creation
- of a subprocess.
- """
- if self._loop.get_debug():
- logger.warning('Exception during subprocess creation, '
- 'kill the subprocess %r',
- self,
- exc_info=True)
-
- proc = self._proc
- if proc.stdout:
- proc.stdout.close()
- if proc.stderr:
- proc.stderr.close()
- if proc.stdin:
- proc.stdin.close()
- try:
- proc.kill()
- except ProcessLookupError:
- pass
- self._returncode = proc.wait()
-
@coroutine
- def _post_init(self):
+ def _connect_pipes(self, waiter):
try:
proc = self._proc
loop = self._loop
+
if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
self._pipes[0] = pipe
+
if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
self._pipes[1] = pipe
+
if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 2),
@@ -146,13 +174,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
assert self._pending_calls is not None
- self._loop.call_soon(self._protocol.connection_made, self)
+ loop.call_soon(self._protocol.connection_made, self)
for callback, data in self._pending_calls:
- self._loop.call_soon(callback, *data)
+ loop.call_soon(callback, *data)
self._pending_calls = None
- except:
- self._kill_wait()
- raise
+ except Exception as exc:
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_exception(exc)
+ else:
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_result(None)
def _call(self, cb, *data):
if self._pending_calls is not None:
@@ -177,6 +208,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self._call(self._protocol.process_exited)
self._try_finish()
+ # wake up futures waiting for wait()
+ for waiter in self._exit_waiters:
+ if not waiter.cancelled():
+ waiter.set_result(returncode)
+ self._exit_waiters = None
+
+ def _wait(self):
+ """Wait until the process exit and return the process return code.
+
+ This method is a coroutine."""
+ if self._returncode is not None:
+ return self._returncode
+
+ waiter = futures.Future(loop=self._loop)
+ self._exit_waiters.append(waiter)
+ return (yield from waiter)
+
def _try_finish(self):
assert not self._finished
if self._returncode is None:
@@ -190,9 +238,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
try:
self._protocol.connection_lost(exc)
finally:
+ self._loop = None
self._proc = None
self._protocol = None
- self._loop = None
class WriteSubprocessPipeProto(protocols.BaseProtocol):
diff --git a/asyncio/futures.py b/asyncio/futures.py
index 19212a9..2c741fd 100644
--- a/asyncio/futures.py
+++ b/asyncio/futures.py
@@ -195,9 +195,9 @@ class Future:
info = self._repr_info()
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
- # On Python 3.3 or older, objects with a destructor part of a reference
- # cycle are never destroyed. It's not more the case on Python 3.4 thanks to
- # the PEP 442.
+ # On Python 3.3 and older, objects with a destructor part of a reference
+ # cycle are never destroyed. It's not more the case on Python 3.4 thanks
+ # to the PEP 442.
if _PY34:
def __del__(self):
if not self._log_traceback:
diff --git a/asyncio/proactor_events.py b/asyncio/proactor_events.py
index ed17062..9c2b8f1 100644
--- a/asyncio/proactor_events.py
+++ b/asyncio/proactor_events.py
@@ -7,6 +7,8 @@ proactor is only implemented on Windows with IOCP.
__all__ = ['BaseProactorEventLoop']
import socket
+import sys
+import warnings
from . import base_events
from . import constants
@@ -38,7 +40,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._server._attach()
self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None:
- # wait until protocol.connection_made() has been called
+ # only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def __repr__(self):
@@ -74,6 +76,15 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._read_fut.cancel()
self._read_fut = None
+ # On Python 3.3 and older, objects with a destructor part of a reference
+ # cycle are never destroyed. It's not more the case on Python 3.4 thanks
+ # to the PEP 442.
+ if sys.version_info >= (3, 4):
+ def __del__(self):
+ if self._sock is not None:
+ warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ self.close()
+
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
if self._loop.get_debug():
@@ -426,7 +437,8 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
def sock_connect(self, sock, address):
try:
- base_events._check_resolved_address(sock, address)
+ if self._debug:
+ base_events._check_resolved_address(sock, address)
except ValueError as err:
fut = futures.Future(loop=self)
fut.set_exception(err)
diff --git a/asyncio/protocols.py b/asyncio/protocols.py
index 52fc25c..80fcac9 100644
--- a/asyncio/protocols.py
+++ b/asyncio/protocols.py
@@ -78,6 +78,11 @@ class Protocol(BaseProtocol):
State machine of calls:
start -> CM [-> DR*] [-> ER?] -> CL -> end
+
+ * CM: connection_made()
+ * DR: data_received()
+ * ER: eof_received()
+ * CL: connection_lost()
"""
def data_received(self, data):
diff --git a/asyncio/queues.py b/asyncio/queues.py
index 8680d58..84cdabc 100644
--- a/asyncio/queues.py
+++ b/asyncio/queues.py
@@ -13,12 +13,16 @@ from .tasks import coroutine
class QueueEmpty(Exception):
- 'Exception raised by Queue.get(block=0)/get_nowait().'
+ """Exception raised when Queue.get_nowait() is called on a Queue object
+ which is empty.
+ """
pass
class QueueFull(Exception):
- 'Exception raised by Queue.put(block=0)/put_nowait().'
+ """Exception raised when the Queue.put_nowait() method is called on a Queue
+ object which is full.
+ """
pass
diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py
index 24f8461..a38ed1c 100644
--- a/asyncio/selector_events.py
+++ b/asyncio/selector_events.py
@@ -10,6 +10,8 @@ import collections
import errno
import functools
import socket
+import sys
+import warnings
try:
import ssl
except ImportError: # pragma: no cover
@@ -22,6 +24,7 @@ from . import futures
from . import selectors
from . import transports
from . import sslproto
+from .coroutines import coroutine
from .log import logger
@@ -181,16 +184,47 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
raise # The event loop will catch, log and ignore it.
else:
+ extra = {'peername': addr}
+ accept = self._accept_connection2(protocol_factory, conn, extra,
+ sslcontext, server)
+ self.create_task(accept)
+
+ @coroutine
+ def _accept_connection2(self, protocol_factory, conn, extra,
+ sslcontext=None, server=None):
+ protocol = None
+ transport = None
+ try:
protocol = protocol_factory()
+ waiter = futures.Future(loop=self)
if sslcontext:
- self._make_ssl_transport(
- conn, protocol, sslcontext,
- server_side=True, extra={'peername': addr}, server=server)
+ transport = self._make_ssl_transport(
+ conn, protocol, sslcontext, waiter=waiter,
+ server_side=True, extra=extra, server=server)
else:
- self._make_socket_transport(
- conn, protocol , extra={'peername': addr},
+ transport = self._make_socket_transport(
+ conn, protocol, waiter=waiter, extra=extra,
server=server)
- # It's now up to the protocol to handle the connection.
+
+ try:
+ yield from waiter
+ except:
+ transport.close()
+ raise
+
+ # It's now up to the protocol to handle the connection.
+ except Exception as exc:
+ if self._debug:
+ context = {
+ 'message': ('Error on transport creation '
+ 'for incoming connection'),
+ 'exception': exc,
+ }
+ if protocol is not None:
+ context['protocol'] = protocol
+ if transport is not None:
+ context['transport'] = transport
+ self.call_exception_handler(context)
def add_reader(self, fd, callback, *args):
"""Add a reader callback."""
@@ -278,7 +312,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
This method is a coroutine.
"""
- if self.get_debug() and sock.gettimeout() != 0:
+ if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = futures.Future(loop=self)
self._sock_recv(fut, False, sock, n)
@@ -316,7 +350,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
This method is a coroutine.
"""
- if self.get_debug() and sock.gettimeout() != 0:
+ if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = futures.Future(loop=self)
if data:
@@ -359,11 +393,12 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
This method is a coroutine.
"""
- if self.get_debug() and sock.gettimeout() != 0:
+ if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = futures.Future(loop=self)
try:
- base_events._check_resolved_address(sock, address)
+ if self._debug:
+ base_events._check_resolved_address(sock, address)
except ValueError as err:
fut.set_exception(err)
else:
@@ -419,7 +454,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
This method is a coroutine.
"""
- if self.get_debug() and sock.gettimeout() != 0:
+ if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = futures.Future(loop=self)
self._sock_accept(fut, False, sock)
@@ -467,7 +502,12 @@ class _SelectorTransport(transports._FlowControlMixin,
_buffer_factory = bytearray # Constructs initial value for self._buffer.
- def __init__(self, loop, sock, protocol, extra, server=None):
+ # Attribute used in the destructor: it must be set even if the constructor
+ # is not called (see _SelectorSslTransport which may start by raising an
+ # exception)
+ _sock = None
+
+ def __init__(self, loop, sock, protocol, extra=None, server=None):
super().__init__(extra, loop)
self._extra['socket'] = sock
self._extra['sockname'] = sock.getsockname()
@@ -479,6 +519,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._sock = sock
self._sock_fd = sock.fileno()
self._protocol = protocol
+ self._protocol_connected = True
self._server = server
self._buffer = self._buffer_factory()
self._conn_lost = 0 # Set when call to connection_lost scheduled.
@@ -526,6 +567,15 @@ class _SelectorTransport(transports._FlowControlMixin,
self._conn_lost += 1
self._loop.call_soon(self._call_connection_lost, None)
+ # On Python 3.3 and older, objects with a destructor part of a reference
+ # cycle are never destroyed. It's not more the case on Python 3.4 thanks
+ # to the PEP 442.
+ if sys.version_info >= (3, 4):
+ def __del__(self):
+ if self._sock is not None:
+ warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ self._sock.close()
+
def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
if isinstance(exc, (BrokenPipeError,
@@ -555,7 +605,8 @@ class _SelectorTransport(transports._FlowControlMixin,
def _call_connection_lost(self, exc):
try:
- self._protocol.connection_lost(exc)
+ if self._protocol_connected:
+ self._protocol.connection_lost(exc)
finally:
self._sock.close()
self._sock = None
@@ -578,10 +629,12 @@ class _SelectorSocketTransport(_SelectorTransport):
self._eof = False
self._paused = False
- self._loop.add_reader(self._sock_fd, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
+ # only start reading when connection_made() has been called
+ self._loop.call_soon(self._loop.add_reader,
+ self._sock_fd, self._read_ready)
if waiter is not None:
- # wait until protocol.connection_made() has been called
+ # only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def pause_reading(self):
@@ -716,6 +769,8 @@ class _SelectorSslTransport(_SelectorTransport):
sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
super().__init__(loop, sslsock, protocol, extra, server)
+ # the protocol connection is only made after the SSL handshake
+ self._protocol_connected = False
self._server_hostname = server_hostname
self._waiter = waiter
@@ -732,6 +787,16 @@ class _SelectorSslTransport(_SelectorTransport):
start_time = None
self._on_handshake(start_time)
+ def _wakeup_waiter(self, exc=None):
+ if self._waiter is None:
+ return
+ if not self._waiter.cancelled():
+ if exc is not None:
+ self._waiter.set_exception(exc)
+ else:
+ self._waiter.set_result(None)
+ self._waiter = None
+
def _on_handshake(self, start_time):
try:
self._sock.do_handshake()
@@ -750,8 +815,7 @@ class _SelectorSslTransport(_SelectorTransport):
self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
self._sock.close()
- if self._waiter is not None and not self._waiter.cancelled():
- self._waiter.set_exception(exc)
+ self._wakeup_waiter(exc)
if isinstance(exc, Exception):
return
else:
@@ -774,9 +838,7 @@ class _SelectorSslTransport(_SelectorTransport):
"on matching the hostname",
self, exc_info=True)
self._sock.close()
- if (self._waiter is not None
- and not self._waiter.cancelled()):
- self._waiter.set_exception(exc)
+ self._wakeup_waiter(exc)
return
# Add extra info that becomes available after handshake.
@@ -788,11 +850,10 @@ class _SelectorSslTransport(_SelectorTransport):
self._read_wants_write = False
self._write_wants_read = False
self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._protocol_connected = True
self._loop.call_soon(self._protocol.connection_made, self)
- if self._waiter is not None:
- # wait until protocol.connection_made() has been called
- self._loop.call_soon(self._waiter._set_result_unless_cancelled,
- None)
+ # only wake up the waiter when connection_made() has been called
+ self._loop.call_soon(self._wakeup_waiter)
if self._loop.get_debug():
dt = self._loop.time() - start_time
@@ -921,10 +982,12 @@ class _SelectorDatagramTransport(_SelectorTransport):
waiter=None, extra=None):
super().__init__(loop, sock, protocol, extra)
self._address = address
- self._loop.add_reader(self._sock_fd, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
+ # only start reading when connection_made() has been called
+ self._loop.call_soon(self._loop.add_reader,
+ self._sock_fd, self._read_ready)
if waiter is not None:
- # wait until protocol.connection_made() has been called
+ # only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def get_write_buffer_size(self):
diff --git a/asyncio/sslproto.py b/asyncio/sslproto.py
index 117dc56..235855e 100644
--- a/asyncio/sslproto.py
+++ b/asyncio/sslproto.py
@@ -1,4 +1,6 @@
import collections
+import sys
+import warnings
try:
import ssl
except ImportError: # pragma: no cover
@@ -295,6 +297,7 @@ class _SSLProtocolTransport(transports._FlowControlMixin,
self._loop = loop
self._ssl_protocol = ssl_protocol
self._app_protocol = app_protocol
+ self._closed = False
def get_extra_info(self, name, default=None):
"""Get optional transport information."""
@@ -308,8 +311,18 @@ class _SSLProtocolTransport(transports._FlowControlMixin,
protocol's connection_lost() method will (eventually) called
with None as its argument.
"""
+ self._closed = True
self._ssl_protocol._start_shutdown()
+ # On Python 3.3 and older, objects with a destructor part of a reference
+ # cycle are never destroyed. It's not more the case on Python 3.4 thanks
+ # to the PEP 442.
+ if sys.version_info >= (3, 4):
+ def __del__(self):
+ if not self._closed:
+ warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ self.close()
+
def pause_reading(self):
"""Pause the receiving end.
@@ -408,7 +421,6 @@ class SSLProtocol(protocols.Protocol):
self._write_buffer_size = 0
self._waiter = waiter
- self._closing = False
self._loop = loop
self._app_protocol = app_protocol
self._app_transport = _SSLProtocolTransport(self._loop,
@@ -419,6 +431,16 @@ class SSLProtocol(protocols.Protocol):
self._in_shutdown = False
self._transport = None
+ def _wakeup_waiter(self, exc=None):
+ if self._waiter is None:
+ return
+ if not self._waiter.cancelled():
+ if exc is not None:
+ self._waiter.set_exception(exc)
+ else:
+ self._waiter.set_result(None)
+ self._waiter = None
+
def connection_made(self, transport):
"""Called when the low-level connection is made.
@@ -490,6 +512,9 @@ class SSLProtocol(protocols.Protocol):
try:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
+
+ self._wakeup_waiter(ConnectionResetError)
+
if not self._in_handshake:
keep_open = self._app_protocol.eof_received()
if keep_open:
@@ -553,8 +578,7 @@ class SSLProtocol(protocols.Protocol):
self, exc_info=True)
self._transport.close()
if isinstance(exc, Exception):
- if self._waiter is not None and not self._waiter.cancelled():
- self._waiter.set_exception(exc)
+ self._wakeup_waiter(exc)
return
else:
raise
@@ -569,9 +593,7 @@ class SSLProtocol(protocols.Protocol):
compression=sslobj.compression(),
)
self._app_protocol.connection_made(self._app_transport)
- if self._waiter is not None:
- # wait until protocol.connection_made() has been called
- self._waiter._set_result_unless_cancelled(None)
+ self._wakeup_waiter()
self._session_established = True
# In case transport.write() was already called. Don't call
# immediatly _process_write_backlog(), but schedule it:
diff --git a/asyncio/subprocess.py b/asyncio/subprocess.py
index c848a21..4600a9f 100644
--- a/asyncio/subprocess.py
+++ b/asyncio/subprocess.py
@@ -25,8 +25,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
super().__init__(loop=loop)
self._limit = limit
self.stdin = self.stdout = self.stderr = None
- self.waiter = futures.Future(loop=loop)
- self._waiters = collections.deque()
self._transport = None
def __repr__(self):
@@ -61,9 +59,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader=None,
loop=self._loop)
- if not self.waiter.cancelled():
- self.waiter.set_result(None)
-
def pipe_data_received(self, fd, data):
if fd == 1:
reader = self.stdout
@@ -94,16 +89,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader.set_exception(exc)
def process_exited(self):
- returncode = self._transport.get_returncode()
self._transport.close()
self._transport = None
- # wake up futures waiting for wait()
- while self._waiters:
- waiter = self._waiters.popleft()
- if not waiter.cancelled():
- waiter.set_result(returncode)
-
class Process:
def __init__(self, transport, protocol, loop):
@@ -124,30 +112,18 @@ class Process:
@coroutine
def wait(self):
- """Wait until the process exit and return the process return code."""
- returncode = self._transport.get_returncode()
- if returncode is not None:
- return returncode
-
- waiter = futures.Future(loop=self._loop)
- self._protocol._waiters.append(waiter)
- yield from waiter
- return waiter.result()
+ """Wait until the process exit and return the process return code.
- def _check_alive(self):
- if self._transport.get_returncode() is not None:
- raise ProcessLookupError()
+ This method is a coroutine."""
+ return (yield from self._transport._wait())
def send_signal(self, signal):
- self._check_alive()
self._transport.send_signal(signal)
def terminate(self):
- self._check_alive()
self._transport.terminate()
def kill(self):
- self._check_alive()
self._transport.kill()
@coroutine
@@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
protocol_factory,
cmd, stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
- try:
- yield from protocol.waiter
- except:
- transport._kill_wait()
- raise
return Process(transport, protocol, loop)
@coroutine
@@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None,
program, *args,
stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
- try:
- yield from protocol.waiter
- except:
- transport._kill_wait()
- raise
return Process(transport, protocol, loop)
diff --git a/asyncio/tasks.py b/asyncio/tasks.py
index 63412a9..4f19a25 100644
--- a/asyncio/tasks.py
+++ b/asyncio/tasks.py
@@ -592,7 +592,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
fut.exception()
return
- if fut._state == futures._CANCELLED:
+ if fut.cancelled():
res = futures.CancelledError()
if not return_exceptions:
outer.set_exception(res)
diff --git a/asyncio/test_utils.py b/asyncio/test_utils.py
index 6eedc58..8cee95b 100644
--- a/asyncio/test_utils.py
+++ b/asyncio/test_utils.py
@@ -416,6 +416,10 @@ class TestCase(unittest.TestCase):
def tearDown(self):
events.set_event_loop(None)
+ # Detect CPython bug #23353: ensure that yield/yield-from is not used
+ # in an except block of a generator
+ self.assertEqual(sys.exc_info(), (None, None, None))
+
@contextlib.contextmanager
def disable_logger():
diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py
index 97f9add..75e7c9c 100644
--- a/asyncio/unix_events.py
+++ b/asyncio/unix_events.py
@@ -8,6 +8,7 @@ import stat
import subprocess
import sys
import threading
+import warnings
from . import base_events
@@ -15,6 +16,7 @@ from . import base_subprocess
from . import constants
from . import coroutines
from . import events
+from . import futures
from . import selector_events
from . import selectors
from . import transports
@@ -174,16 +176,28 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with events.get_child_watcher() as watcher:
+ waiter = futures.Future(loop=self)
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
- extra=extra, **kwargs)
- try:
- yield from transp._post_init()
- except:
- transp.close()
- raise
+ waiter=waiter, extra=extra,
+ **kwargs)
+
watcher.add_child_handler(transp.get_pid(),
self._child_watcher_callback, transp)
+ try:
+ yield from waiter
+ except Exception as exc:
+ # Workaround CPython bug #23353: using yield/yield-from in an
+ # except block of a generator doesn't clear properly
+ # sys.exc_info()
+ err = exc
+ else:
+ err = None
+
+ if err is not None:
+ transp.close()
+ yield from transp._wait()
+ raise err
return transp
@@ -298,10 +312,12 @@ class _UnixReadPipeTransport(transports.ReadTransport):
_set_nonblocking(self._fileno)
self._protocol = protocol
self._closing = False
- self._loop.add_reader(self._fileno, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
+ # only start reading when connection_made() has been called
+ self._loop.call_soon(self._loop.add_reader,
+ self._fileno, self._read_ready)
if waiter is not None:
- # wait until protocol.connection_made() has been called
+ # only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def __repr__(self):
@@ -351,6 +367,15 @@ class _UnixReadPipeTransport(transports.ReadTransport):
if not self._closing:
self._close(None)
+ # On Python 3.3 and older, objects with a destructor part of a reference
+ # cycle are never destroyed. It's not more the case on Python 3.4 thanks
+ # to the PEP 442.
+ if sys.version_info >= (3, 4):
+ def __del__(self):
+ if self._pipe is not None:
+ warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ self._pipe.close()
+
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
# should be called by exception handler only
if (isinstance(exc, OSError) and exc.errno == errno.EIO):
@@ -401,15 +426,18 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
self._conn_lost = 0
self._closing = False # Set when close() or write_eof() called.
- # On AIX, the reader trick only works for sockets.
- # On other platforms it works for pipes and sockets.
- # (Exception: OS X 10.4? Issue #19294.)
+ self._loop.call_soon(self._protocol.connection_made, self)
+
+ # On AIX, the reader trick (to be notified when the read end of the
+ # socket is closed) only works for sockets. On other platforms it
+ # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
if is_socket or not sys.platform.startswith("aix"):
- self._loop.add_reader(self._fileno, self._read_ready)
+ # only start reading when connection_made() has been called
+ self._loop.call_soon(self._loop.add_reader,
+ self._fileno, self._read_ready)
- self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None:
- # wait until protocol.connection_made() has been called
+ # only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def __repr__(self):
@@ -524,6 +552,15 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
# write_eof is all what we needed to close the write pipe
self.write_eof()
+ # On Python 3.3 and older, objects with a destructor part of a reference
+ # cycle are never destroyed. It's not more the case on Python 3.4 thanks
+ # to the PEP 442.
+ if sys.version_info >= (3, 4):
+ def __del__(self):
+ if self._pipe is not None:
+ warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ self._pipe.close()
+
def abort(self):
self._close(None)
@@ -750,7 +787,7 @@ class SafeChildWatcher(BaseChildWatcher):
pass
def add_child_handler(self, pid, callback, *args):
- self._callbacks[pid] = callback, args
+ self._callbacks[pid] = (callback, args)
# Prevent a race condition in case the child is already terminated.
self._do_waitpid(pid)
diff --git a/asyncio/windows_events.py b/asyncio/windows_events.py
index 6c7e058..f311e46 100644
--- a/asyncio/windows_events.py
+++ b/asyncio/windows_events.py
@@ -126,14 +126,12 @@ class _BaseWaitHandleFuture(futures.Future):
return
self._registered = False
+ wait_handle = self._wait_handle
+ self._wait_handle = None
try:
- _overlapped.UnregisterWait(self._wait_handle)
+ _overlapped.UnregisterWait(wait_handle)
except OSError as exc:
- self._wait_handle = None
- if exc.winerror == _overlapped.ERROR_IO_PENDING:
- # ERROR_IO_PENDING is not an error, the wait was unregistered
- self._unregister_wait_cb(None)
- elif exc.winerror != _overlapped.ERROR_IO_PENDING:
+ if exc.winerror != _overlapped.ERROR_IO_PENDING:
context = {
'message': 'Failed to unregister the wait handle',
'exception': exc,
@@ -142,9 +140,10 @@ class _BaseWaitHandleFuture(futures.Future):
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
- else:
- self._wait_handle = None
- self._unregister_wait_cb(None)
+ return
+ # ERROR_IO_PENDING means that the unregister is pending
+
+ self._unregister_wait_cb(None)
def cancel(self):
self._unregister_wait()
@@ -209,14 +208,12 @@ class _WaitHandleFuture(_BaseWaitHandleFuture):
return
self._registered = False
+ wait_handle = self._wait_handle
+ self._wait_handle = None
try:
- _overlapped.UnregisterWaitEx(self._wait_handle, self._event)
+ _overlapped.UnregisterWaitEx(wait_handle, self._event)
except OSError as exc:
- self._wait_handle = None
- if exc.winerror == _overlapped.ERROR_IO_PENDING:
- # ERROR_IO_PENDING is not an error, the wait was unregistered
- self._unregister_wait_cb(None)
- elif exc.winerror != _overlapped.ERROR_IO_PENDING:
+ if exc.winerror != _overlapped.ERROR_IO_PENDING:
context = {
'message': 'Failed to unregister the wait handle',
'exception': exc,
@@ -225,11 +222,11 @@ class _WaitHandleFuture(_BaseWaitHandleFuture):
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
- else:
- self._wait_handle = None
- self._event_fut = self._proactor._wait_cancel(
- self._event,
- self._unregister_wait_cb)
+ return
+ # ERROR_IO_PENDING is not an error, the wait was unregistered
+
+ self._event_fut = self._proactor._wait_cancel(self._event,
+ self._unregister_wait_cb)
class PipeServer(object):
@@ -257,7 +254,7 @@ class PipeServer(object):
def _server_pipe_handle(self, first):
# Return a wrapper for a new pipe handle.
- if self._address is None:
+ if self.closed():
return None
flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
if first:
@@ -273,6 +270,9 @@ class PipeServer(object):
self._free_instances.add(pipe)
return pipe
+ def closed(self):
+ return (self._address is None)
+
def close(self):
if self._accept_pipe_future is not None:
self._accept_pipe_future.cancel()
@@ -325,12 +325,21 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
if f:
pipe = f.result()
server._free_instances.discard(pipe)
+
+ if server.closed():
+ # A client connected before the server was closed:
+ # drop the client (close the pipe) and exit
+ pipe.close()
+ return
+
protocol = protocol_factory()
self._make_duplex_pipe_transport(
pipe, protocol, extra={'addr': address})
+
pipe = server._get_unconnected_pipe()
if pipe is None:
return
+
f = self._proactor.accept_pipe(pipe)
except OSError as exc:
if pipe and pipe.fileno() != -1:
@@ -357,14 +366,24 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
+ waiter = futures.Future(loop=self)
transp = _WindowsSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
- extra=extra, **kwargs)
+ waiter=waiter, extra=extra,
+ **kwargs)
try:
- yield from transp._post_init()
- except:
+ yield from waiter
+ except Exception as exc:
+ # Workaround CPython bug #23353: using yield/yield-from in an
+ # except block of a generator doesn't clear properly sys.exc_info()
+ err = exc
+ else:
+ err = None
+
+ if err is not None:
transp.close()
- raise
+ yield from transp._wait()
+ raise err
return transp
@@ -397,13 +416,21 @@ class IocpProactor:
self._results = []
return tmp
+ def _result(self, value):
+ fut = futures.Future(loop=self._loop)
+ fut.set_result(value)
+ return fut
+
def recv(self, conn, nbytes, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
- if isinstance(conn, socket.socket):
- ov.WSARecv(conn.fileno(), nbytes, flags)
- else:
- ov.ReadFile(conn.fileno(), nbytes)
+ try:
+ if isinstance(conn, socket.socket):
+ ov.WSARecv(conn.fileno(), nbytes, flags)
+ else:
+ ov.ReadFile(conn.fileno(), nbytes)
+ except BrokenPipeError:
+ return self._result(b'')
def finish_recv(trans, key, ov):
try:
@@ -496,9 +523,7 @@ class IocpProactor:
# ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
# that the pipe is connected. There is no need to wait for the
# completion of the connection.
- f = futures.Future(loop=self._loop)
- f.set_result(pipe)
- return f
+ return self._result(pipe)
def finish_accept_pipe(trans, key, ov):
ov.getresult()
@@ -506,28 +531,25 @@ class IocpProactor:
return self._register(ov, pipe, finish_accept_pipe)
- def _connect_pipe(self, fut, address, delay):
- # Unfortunately there is no way to do an overlapped connect to a pipe.
- # Call CreateFile() in a loop until it doesn't fail with
- # ERROR_PIPE_BUSY
- try:
- handle = _overlapped.ConnectPipe(address)
- except OSError as exc:
- if exc.winerror == _overlapped.ERROR_PIPE_BUSY:
- # Polling: retry later
- delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
- self._loop.call_later(delay,
- self._connect_pipe, fut, address, delay)
- else:
- fut.set_exception(exc)
- else:
- pipe = windows_utils.PipeHandle(handle)
- fut.set_result(pipe)
-
+ @coroutine
def connect_pipe(self, address):
- fut = futures.Future(loop=self._loop)
- self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY)
- return fut
+ delay = CONNECT_PIPE_INIT_DELAY
+ while True:
+ # Unfortunately there is no way to do an overlapped connect to a pipe.
+ # Call CreateFile() in a loop until it doesn't fail with
+ # ERROR_PIPE_BUSY
+ try:
+ handle = _overlapped.ConnectPipe(address)
+ break
+ except OSError as exc:
+ if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
+ raise
+
+ # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
+ delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
+ yield from tasks.sleep(delay, loop=self._loop)
+
+ return windows_utils.PipeHandle(handle)
def wait_for_handle(self, handle, timeout=None):
"""Wait for a handle.
diff --git a/asyncio/windows_utils.py b/asyncio/windows_utils.py
index e664296..870cd13 100644
--- a/asyncio/windows_utils.py
+++ b/asyncio/windows_utils.py
@@ -14,6 +14,7 @@ import os
import socket
import subprocess
import tempfile
+import warnings
__all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle']
@@ -147,6 +148,8 @@ class PipeHandle:
return self._handle
def fileno(self):
+ if self._handle is None:
+ raise ValueError("I/O operatioon on closed pipe")
return self._handle
def close(self, *, CloseHandle=_winapi.CloseHandle):
@@ -154,7 +157,10 @@ class PipeHandle:
CloseHandle(self._handle)
self._handle = None
- __del__ = close
+ def __del__(self):
+ if self._handle is not None:
+ warnings.warn("unclosed %r" % self, ResourceWarning)
+ self.close()
def __enter__(self):
return self
diff --git a/examples/shell.py b/examples/shell.py
index 7dc7caf..f934325 100644
--- a/examples/shell.py
+++ b/examples/shell.py
@@ -36,12 +36,14 @@ def ls(loop):
@asyncio.coroutine
def test_call(*args, timeout=None):
+ proc = yield from asyncio.create_subprocess_exec(*args)
try:
- proc = yield from asyncio.create_subprocess_exec(*args)
exitcode = yield from asyncio.wait_for(proc.wait(), timeout)
print("%s: exit code %s" % (' '.join(args), exitcode))
except asyncio.TimeoutError:
print("timeout! (%.1f sec)" % timeout)
+ proc.kill()
+ yield from proc.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(cat(loop))
diff --git a/examples/subprocess_attach_write_pipe.py b/examples/subprocess_attach_write_pipe.py
index 8614877..c4e099f 100644
--- a/examples/subprocess_attach_write_pipe.py
+++ b/examples/subprocess_attach_write_pipe.py
@@ -29,7 +29,7 @@ def task():
stdout, stderr = yield from proc.communicate()
print("stdout = %r" % stdout.decode())
- pipe.close()
+ transport.close()
loop.run_until_complete(task())
loop.close()
diff --git a/overlapped.c b/overlapped.c
index 4661152..ef77c88 100644
--- a/overlapped.c
+++ b/overlapped.c
@@ -730,7 +730,7 @@ Overlapped_ReadFile(OverlappedObject *self, PyObject *args)
switch (err) {
case ERROR_BROKEN_PIPE:
mark_as_completed(&self->overlapped);
- Py_RETURN_NONE;
+ return SetFromWindowsErr(err);
case ERROR_SUCCESS:
case ERROR_MORE_DATA:
case ERROR_IO_PENDING:
@@ -789,7 +789,7 @@ Overlapped_WSARecv(OverlappedObject *self, PyObject *args)
switch (err) {
case ERROR_BROKEN_PIPE:
mark_as_completed(&self->overlapped);
- Py_RETURN_NONE;
+ return SetFromWindowsErr(err);
case ERROR_SUCCESS:
case ERROR_MORE_DATA:
case ERROR_IO_PENDING:
@@ -1146,10 +1146,13 @@ ConnectPipe(OverlappedObject *self, PyObject *args)
if (Address == NULL)
return NULL;
+ Py_BEGIN_ALLOW_THREADS
PipeHandle = CreateFileW(Address,
GENERIC_READ | GENERIC_WRITE,
0, NULL, OPEN_EXISTING,
FILE_FLAG_OVERLAPPED, NULL);
+ Py_END_ALLOW_THREADS
+
PyMem_Free(Address);
if (PipeHandle == INVALID_HANDLE_VALUE)
return SetFromWindowsErr(0);
diff --git a/tests/test_events.py b/tests/test_events.py
index a38c90e..8fbba8f 100644
--- a/tests/test_events.py
+++ b/tests/test_events.py
@@ -886,13 +886,18 @@ class EventLoopTestsMixin:
if hasattr(sslcontext_client, 'check_hostname'):
sslcontext_client.check_hostname = True
+
# no CA loaded
f_c = self.loop.create_connection(MyProto, host, port,
ssl=sslcontext_client)
- with test_utils.disable_logger():
- with self.assertRaisesRegex(ssl.SSLError,
- 'certificate verify failed '):
- self.loop.run_until_complete(f_c)
+ with mock.patch.object(self.loop, 'call_exception_handler'):
+ with test_utils.disable_logger():
+ with self.assertRaisesRegex(ssl.SSLError,
+ 'certificate verify failed '):
+ self.loop.run_until_complete(f_c)
+
+ # execute the loop to log the connection error
+ test_utils.run_briefly(self.loop)
# close connection
self.assertIsNone(proto.transport)
@@ -919,15 +924,20 @@ class EventLoopTestsMixin:
f_c = self.loop.create_unix_connection(MyProto, path,
ssl=sslcontext_client,
server_hostname='invalid')
- with test_utils.disable_logger():
- with self.assertRaisesRegex(ssl.SSLError,
- 'certificate verify failed '):
- self.loop.run_until_complete(f_c)
+ with mock.patch.object(self.loop, 'call_exception_handler'):
+ with test_utils.disable_logger():
+ with self.assertRaisesRegex(ssl.SSLError,
+ 'certificate verify failed '):
+ self.loop.run_until_complete(f_c)
+
+ # execute the loop to log the connection error
+ test_utils.run_briefly(self.loop)
# close connection
self.assertIsNone(proto.transport)
server.close()
+
def test_legacy_create_unix_server_ssl_verify_failed(self):
with test_utils.force_legacy_ssl_support():
self.test_create_unix_server_ssl_verify_failed()
@@ -949,11 +959,12 @@ class EventLoopTestsMixin:
# incorrect server_hostname
f_c = self.loop.create_connection(MyProto, host, port,
ssl=sslcontext_client)
- with test_utils.disable_logger():
- with self.assertRaisesRegex(
- ssl.CertificateError,
- "hostname '127.0.0.1' doesn't match 'localhost'"):
- self.loop.run_until_complete(f_c)
+ with mock.patch.object(self.loop, 'call_exception_handler'):
+ with test_utils.disable_logger():
+ with self.assertRaisesRegex(
+ ssl.CertificateError,
+ "hostname '127.0.0.1' doesn't match 'localhost'"):
+ self.loop.run_until_complete(f_c)
# close connection
proto.transport.close()
@@ -1426,6 +1437,10 @@ class EventLoopTestsMixin:
'selector': self.loop._selector.__class__.__name__})
def test_sock_connect_address(self):
+ # In debug mode, sock_connect() must ensure that the address is already
+ # resolved (call _check_resolved_address())
+ self.loop.set_debug(True)
+
addresses = [(socket.AF_INET, ('www.python.org', 80))]
if support.IPV6_ENABLED:
addresses.extend((
@@ -1540,9 +1555,10 @@ class SubprocessTestsMixin:
stdin = transp.get_pipe_transport(0)
stdin.write(b'Python The Winner')
self.loop.run_until_complete(proto.got_data[1].wait())
- transp.close()
+ with test_utils.disable_logger():
+ transp.close()
self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
self.assertEqual(b'Python The Winner', proto.data[1])
def test_subprocess_interactive(self):
@@ -1556,21 +1572,20 @@ class SubprocessTestsMixin:
self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- try:
- stdin = transp.get_pipe_transport(0)
- stdin.write(b'Python ')
- self.loop.run_until_complete(proto.got_data[1].wait())
- proto.got_data[1].clear()
- self.assertEqual(b'Python ', proto.data[1])
-
- stdin.write(b'The Winner')
- self.loop.run_until_complete(proto.got_data[1].wait())
- self.assertEqual(b'Python The Winner', proto.data[1])
- finally:
- transp.close()
+ stdin = transp.get_pipe_transport(0)
+ stdin.write(b'Python ')
+ self.loop.run_until_complete(proto.got_data[1].wait())
+ proto.got_data[1].clear()
+ self.assertEqual(b'Python ', proto.data[1])
+ stdin.write(b'The Winner')
+ self.loop.run_until_complete(proto.got_data[1].wait())
+ self.assertEqual(b'Python The Winner', proto.data[1])
+
+ with test_utils.disable_logger():
+ transp.close()
self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
def test_subprocess_shell(self):
connect = self.loop.subprocess_shell(
@@ -1728,9 +1743,10 @@ class SubprocessTestsMixin:
# GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
# WriteFile() we get ERROR_BROKEN_PIPE as expected.)
self.assertEqual(b'ERR:OSError', proto.data[2])
- transp.close()
+ with test_utils.disable_logger():
+ transp.close()
self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
def test_subprocess_wait_no_same_group(self):
# start the new process in a new session
diff --git a/tests/test_proactor_events.py b/tests/test_proactor_events.py
index 33a8a67..fcd9ab1 100644
--- a/tests/test_proactor_events.py
+++ b/tests/test_proactor_events.py
@@ -499,8 +499,12 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
self.proactor.accept.assert_called_with(self.sock)
def test_socketpair(self):
+ class EventLoop(BaseProactorEventLoop):
+ # override the destructor to not log a ResourceWarning
+ def __del__(self):
+ pass
self.assertRaises(
- NotImplementedError, BaseProactorEventLoop, self.proactor)
+ NotImplementedError, EventLoop, self.proactor)
def test_make_socket_transport(self):
tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
diff --git a/tests/test_selector_events.py b/tests/test_selector_events.py
index ad86ada..f64e40d 100644
--- a/tests/test_selector_events.py
+++ b/tests/test_selector_events.py
@@ -59,6 +59,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
def test_make_socket_transport(self):
m = mock.Mock()
self.loop.add_reader = mock.Mock()
+ self.loop.add_reader._is_coroutine = False
transport = self.loop._make_socket_transport(m, asyncio.Protocol())
self.assertIsInstance(transport, _SelectorSocketTransport)
close_transport(transport)
@@ -67,6 +68,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
def test_make_ssl_transport(self):
m = mock.Mock()
self.loop.add_reader = mock.Mock()
+ self.loop.add_reader._is_coroutine = False
self.loop.add_writer = mock.Mock()
self.loop.remove_reader = mock.Mock()
self.loop.remove_writer = mock.Mock()
@@ -770,20 +772,24 @@ class SelectorSocketTransportTests(test_utils.TestCase):
return transport
def test_ctor(self):
- tr = self.socket_transport()
+ waiter = asyncio.Future(loop=self.loop)
+ tr = self.socket_transport(waiter=waiter)
+ self.loop.run_until_complete(waiter)
+
self.loop.assert_reader(7, tr._read_ready)
test_utils.run_briefly(self.loop)
self.protocol.connection_made.assert_called_with(tr)
def test_ctor_with_waiter(self):
- fut = asyncio.Future(loop=self.loop)
+ waiter = asyncio.Future(loop=self.loop)
+ self.socket_transport(waiter=waiter)
+ self.loop.run_until_complete(waiter)
- self.socket_transport(waiter=fut)
- test_utils.run_briefly(self.loop)
- self.assertIsNone(fut.result())
+ self.assertIsNone(waiter.result())
def test_pause_resume_reading(self):
tr = self.socket_transport()
+ test_utils.run_briefly(self.loop)
self.assertFalse(tr._paused)
self.loop.assert_reader(7, tr._read_ready)
tr.pause_reading()
@@ -1421,7 +1427,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
self.assertFalse(tr.can_write_eof())
self.assertRaises(NotImplementedError, tr.write_eof)
- def test_close(self):
+ def check_close(self):
tr = self._make_one()
tr.close()
@@ -1433,6 +1439,19 @@ class SelectorSslTransportTests(test_utils.TestCase):
self.assertEqual(tr._conn_lost, 1)
self.assertEqual(1, self.loop.remove_reader_count[1])
+ test_utils.run_briefly(self.loop)
+
+ def test_close(self):
+ self.check_close()
+ self.assertTrue(self.protocol.connection_made.called)
+ self.assertTrue(self.protocol.connection_lost.called)
+
+ def test_close_not_connected(self):
+ self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError
+ self.check_close()
+ self.assertFalse(self.protocol.connection_made.called)
+ self.assertFalse(self.protocol.connection_lost.called)
+
@unittest.skipIf(ssl is None, 'No SSL support')
def test_server_hostname(self):
self.ssl_transport(server_hostname='localhost')
diff --git a/tests/test_sslproto.py b/tests/test_sslproto.py
index 812dedb..a72967e 100644
--- a/tests/test_sslproto.py
+++ b/tests/test_sslproto.py
@@ -2,26 +2,48 @@
import unittest
from unittest import mock
+try:
+ import ssl
+except ImportError:
+ ssl = None
import asyncio
from asyncio import sslproto
from asyncio import test_utils
+@unittest.skipIf(ssl is None, 'No ssl module')
class SslProtoHandshakeTests(test_utils.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
+ def ssl_protocol(self, waiter=None):
+ sslcontext = test_utils.dummy_ssl_context()
+ app_proto = asyncio.Protocol()
+ proto = sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter)
+ self.addCleanup(proto._app_transport.close)
+ return proto
+
+ def connection_made(self, ssl_proto, do_handshake=None):
+ transport = mock.Mock()
+ sslpipe = mock.Mock()
+ sslpipe.shutdown.return_value = b''
+ if do_handshake:
+ sslpipe.do_handshake.side_effect = do_handshake
+ else:
+ def mock_handshake(callback):
+ return []
+ sslpipe.do_handshake.side_effect = mock_handshake
+ with mock.patch('asyncio.sslproto._SSLPipe', return_value=sslpipe):
+ ssl_proto.connection_made(transport)
+
def test_cancel_handshake(self):
# Python issue #23197: cancelling an handshake must not raise an
# exception or log an error, even if the handshake failed
- sslcontext = test_utils.dummy_ssl_context()
- app_proto = asyncio.Protocol()
waiter = asyncio.Future(loop=self.loop)
- ssl_proto = sslproto.SSLProtocol(self.loop, app_proto, sslcontext,
- waiter)
+ ssl_proto = self.ssl_protocol(waiter)
handshake_fut = asyncio.Future(loop=self.loop)
def do_handshake(callback):
@@ -31,18 +53,18 @@ class SslProtoHandshakeTests(test_utils.TestCase):
return []
waiter.cancel()
- transport = mock.Mock()
- sslpipe = mock.Mock()
- sslpipe.shutdown.return_value = b''
- sslpipe.do_handshake.side_effect = do_handshake
- with mock.patch('asyncio.sslproto._SSLPipe', return_value=sslpipe):
- ssl_proto.connection_made(transport)
+ self.connection_made(ssl_proto, do_handshake)
with test_utils.disable_logger():
self.loop.run_until_complete(handshake_fut)
- # Close the transport
- ssl_proto._app_transport.close()
+ def test_eof_received_waiter(self):
+ waiter = asyncio.Future(loop=self.loop)
+ ssl_proto = self.ssl_protocol(waiter)
+ self.connection_made(ssl_proto)
+ ssl_proto.eof_received()
+ test_utils.run_briefly(self.loop)
+ self.assertIsInstance(waiter.exception(), ConnectionResetError)
if __name__ == '__main__':
diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py
index ecc2c9d..de0b08a 100644
--- a/tests/test_subprocess.py
+++ b/tests/test_subprocess.py
@@ -4,6 +4,7 @@ import unittest
from unittest import mock
import asyncio
+from asyncio import base_subprocess
from asyncio import subprocess
from asyncio import test_utils
try:
@@ -23,6 +24,56 @@ PROGRAM_CAT = [
'data = sys.stdin.buffer.read()',
'sys.stdout.buffer.write(data)'))]
+class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
+ def _start(self, *args, **kwargs):
+ self._proc = mock.Mock()
+ self._proc.stdin = None
+ self._proc.stdout = None
+ self._proc.stderr = None
+
+
+class SubprocessTransportTests(test_utils.TestCase):
+ def setUp(self):
+ self.loop = self.new_test_loop()
+ self.set_event_loop(self.loop)
+
+
+ def create_transport(self, waiter=None):
+ protocol = mock.Mock()
+ protocol.connection_made._is_coroutine = False
+ protocol.process_exited._is_coroutine = False
+ transport = TestSubprocessTransport(
+ self.loop, protocol, ['test'], False,
+ None, None, None, 0, waiter=waiter)
+ return (transport, protocol)
+
+ def test_proc_exited(self):
+ waiter = asyncio.Future(loop=self.loop)
+ transport, protocol = self.create_transport(waiter)
+ transport._process_exited(6)
+ self.loop.run_until_complete(waiter)
+
+ self.assertEqual(transport.get_returncode(), 6)
+
+ self.assertTrue(protocol.connection_made.called)
+ self.assertTrue(protocol.process_exited.called)
+ self.assertTrue(protocol.connection_lost.called)
+ self.assertEqual(protocol.connection_lost.call_args[0], (None,))
+
+ self.assertFalse(transport._closed)
+ self.assertIsNone(transport._loop)
+ self.assertIsNone(transport._proc)
+ self.assertIsNone(transport._protocol)
+
+ # methods must raise ProcessLookupError if the process exited
+ self.assertRaises(ProcessLookupError,
+ transport.send_signal, signal.SIGTERM)
+ self.assertRaises(ProcessLookupError, transport.terminate)
+ self.assertRaises(ProcessLookupError, transport.kill)
+
+ transport.close()
+
+
class SubprocessMixin:
def test_stdin_stdout(self):
@@ -298,6 +349,61 @@ class SubprocessMixin:
self.loop.run_until_complete(cancel_make_transport())
test_utils.run_briefly(self.loop)
+ def test_close_kill_running(self):
+ @asyncio.coroutine
+ def kill_running():
+ create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
+ *PROGRAM_BLOCKED)
+ transport, protocol = yield from create
+ proc = transport.get_extra_info('subprocess')
+ proc.kill = mock.Mock()
+ returncode = transport.get_returncode()
+ transport.close()
+ return (returncode, proc.kill.called)
+
+ # Ignore "Close running child process: kill ..." log
+ with test_utils.disable_logger():
+ returncode, killed = self.loop.run_until_complete(kill_running())
+ self.assertIsNone(returncode)
+
+ # transport.close() must kill the process if it is still running
+ self.assertTrue(killed)
+ test_utils.run_briefly(self.loop)
+
+ def test_close_dont_kill_finished(self):
+ @asyncio.coroutine
+ def kill_running():
+ create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
+ *PROGRAM_BLOCKED)
+ transport, protocol = yield from create
+ proc = transport.get_extra_info('subprocess')
+
+ # kill the process (but asyncio is not notified immediatly)
+ proc.kill()
+ proc.wait()
+
+ proc.kill = mock.Mock()
+ proc_returncode = proc.poll()
+ transport_returncode = transport.get_returncode()
+ transport.close()
+ return (proc_returncode, transport_returncode, proc.kill.called)
+
+ # Ignore "Unknown child process pid ..." log of SafeChildWatcher,
+ # emitted because the test already consumes the exit status:
+ # proc.wait()
+ with test_utils.disable_logger():
+ result = self.loop.run_until_complete(kill_running())
+ test_utils.run_briefly(self.loop)
+
+ proc_returncode, transport_return_code, killed = result
+
+ self.assertIsNotNone(proc_returncode)
+ self.assertIsNone(transport_return_code)
+
+ # transport.close() must not kill the process if it finished, even if
+ # the transport was not notified yet
+ self.assertFalse(killed)
+
if sys.platform != 'win32':
# Unix
diff --git a/tests/test_unix_events.py b/tests/test_unix_events.py
index 126196d..41249ff 100644
--- a/tests/test_unix_events.py
+++ b/tests/test_unix_events.py
@@ -350,16 +350,13 @@ class UnixReadPipeTransportTests(test_utils.TestCase):
return transport
def test_ctor(self):
- tr = self.read_pipe_transport()
- self.loop.assert_reader(5, tr._read_ready)
- test_utils.run_briefly(self.loop)
- self.protocol.connection_made.assert_called_with(tr)
+ waiter = asyncio.Future(loop=self.loop)
+ tr = self.read_pipe_transport(waiter=waiter)
+ self.loop.run_until_complete(waiter)
- def test_ctor_with_waiter(self):
- fut = asyncio.Future(loop=self.loop)
- tr = self.read_pipe_transport(waiter=fut)
- test_utils.run_briefly(self.loop)
- self.assertIsNone(fut.result())
+ self.protocol.connection_made.assert_called_with(tr)
+ self.loop.assert_reader(5, tr._read_ready)
+ self.assertIsNone(waiter.result())
@mock.patch('os.read')
def test__read_ready(self, m_read):
@@ -502,17 +499,13 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
return transport
def test_ctor(self):
- tr = self.write_pipe_transport()
- self.loop.assert_reader(5, tr._read_ready)
- test_utils.run_briefly(self.loop)
- self.protocol.connection_made.assert_called_with(tr)
+ waiter = asyncio.Future(loop=self.loop)
+ tr = self.write_pipe_transport(waiter=waiter)
+ self.loop.run_until_complete(waiter)
- def test_ctor_with_waiter(self):
- fut = asyncio.Future(loop=self.loop)
- tr = self.write_pipe_transport(waiter=fut)
+ self.protocol.connection_made.assert_called_with(tr)
self.loop.assert_reader(5, tr._read_ready)
- test_utils.run_briefly(self.loop)
- self.assertEqual(None, fut.result())
+ self.assertEqual(None, waiter.result())
def test_can_write_eof(self):
tr = self.write_pipe_transport()
diff --git a/tests/test_windows_events.py b/tests/test_windows_events.py
index f9b3dd1..73d8fcd 100644
--- a/tests/test_windows_events.py
+++ b/tests/test_windows_events.py
@@ -1,6 +1,7 @@
import os
import sys
import unittest
+from unittest import mock
if sys.platform != 'win32':
raise unittest.SkipTest('Windows only')
@@ -91,6 +92,18 @@ class ProactorTests(test_utils.TestCase):
return 'done'
+ def test_connect_pipe_cancel(self):
+ exc = OSError()
+ exc.winerror = _overlapped.ERROR_PIPE_BUSY
+ with mock.patch.object(_overlapped, 'ConnectPipe', side_effect=exc) as connect:
+ coro = self.loop._proactor.connect_pipe('pipe_address')
+ task = self.loop.create_task(coro)
+
+ # check that it's possible to cancel connect_pipe()
+ task.cancel()
+ with self.assertRaises(asyncio.CancelledError):
+ self.loop.run_until_complete(task)
+
def test_wait_for_handle(self):
event = _overlapped.CreateEvent(None, True, False, None)
self.addCleanup(_winapi.CloseHandle, event)
diff --git a/tox.ini b/tox.ini
index 6209ff4..3030441 100644
--- a/tox.ini
+++ b/tox.ini
@@ -8,8 +8,8 @@ deps=
setenv =
PYTHONASYNCIODEBUG = 1
commands=
- python runtests.py -r {posargs}
- python run_aiotest.py -r {posargs}
+ python -Wd runtests.py -r {posargs}
+ python -Wd run_aiotest.py -r {posargs}
[testenv:py3_release]
# Run tests in release mode