diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-12-05 00:38:12 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-12-05 00:38:12 +0100 |
commit | 97a31fe993354b79b4821c742f5cde14dd5f5021 (patch) | |
tree | e1b85c3ab0ebd406d8eca5a67c8c3b162029b457 | |
parent | 470eab045caa79b9ac94f194c57a6184440d37f1 (diff) | |
parent | f814e5c11ce268017f448db01b51bc627d369fe3 (diff) | |
download | trollius-97a31fe993354b79b4821c742f5cde14dd5f5021.tar.gz |
Merge Tulip into Trollius
-rw-r--r-- | tests/test_events.py | 103 | ||||
-rw-r--r-- | tests/test_selector_events.py | 2 | ||||
-rw-r--r-- | tests/test_subprocess.py | 37 | ||||
-rw-r--r-- | tests/test_windows_events.py | 2 | ||||
-rw-r--r-- | trollius/base_events.py | 4 | ||||
-rw-r--r-- | trollius/futures.py | 5 | ||||
-rw-r--r-- | trollius/py33_exceptions.py | 7 | ||||
-rw-r--r-- | trollius/py3_ssl.py | 31 | ||||
-rw-r--r-- | trollius/selector_events.py | 2 | ||||
-rw-r--r-- | trollius/subprocess.py | 17 | ||||
-rw-r--r-- | trollius/tasks.py | 9 | ||||
-rw-r--r-- | trollius/unix_events.py | 1 |
12 files changed, 152 insertions, 68 deletions
diff --git a/tests/test_events.py b/tests/test_events.py index 39b30b2..b2e6750 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -20,9 +20,6 @@ try: import ssl except ImportError: ssl = None - HAS_SNI = False -else: - HAS_SNI = getattr(ssl, 'HAS_SNI', False) try: import concurrent @@ -243,7 +240,8 @@ class EventLoopTestsMixin(object): def tearDown(self): # just in case if we have transport close callbacks - test_utils.run_briefly(self.loop) + if not self.loop.is_closed(): + test_utils.run_briefly(self.loop) self.loop.close() gc.collect() @@ -876,15 +874,14 @@ class EventLoopTestsMixin(object): server.close() @test_utils.skipIf(ssl is None, 'No ssl module') - @test_utils.skipUnless(HAS_SNI, 'No SNI support in ssl module') + @test_utils.skipIf(asyncio.BACKPORT_SSL_CONTEXT, 'need ssl.SSLContext') def test_create_server_ssl_verify_failed(self): proto = MyProto(loop=self.loop) server, host, port = self._make_ssl_server( lambda: proto, SIGNED_CERTFILE) sslcontext_client = asyncio.SSLContext(ssl.PROTOCOL_SSLv23) - if not asyncio.BACKPORT_SSL_CONTEXT: - sslcontext_client.options |= ssl.OP_NO_SSLv2 + sslcontext_client.options |= ssl.OP_NO_SSLv2 sslcontext_client.verify_mode = ssl.CERT_REQUIRED if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True @@ -902,16 +899,15 @@ class EventLoopTestsMixin(object): server.close() @test_utils.skipIf(ssl is None, 'No ssl module') - @test_utils.skipUnless(HAS_SNI, 'No SNI support in ssl module') @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') + @test_utils.skipIf(asyncio.BACKPORT_SSL_CONTEXT, 'need ssl.SSLContext') def test_create_unix_server_ssl_verify_failed(self): proto = MyProto(loop=self.loop) server, path = self._make_ssl_unix_server( lambda: proto, SIGNED_CERTFILE) sslcontext_client = asyncio.SSLContext(ssl.PROTOCOL_SSLv23) - if not asyncio.BACKPORT_SSL_CONTEXT: - sslcontext_client.options |= ssl.OP_NO_SSLv2 + sslcontext_client.options |= ssl.OP_NO_SSLv2 sslcontext_client.verify_mode = ssl.CERT_REQUIRED if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True @@ -930,7 +926,6 @@ class EventLoopTestsMixin(object): server.close() @test_utils.skipIf(ssl is None, 'No ssl module') - @test_utils.skipUnless(HAS_SNI, 'No SNI support in ssl module') def test_create_server_ssl_match_failed(self): proto = MyProto(loop=self.loop) server, host, port = self._make_ssl_server( @@ -939,9 +934,9 @@ class EventLoopTestsMixin(object): sslcontext_client = asyncio.SSLContext(ssl.PROTOCOL_SSLv23) if not asyncio.BACKPORT_SSL_CONTEXT: sslcontext_client.options |= ssl.OP_NO_SSLv2 - sslcontext_client.verify_mode = ssl.CERT_REQUIRED - sslcontext_client.load_verify_locations( - cafile=SIGNING_CA) + sslcontext_client.verify_mode = ssl.CERT_REQUIRED + sslcontext_client.load_verify_locations( + cafile=SIGNING_CA) if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True @@ -952,20 +947,21 @@ class EventLoopTestsMixin(object): err_msg = "hostname '127.0.0.1' doesn't match u'localhost'" # incorrect server_hostname - f_c = self.loop.create_connection(MyProto, host, port, - ssl=sslcontext_client) - with test_utils.disable_logger(): - with self.assertRaisesRegex( - ssl.CertificateError, - err_msg): - self.loop.run_until_complete(f_c) + if not asyncio.BACKPORT_SSL_CONTEXT: + f_c = self.loop.create_connection(MyProto, host, port, + ssl=sslcontext_client) + with test_utils.disable_logger(): + with self.assertRaisesRegex( + ssl.CertificateError, + err_msg): + self.loop.run_until_complete(f_c) + + # close connection + proto.transport.close() - # close connection - proto.transport.close() server.close() @test_utils.skipIf(ssl is None, 'No ssl module') - @test_utils.skipUnless(HAS_SNI, 'No SNI support in ssl module') @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') def test_create_unix_server_ssl_verified(self): proto = MyProto(loop=self.loop) @@ -975,8 +971,8 @@ class EventLoopTestsMixin(object): sslcontext_client = asyncio.SSLContext(ssl.PROTOCOL_SSLv23) if not asyncio.BACKPORT_SSL_CONTEXT: sslcontext_client.options |= ssl.OP_NO_SSLv2 - sslcontext_client.verify_mode = ssl.CERT_REQUIRED - sslcontext_client.load_verify_locations(cafile=SIGNING_CA) + sslcontext_client.verify_mode = ssl.CERT_REQUIRED + sslcontext_client.load_verify_locations(cafile=SIGNING_CA) if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True @@ -992,7 +988,6 @@ class EventLoopTestsMixin(object): server.close() @test_utils.skipIf(ssl is None, 'No ssl module') - @test_utils.skipUnless(HAS_SNI, 'No SNI support in ssl module') def test_create_server_ssl_verified(self): proto = MyProto(loop=self.loop) server, host, port = self._make_ssl_server( @@ -1001,20 +996,22 @@ class EventLoopTestsMixin(object): sslcontext_client = asyncio.SSLContext(ssl.PROTOCOL_SSLv23) if not asyncio.BACKPORT_SSL_CONTEXT: sslcontext_client.options |= ssl.OP_NO_SSLv2 - sslcontext_client.verify_mode = ssl.CERT_REQUIRED - sslcontext_client.load_verify_locations(cafile=SIGNING_CA) + sslcontext_client.verify_mode = ssl.CERT_REQUIRED + sslcontext_client.load_verify_locations(cafile=SIGNING_CA) if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True - # Connection succeeds with correct CA and server hostname. - f_c = self.loop.create_connection(MyProto, host, port, - ssl=sslcontext_client, - server_hostname='localhost') - client, pr = self.loop.run_until_complete(f_c) + if not asyncio.BACKPORT_SSL_CONTEXT: + # Connection succeeds with correct CA and server hostname. + f_c = self.loop.create_connection(MyProto, host, port, + ssl=sslcontext_client, + server_hostname='localhost') + client, pr = self.loop.run_until_complete(f_c) + + # close connection + proto.transport.close() + client.close() - # close connection - proto.transport.close() - client.close() server.close() def test_create_server_sock(self): @@ -1478,6 +1475,38 @@ class EventLoopTestsMixin(object): with self.assertRaises(RuntimeError): self.loop.run_until_complete(coro) + def test_close(self): + self.loop.close() + + @asyncio.coroutine + def test(): + pass + + func = lambda: False + coro = test() + self.addCleanup(coro.close) + + # operation blocked when the loop is closed + with self.assertRaises(RuntimeError): + self.loop.run_forever() + with self.assertRaises(RuntimeError): + fut = asyncio.Future(loop=self.loop) + self.loop.run_until_complete(fut) + with self.assertRaises(RuntimeError): + self.loop.call_soon(func) + with self.assertRaises(RuntimeError): + self.loop.call_soon_threadsafe(func) + with self.assertRaises(RuntimeError): + self.loop.call_later(1.0, func) + with self.assertRaises(RuntimeError): + self.loop.call_at(self.loop.time() + .0, func) + with self.assertRaises(RuntimeError): + self.loop.run_in_executor(None, func) + with self.assertRaises(RuntimeError): + self.loop.create_task(coro) + with self.assertRaises(RuntimeError): + self.loop.add_signal_handler(signal.SIGTERM, func) + class SubprocessTestsMixin(object): diff --git a/tests/test_selector_events.py b/tests/test_selector_events.py index 7dc1b5a..38a8c83 100644 --- a/tests/test_selector_events.py +++ b/tests/test_selector_events.py @@ -1430,7 +1430,7 @@ class SelectorSslTransportTests(test_utils.TestCase): self.assertEqual(tr._conn_lost, 1) self.assertEqual(1, self.loop.remove_reader_count[1]) - @test_utils.skipIf(ssl is None or not HAS_SNI, 'No SNI support') + @test_utils.skipIf(ssl is None, 'No SSL support') def test_server_hostname(self): _SelectorSslTransport( self.loop, self.sock, self.protocol, self.sslcontext, diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index e0ed2c5..bf08d85 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -7,6 +7,7 @@ import sys import unittest from trollius import From, Return from trollius import test_support as support +from trollius.test_utils import mock if sys.platform != 'win32': from trollius import unix_events from trollius.py33_exceptions import BrokenPipeError, ConnectionResetError @@ -176,6 +177,42 @@ class SubprocessMixin(object): self.loop.run_until_complete(proc.communicate(large_data)) self.loop.run_until_complete(proc.wait()) + def test_pause_reading(self): + limit = 10 + size = (limit * 2 + 1) + + @asyncio.coroutine + def test_pause_reading(): + code = '\n'.join(( + 'import sys', + 'sys.stdout.write("x" * %s)' % size, + 'sys.stdout.flush()', + )) + proc = yield From(asyncio.create_subprocess_exec( + sys.executable, '-c', code, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + limit=limit, + loop=self.loop)) + stdout_transport = proc._transport.get_pipe_transport(1) + stdout_transport.pause_reading = mock.Mock() + stdout_transport.resume_reading = mock.Mock() + + stdout, stderr = yield From(proc.communicate()) + + # The child process produced more than limit bytes of output, + # the stream reader transport should pause the protocol to not + # allocate too much memory. + raise Return(stdout, stdout_transport) + + # Issue #22685: Ensure that the stream reader pauses the protocol + # when the child process produces too much data + stdout, transport = self.loop.run_until_complete(test_pause_reading()) + + self.assertEqual(stdout, b'x' * size) + self.assertTrue(transport.pause_reading.called) + self.assertTrue(transport.resume_reading.called) + if sys.platform != 'win32': # Unix diff --git a/tests/test_windows_events.py b/tests/test_windows_events.py index 4f9fcac..c4c562b 100644 --- a/tests/test_windows_events.py +++ b/tests/test_windows_events.py @@ -106,7 +106,7 @@ class ProactorTests(test_utils.TestCase): _overlapped.SetEvent(event) - # Wait for for set event; + # Wait for set event; # result should be True immediately fut = self.loop._proactor.wait_for_handle(event, 10) start = self.loop.time() diff --git a/trollius/base_events.py b/trollius/base_events.py index 0f8401e..05953e9 100644 --- a/trollius/base_events.py +++ b/trollius/base_events.py @@ -182,6 +182,7 @@ class BaseEventLoop(events.AbstractEventLoop): Return a task object. """ + self._check_closed() task = tasks.Task(coro, loop=self) if task._source_traceback: del task._source_traceback[-1] @@ -365,6 +366,7 @@ class BaseEventLoop(events.AbstractEventLoop): if (coroutines.iscoroutine(callback) or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with call_at()") + self._check_closed() if self._debug: self._assert_is_current_event_loop() timer = events.TimerHandle(when, callback, args, self) @@ -395,6 +397,7 @@ class BaseEventLoop(events.AbstractEventLoop): raise TypeError("coroutines cannot be used with call_soon()") if self._debug and check_loop: self._assert_is_current_event_loop() + self._check_closed() handle = events.Handle(callback, args, self) if handle._source_traceback: del handle._source_traceback[-1] @@ -431,6 +434,7 @@ class BaseEventLoop(events.AbstractEventLoop): if (coroutines.iscoroutine(callback) or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with run_in_executor()") + self._check_closed() if isinstance(callback, events.Handle): assert not args assert not isinstance(callback, events.TimerHandle) diff --git a/trollius/futures.py b/trollius/futures.py index 1196377..867107c 100644 --- a/trollius/futures.py +++ b/trollius/futures.py @@ -64,7 +64,7 @@ class _TracebackLogger(object): the Future is collected, and the helper is present, the helper object is also collected, and its __del__() method will log the traceback. When the Future's result() or exception() method is - called (and a helper object is present), it removes the the helper + called (and a helper object is present), it removes the helper object, after calling its clear() method to prevent it from logging. @@ -138,6 +138,7 @@ class Future(object): _result = None _exception = None _loop = None + _source_traceback = None # Used by Python 2 to raise the exception with the original traceback # in the exception() method in debug mode @@ -160,8 +161,6 @@ class Future(object): self._callbacks = [] if self._loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) - else: - self._source_traceback = None def _format_callbacks(self): cb = self._callbacks diff --git a/trollius/py33_exceptions.py b/trollius/py33_exceptions.py index 936e76d..94cbfca 100644 --- a/trollius/py33_exceptions.py +++ b/trollius/py33_exceptions.py @@ -8,6 +8,11 @@ import errno import select import socket import sys +try: + import ssl +except ImportError: + ssl = None + from .compat import PY33 if PY33: @@ -121,6 +126,8 @@ if not PY33: try: return func(*args, **kw) except (socket.error, IOError, OSError) as exc: + if ssl is not None and isinstance(exc, ssl.SSLError): + raise if hasattr(exc, 'winerror'): _wrap_error(exc, _MAP_ERRNO, exc.winerror) # _MAP_ERRNO does not contain all Windows errors. diff --git a/trollius/py3_ssl.py b/trollius/py3_ssl.py index e044f06..c592ee6 100644 --- a/trollius/py3_ssl.py +++ b/trollius/py3_ssl.py @@ -95,18 +95,17 @@ except AttributeError: self.suppress_ragged_eofs = suppress_ragged_eofs self._makefile_refs = 0 - def wrap_socket(sock, keyfile=None, certfile=None, - server_side=False, cert_reqs=ssl.CERT_NONE, - ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=None, - do_handshake_on_connect=True, - suppress_ragged_eofs=True): - return BackportSSLSocket(sock, keyfile=keyfile, certfile=certfile, - server_side=server_side, cert_reqs=cert_reqs, - ssl_version=ssl_version, ca_certs=ca_certs, - do_handshake_on_connect=do_handshake_on_connect, - suppress_ragged_eofs=suppress_ragged_eofs) + def wrap_socket(sock, server_hostname=None, **kwargs): + # ignore server_hostname parameter, not supported + kwargs.pop('server_hostname', None) + return BackportSSLSocket(sock, **kwargs) else: - wrap_socket = ssl.wrap_socket + _wrap_socket = ssl.wrap_socket + + def wrap_socket(sock, **kwargs): + # ignore server_hostname parameter, not supported + kwargs.pop('server_hostname', None) + return _wrap_socket(sock, **kwargs) class SSLContext(object): @@ -119,12 +118,12 @@ except AttributeError: self.certfile = certfile self.keyfile = keyfile - def wrap_socket(self, sock, **kw): + def wrap_socket(self, sock, **kwargs): return wrap_socket(sock, - ssl_version=self.protocol, - certfile=self.certfile, - keyfile=self.keyfile, - **kw) + ssl_version=self.protocol, + certfile=self.certfile, + keyfile=self.keyfile, + **kwargs) @property def verify_mode(self): diff --git a/trollius/selector_events.py b/trollius/selector_events.py index 8ff3489..2f13427 100644 --- a/trollius/selector_events.py +++ b/trollius/selector_events.py @@ -727,7 +727,7 @@ class _SelectorSslTransport(_SelectorTransport): 'server_side': server_side, 'do_handshake_on_connect': False, } - if server_hostname and not server_side and getattr(ssl, 'HAS_SNI', False): + if server_hostname and not server_side: wrap_kwargs['server_hostname'] = server_hostname sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs) diff --git a/trollius/subprocess.py b/trollius/subprocess.py index 46402f9..cd2398a 100644 --- a/trollius/subprocess.py +++ b/trollius/subprocess.py @@ -44,15 +44,22 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, def connection_made(self, transport): self._transport = transport - if transport.get_pipe_transport(1): + + stdout_transport = transport.get_pipe_transport(1) + if stdout_transport is not None: self.stdout = streams.StreamReader(limit=self._limit, loop=self._loop) - if transport.get_pipe_transport(2): + self.stdout.set_transport(stdout_transport) + + stderr_transport = transport.get_pipe_transport(2) + if stderr_transport is not None: self.stderr = streams.StreamReader(limit=self._limit, loop=self._loop) - stdin = transport.get_pipe_transport(0) - if stdin is not None: - self.stdin = streams.StreamWriter(stdin, + self.stderr.set_transport(stderr_transport) + + stdin_transport = transport.get_pipe_transport(0) + if stdin_transport is not None: + self.stdin = streams.StreamWriter(stdin_transport, protocol=self, reader=None, loop=self._loop) diff --git a/trollius/tasks.py b/trollius/tasks.py index 428d3af..4d91de8 100644 --- a/trollius/tasks.py +++ b/trollius/tasks.py @@ -54,6 +54,10 @@ class Task(futures.Future): # all running event loops. {EventLoop: Task} _current_tasks = {} + # If False, don't log a message if the task is destroyed whereas its + # status is still pending + _log_destroy_pending = True + @classmethod def current_task(cls, loop=None): """Return the currently running task in an event loop or None. @@ -86,9 +90,6 @@ class Task(futures.Future): self._must_cancel = False self._loop.call_soon(self._step) self.__class__._all_tasks.add(self) - # If False, don't log a message if the task is destroyed whereas its - # status is still pending - self._log_destroy_pending = True # On Python 3.3 or older, objects with a destructor that are part of a # reference cycle are never destroyed. That's not the case any more on @@ -122,7 +123,7 @@ class Task(futures.Future): def get_stack(self, limit=None): """Return the list of stack frames for this task's coroutine. - If the coroutine is active, this returns the stack where it is + If the coroutine is not done, this returns the stack where it is suspended. If the coroutine has completed successfully or was cancelled, this returns an empty list. If the coroutine was terminated by an exception, this returns the list of traceback diff --git a/trollius/unix_events.py b/trollius/unix_events.py index d3c9536..c067197 100644 --- a/trollius/unix_events.py +++ b/trollius/unix_events.py @@ -85,6 +85,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with add_signal_handler()") self._check_signal(sig) + self._check_closed() try: # set_wakeup_fd() raises ValueError if this is not the # main thread. By calling it early we ensure that an |