diff options
author | Victor Stinner <vstinner@redhat.com> | 2015-09-08 22:55:49 +0200 |
---|---|---|
committer | Victor Stinner <vstinner@redhat.com> | 2015-09-08 22:55:49 +0200 |
commit | 0727ad2b6d49cd94ea0fb86ef08c8050208b839a (patch) | |
tree | 07179220ffd8f0851769b2d271731f2c365621ad | |
parent | f25cb291d8439d9f3d44f52811607d3fdb305d1f (diff) | |
parent | 8d79c57726e30fd19d5fadf46375853df7895516 (diff) | |
download | trollius-git-0727ad2b6d49cd94ea0fb86ef08c8050208b839a.tar.gz |
Merge asyncio into trollius
-rw-r--r-- | .travis.yml | 1 | ||||
-rw-r--r-- | tests/test_queues.py | 96 | ||||
-rw-r--r-- | tests/test_streams.py | 28 | ||||
-rw-r--r-- | tests/test_subprocess.py | 19 | ||||
-rw-r--r-- | tests/test_tasks.py | 33 | ||||
-rw-r--r-- | trollius/base_events.py | 4 | ||||
-rw-r--r-- | trollius/base_subprocess.py | 13 | ||||
-rw-r--r-- | trollius/locks.py | 1 | ||||
-rw-r--r-- | trollius/proactor_events.py | 4 | ||||
-rw-r--r-- | trollius/queues.py | 49 | ||||
-rw-r--r-- | trollius/selector_events.py | 3 | ||||
-rw-r--r-- | trollius/sslproto.py | 11 | ||||
-rw-r--r-- | trollius/streams.py | 20 | ||||
-rw-r--r-- | trollius/tasks.py | 7 | ||||
-rw-r--r-- | trollius/test_utils.py | 2 | ||||
-rw-r--r-- | trollius/transports.py | 2 | ||||
-rw-r--r-- | trollius/unix_events.py | 4 |
17 files changed, 249 insertions, 48 deletions
diff --git a/.travis.yml b/.travis.yml index 2c5838b..5a2c7d7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,7 @@ os: python: - 3.3 - 3.4 + - "nightly" install: - pip install asyncio diff --git a/tests/test_queues.py b/tests/test_queues.py index e75ae4f..75ef988 100644 --- a/tests/test_queues.py +++ b/tests/test_queues.py @@ -173,7 +173,7 @@ class QueueGetTests(_QueueTestBase): q.put_nowait(1) waiter = asyncio.Future(loop=self.loop) - q._putters.append((2, waiter)) + q._putters.append(waiter) res = self.loop.run_until_complete(q.get()) self.assertEqual(1, res) @@ -326,6 +326,99 @@ class QueuePutTests(_QueueTestBase): q.put_nowait(1) self.assertEqual(1, q.get_nowait()) + def test_get_cancel_drop_one_pending_reader(self): + def gen(): + yield 0.01 + yield 0.1 + + loop = self.new_test_loop(gen) + + q = asyncio.Queue(loop=loop) + + reader = loop.create_task(q.get()) + + loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) + + q.put_nowait(1) + q.put_nowait(2) + reader.cancel() + + try: + loop.run_until_complete(reader) + except asyncio.CancelledError: + # try again + reader = loop.create_task(q.get()) + loop.run_until_complete(reader) + + result = reader.result() + # if we get 2, it means 1 got dropped! + self.assertEqual(1, result) + + def test_get_cancel_drop_many_pending_readers(self): + def gen(): + yield 0.01 + yield 0.1 + + loop = self.new_test_loop(gen) + loop.set_debug(True) + + q = asyncio.Queue(loop=loop) + + reader1 = loop.create_task(q.get()) + reader2 = loop.create_task(q.get()) + reader3 = loop.create_task(q.get()) + + loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) + + q.put_nowait(1) + q.put_nowait(2) + reader1.cancel() + + try: + loop.run_until_complete(reader1) + except asyncio.CancelledError: + pass + + loop.run_until_complete(reader3) + + # reader2 will receive `2`, because it was added to the + # queue of pending readers *before* put_nowaits were called. + self.assertEqual(reader2.result(), 2) + # reader3 will receive `1`, because reader1 was cancelled + # before is had a chance to execute, and `2` was already + # pushed to reader2 by second `put_nowait`. + self.assertEqual(reader3.result(), 1) + + def test_put_cancel_drop(self): + + def gen(): + yield 0.01 + yield 0.1 + + loop = self.new_test_loop(gen) + q = asyncio.Queue(1, loop=loop) + + q.put_nowait(1) + + # putting a second item in the queue has to block (qsize=1) + writer = loop.create_task(q.put(2)) + loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) + + value1 = q.get_nowait() + self.assertEqual(value1, 1) + + writer.cancel() + try: + loop.run_until_complete(writer) + except asyncio.CancelledError: + # try again + writer = loop.create_task(q.put(2)) + loop.run_until_complete(writer) + + value2 = q.get_nowait() + self.assertEqual(value2, 2) + self.assertEqual(q.qsize(), 0) + def test_nonblocking_put_exception(self): q = asyncio.Queue(maxsize=1, loop=self.loop) q.put_nowait(1) @@ -379,6 +472,7 @@ class QueuePutTests(_QueueTestBase): test_utils.run_briefly(self.loop) self.assertTrue(put_c.done()) self.assertEqual(q.get_nowait(), 'a') + test_utils.run_briefly(self.loop) self.assertEqual(q.get_nowait(), 'b') self.loop.run_until_complete(put_b) diff --git a/tests/test_streams.py b/tests/test_streams.py index 9ecbb66..390174c 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -450,6 +450,8 @@ class StreamReaderTests(test_utils.TestCase): def handle_client(self, client_reader, client_writer): data = yield From(client_reader.readline()) client_writer.write(data) + yield From(client_writer.drain()) + client_writer.close() def start(self): sock = socket.socket() @@ -461,12 +463,8 @@ class StreamReaderTests(test_utils.TestCase): return sock.getsockname() def handle_client_callback(self, client_reader, client_writer): - task = asyncio.Task(client_reader.readline(), loop=self.loop) - - def done(task): - client_writer.write(task.result()) - - task.add_done_callback(done) + self.loop.create_task(self.handle_client(client_reader, + client_writer)) def start_callback(self): sock = socket.socket() @@ -526,6 +524,8 @@ class StreamReaderTests(test_utils.TestCase): def handle_client(self, client_reader, client_writer): data = yield From(client_reader.readline()) client_writer.write(data) + yield From(client_writer.drain()) + client_writer.close() def start(self): self.server = self.loop.run_until_complete( @@ -534,18 +534,14 @@ class StreamReaderTests(test_utils.TestCase): loop=self.loop)) def handle_client_callback(self, client_reader, client_writer): - task = asyncio.Task(client_reader.readline(), loop=self.loop) - - def done(task): - client_writer.write(task.result()) - - task.add_done_callback(done) + self.loop.create_task(self.handle_client(client_reader, + client_writer)) def start_callback(self): - self.server = self.loop.run_until_complete( - asyncio.start_unix_server(self.handle_client_callback, - path=self.path, - loop=self.loop)) + start = asyncio.start_unix_server(self.handle_client_callback, + path=self.path, + loop=self.loop) + self.server = self.loop.run_until_complete(start) def stop(self): if self.server is not None: diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index a813834..21e003a 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -4,6 +4,7 @@ import trollius as asyncio import os import signal import sys +import warnings from trollius import BrokenPipeError, ConnectionResetError, ProcessLookupError from trollius import From, Return from trollius import base_subprocess @@ -427,6 +428,24 @@ class SubprocessMixin: # the transport was not notified yet self.assertFalse(killed) + def test_popen_error(self): + # Issue #24763: check that the subprocess transport is closed + # when BaseSubprocessTransport fails + if sys.platform == 'win32': + target = 'asyncio.windows_utils.Popen' + else: + target = 'subprocess.Popen' + with mock.patch(target) as popen: + exc = ZeroDivisionError + popen.side_effect = exc + + create = asyncio.create_subprocess_exec(sys.executable, '-c', + 'pass', loop=self.loop) + with warnings.catch_warnings(record=True) as warns: + with self.assertRaises(exc): + self.loop.run_until_complete(create) + self.assertEqual(warns, []) + if sys.platform != 'win32': # Unix diff --git a/tests/test_tasks.py b/tests/test_tasks.py index afa1190..6576ddb 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -2,8 +2,10 @@ import contextlib import functools +import io import os import re +import six import sys import types import weakref @@ -157,6 +159,37 @@ class TaskTests(test_utils.TestCase): 'function is deprecated, use ensure_'): self.assertIs(f, asyncio.async(f)) + def test_get_stack(self): + non_local = {'T': None} + + @asyncio.coroutine + def foo(): + yield From(bar()) + + @asyncio.coroutine + def bar(): + T = non_local['T'] + # test get_stack() + f = T.get_stack(limit=1) + try: + self.assertEqual(f[0].f_code.co_name, 'foo') + finally: + f = None + + # test print_stack() + file = six.StringIO() + T.print_stack(limit=1, file=file) + file.seek(0) + tb = file.read() + self.assertRegex(tb, r'foo\(\) running') + + @asyncio.coroutine + def runner(): + non_local['T'] = asyncio.ensure_future(foo(), loop=self.loop) + yield From(non_local['T']) + + self.loop.run_until_complete(runner()) + def test_task_repr(self): self.loop.set_debug(False) diff --git a/trollius/base_events.py b/trollius/base_events.py index d4dc448..c5e6eff 100644 --- a/trollius/base_events.py +++ b/trollius/base_events.py @@ -387,7 +387,7 @@ class BaseEventLoop(events.AbstractEventLoop): # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if not self.is_closed(): warnings.warn("unclosed event loop %r" % self, ResourceWarning) @@ -1225,7 +1225,7 @@ class BaseEventLoop(events.AbstractEventLoop): return enabled = bool(enabled) - if self._coroutine_wrapper_set is enabled: + if self._coroutine_wrapper_set == enabled: return wrapper = coroutines.debug_wrapper diff --git a/trollius/base_subprocess.py b/trollius/base_subprocess.py index 13cff7a..ffd6e76 100644 --- a/trollius/base_subprocess.py +++ b/trollius/base_subprocess.py @@ -1,8 +1,8 @@ import collections import subprocess -import sys import warnings +from . import compat from . import futures from . import protocols from . import transports @@ -36,8 +36,13 @@ class BaseSubprocessTransport(transports.SubprocessTransport): self._pipes[2] = None # Create the child process: set the _proc attribute - self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, - stderr=stderr, bufsize=bufsize, **kwargs) + try: + self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, + stderr=stderr, bufsize=bufsize, **kwargs) + except: + self.close() + raise + self._pid = self._proc.pid self._extra['subprocess'] = self._proc @@ -112,7 +117,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if not self._closed: warnings.warn("unclosed transport %r" % self, ResourceWarning) diff --git a/trollius/locks.py b/trollius/locks.py index 0d6de65..03b4daa 100644 --- a/trollius/locks.py +++ b/trollius/locks.py @@ -3,7 +3,6 @@ __all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore'] import collections -import sys from . import compat from . import events diff --git a/trollius/proactor_events.py b/trollius/proactor_events.py index 49d8bc3..66b4caf 100644 --- a/trollius/proactor_events.py +++ b/trollius/proactor_events.py @@ -7,10 +7,10 @@ proactor is only implemented on Windows with IOCP. __all__ = ['BaseProactorEventLoop'] import socket -import sys import warnings from . import base_events +from . import compat from . import constants from . import futures from . import sslproto @@ -82,7 +82,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if self._sock is not None: warnings.warn("unclosed transport %r" % self, ResourceWarning) diff --git a/trollius/queues.py b/trollius/queues.py index 9c77c60..18167ab 100644 --- a/trollius/queues.py +++ b/trollius/queues.py @@ -47,7 +47,7 @@ class Queue(object): # Futures. self._getters = collections.deque() - # Pairs of (item, Future). + # Futures self._putters = collections.deque() self._unfinished_tasks = 0 self._finished = locks.Event(loop=self._loop) @@ -98,7 +98,7 @@ class Queue(object): def _consume_done_putters(self): # Delete waiters at the head of the put() queue who've timed out. - while self._putters and self._putters[0][1].done(): + while self._putters and self._putters[0].done(): self._putters.popleft() def qsize(self): @@ -148,8 +148,9 @@ class Queue(object): elif self._maxsize > 0 and self._maxsize <= self.qsize(): waiter = futures.Future(loop=self._loop) - self._putters.append((item, waiter)) + self._putters.append(waiter) yield From(waiter) + self._put(item) else: self.__put_internal(item) @@ -186,8 +187,7 @@ class Queue(object): self._consume_done_putters() if self._putters: assert self.full(), 'queue not full, why are putters waiting?' - item, putter = self._putters.popleft() - self.__put_internal(item) + putter = self._putters.popleft() # When a getter runs and frees up a slot so this putter can # run, we need to defer the put for a tick to ensure that @@ -201,10 +201,40 @@ class Queue(object): raise Return(self._get()) else: waiter = futures.Future(loop=self._loop) - self._getters.append(waiter) - result = yield From(waiter) - raise Return(result) + try: + value = (yield From(waiter)) + raise Return(value) + except futures.CancelledError: + # if we get CancelledError, it means someone cancelled this + # get() coroutine. But there is a chance that the waiter + # already is ready and contains an item that has just been + # removed from the queue. In this case, we need to put the item + # back into the front of the queue. This get() must either + # succeed without fault or, if it gets cancelled, it must be as + # if it never happened. + if waiter.done(): + self._put_it_back(waiter.result()) + raise + + def _put_it_back(self, item): + """ + This is called when we have a waiter to get() an item and this waiter + gets cancelled. In this case, we put the item back: wake up another + waiter or put it in the _queue. + """ + self._consume_done_getters() + if self._getters: + assert not self._queue, ( + 'queue non-empty, why are getters waiting?') + + getter = self._getters.popleft() + self.__put_internal(item) + + # getter cannot be cancelled, we just removed done getters + getter.set_result(item) + else: + self._queue.appendleft(item) def get_nowait(self): """Remove and return an item from the queue. @@ -214,8 +244,7 @@ class Queue(object): self._consume_done_putters() if self._putters: assert self.full(), 'queue not full, why are putters waiting?' - item, putter = self._putters.popleft() - self.__put_internal(item) + putter = self._putters.popleft() # Wake putter on next tick. # getter cannot be cancelled, we just removed done putters diff --git a/trollius/selector_events.py b/trollius/selector_events.py index dc27ed1..67ef26e 100644 --- a/trollius/selector_events.py +++ b/trollius/selector_events.py @@ -19,6 +19,7 @@ except ImportError: # pragma: no cover ssl = None from . import base_events +from . import compat from . import constants from . import events from . import futures @@ -584,7 +585,7 @@ class _SelectorTransport(transports._FlowControlMixin, # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if self._sock is not None: warnings.warn("unclosed transport %r" % self, ResourceWarning) diff --git a/trollius/sslproto.py b/trollius/sslproto.py index 707cc6d..1404fd7 100644 --- a/trollius/sslproto.py +++ b/trollius/sslproto.py @@ -1,5 +1,4 @@ import collections -import sys import warnings try: import ssl @@ -7,6 +6,7 @@ try: except ImportError: # pragma: no cover ssl = None +from . import compat from . import protocols from . import transports from .log import logger @@ -326,7 +326,7 @@ class _SSLProtocolTransport(transports._FlowControlMixin, # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if not self._closed: warnings.warn("unclosed transport %r" % self, ResourceWarning) @@ -623,7 +623,8 @@ class SSLProtocol(protocols.Protocol): if data: ssldata, offset = self._sslpipe.feed_appdata(data, offset) elif offset: - ssldata = self._sslpipe.do_handshake(self._on_handshake_complete) + ssldata = self._sslpipe.do_handshake( + self._on_handshake_complete) offset = 1 else: ssldata = self._sslpipe.shutdown(self._finalize) @@ -647,9 +648,13 @@ class SSLProtocol(protocols.Protocol): self._write_buffer_size -= len(data) except BaseException as exc: if self._in_handshake: + # BaseExceptions will be re-raised in _on_handshake_complete. self._on_handshake_complete(exc) else: self._fatal_error(exc, 'Fatal error on SSL transport') + if not isinstance(exc, Exception): + # BaseException + raise def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. diff --git a/trollius/streams.py b/trollius/streams.py index b7ba4c5..cde58fb 100644 --- a/trollius/streams.py +++ b/trollius/streams.py @@ -6,7 +6,6 @@ __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol', ] import socket -import sys if hasattr(socket, 'AF_UNIX'): __all__.extend(['open_unix_connection', 'start_unix_server']) @@ -243,6 +242,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): def eof_received(self): self._stream_reader.feed_eof() + return True class StreamWriter(object): @@ -324,6 +324,24 @@ class StreamReader(object): self._transport = None self._paused = False + def __repr__(self): + info = ['StreamReader'] + if self._buffer: + info.append('%d bytes' % len(info)) + if self._eof: + info.append('eof') + if self._limit != _DEFAULT_LIMIT: + info.append('l=%d' % self._limit) + if self._waiter: + info.append('w=%r' % self._waiter) + if self._exception: + info.append('e=%r' % self._exception) + if self._transport: + info.append('t=%r' % self._transport) + if self._paused: + info.append('paused') + return '<%s>' % ' '.join(info) + def exception(self): return self._exception diff --git a/trollius/tasks.py b/trollius/tasks.py index 8de3e62..3e0e1b1 100644 --- a/trollius/tasks.py +++ b/trollius/tasks.py @@ -9,7 +9,6 @@ __all__ = ['Task', import functools import linecache -import sys import traceback import warnings try: @@ -141,7 +140,11 @@ class Task(futures.Future): returned for a suspended coroutine. """ frames = [] - f = self._coro.gi_frame + try: + # 'async def' coroutines + f = self._coro.cr_frame + except AttributeError: + f = self._coro.gi_frame if f is not None: while f is not None: if limit is not None: diff --git a/trollius/test_utils.py b/trollius/test_utils.py index 12cdd45..ebebb25 100644 --- a/trollius/test_utils.py +++ b/trollius/test_utils.py @@ -524,7 +524,7 @@ class TestCase(unittest.TestCase): if six.PY2: sys.exc_clear() else: - self.assertEqual(sys.exc_info(), (None, None, None)) + pass #self.assertEqual(sys.exc_info(), (None, None, None)) def check_soure_traceback(self, source_traceback, lineno_delta): frame = sys._getframe(1) diff --git a/trollius/transports.py b/trollius/transports.py index 10bad51..1f086c1 100644 --- a/trollius/transports.py +++ b/trollius/transports.py @@ -1,7 +1,5 @@ """Abstract Transport class.""" -import sys - from trollius import compat __all__ = ['BaseTransport', 'ReadTransport', 'WriteTransport', diff --git a/trollius/unix_events.py b/trollius/unix_events.py index fcccaaa..cdefaca 100644 --- a/trollius/unix_events.py +++ b/trollius/unix_events.py @@ -399,7 +399,7 @@ class _UnixReadPipeTransport(transports.ReadTransport): # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if self._pipe is not None: warnings.warn("unclosed transport %r" % self, ResourceWarning) @@ -582,7 +582,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks # to the PEP 442. - if sys.version_info >= (3, 4): + if compat.PY34: def __del__(self): if self._pipe is not None: warnings.warn("unclosed transport %r" % self, ResourceWarning) |