diff options
| author | Victor Stinner <vstinner@redhat.com> | 2015-09-08 22:55:49 +0200 |
|---|---|---|
| committer | Victor Stinner <vstinner@redhat.com> | 2015-09-08 22:55:49 +0200 |
| commit | 0727ad2b6d49cd94ea0fb86ef08c8050208b839a (patch) | |
| tree | 07179220ffd8f0851769b2d271731f2c365621ad /tests | |
| parent | f25cb291d8439d9f3d44f52811607d3fdb305d1f (diff) | |
| parent | 8d79c57726e30fd19d5fadf46375853df7895516 (diff) | |
| download | trollius-git-0727ad2b6d49cd94ea0fb86ef08c8050208b839a.tar.gz | |
Merge asyncio into trollius
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/test_queues.py | 96 | ||||
| -rw-r--r-- | tests/test_streams.py | 28 | ||||
| -rw-r--r-- | tests/test_subprocess.py | 19 | ||||
| -rw-r--r-- | tests/test_tasks.py | 33 |
4 files changed, 159 insertions, 17 deletions
diff --git a/tests/test_queues.py b/tests/test_queues.py index e75ae4f..75ef988 100644 --- a/tests/test_queues.py +++ b/tests/test_queues.py @@ -173,7 +173,7 @@ class QueueGetTests(_QueueTestBase): q.put_nowait(1) waiter = asyncio.Future(loop=self.loop) - q._putters.append((2, waiter)) + q._putters.append(waiter) res = self.loop.run_until_complete(q.get()) self.assertEqual(1, res) @@ -326,6 +326,99 @@ class QueuePutTests(_QueueTestBase): q.put_nowait(1) self.assertEqual(1, q.get_nowait()) + def test_get_cancel_drop_one_pending_reader(self): + def gen(): + yield 0.01 + yield 0.1 + + loop = self.new_test_loop(gen) + + q = asyncio.Queue(loop=loop) + + reader = loop.create_task(q.get()) + + loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) + + q.put_nowait(1) + q.put_nowait(2) + reader.cancel() + + try: + loop.run_until_complete(reader) + except asyncio.CancelledError: + # try again + reader = loop.create_task(q.get()) + loop.run_until_complete(reader) + + result = reader.result() + # if we get 2, it means 1 got dropped! + self.assertEqual(1, result) + + def test_get_cancel_drop_many_pending_readers(self): + def gen(): + yield 0.01 + yield 0.1 + + loop = self.new_test_loop(gen) + loop.set_debug(True) + + q = asyncio.Queue(loop=loop) + + reader1 = loop.create_task(q.get()) + reader2 = loop.create_task(q.get()) + reader3 = loop.create_task(q.get()) + + loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) + + q.put_nowait(1) + q.put_nowait(2) + reader1.cancel() + + try: + loop.run_until_complete(reader1) + except asyncio.CancelledError: + pass + + loop.run_until_complete(reader3) + + # reader2 will receive `2`, because it was added to the + # queue of pending readers *before* put_nowaits were called. + self.assertEqual(reader2.result(), 2) + # reader3 will receive `1`, because reader1 was cancelled + # before is had a chance to execute, and `2` was already + # pushed to reader2 by second `put_nowait`. + self.assertEqual(reader3.result(), 1) + + def test_put_cancel_drop(self): + + def gen(): + yield 0.01 + yield 0.1 + + loop = self.new_test_loop(gen) + q = asyncio.Queue(1, loop=loop) + + q.put_nowait(1) + + # putting a second item in the queue has to block (qsize=1) + writer = loop.create_task(q.put(2)) + loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) + + value1 = q.get_nowait() + self.assertEqual(value1, 1) + + writer.cancel() + try: + loop.run_until_complete(writer) + except asyncio.CancelledError: + # try again + writer = loop.create_task(q.put(2)) + loop.run_until_complete(writer) + + value2 = q.get_nowait() + self.assertEqual(value2, 2) + self.assertEqual(q.qsize(), 0) + def test_nonblocking_put_exception(self): q = asyncio.Queue(maxsize=1, loop=self.loop) q.put_nowait(1) @@ -379,6 +472,7 @@ class QueuePutTests(_QueueTestBase): test_utils.run_briefly(self.loop) self.assertTrue(put_c.done()) self.assertEqual(q.get_nowait(), 'a') + test_utils.run_briefly(self.loop) self.assertEqual(q.get_nowait(), 'b') self.loop.run_until_complete(put_b) diff --git a/tests/test_streams.py b/tests/test_streams.py index 9ecbb66..390174c 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -450,6 +450,8 @@ class StreamReaderTests(test_utils.TestCase): def handle_client(self, client_reader, client_writer): data = yield From(client_reader.readline()) client_writer.write(data) + yield From(client_writer.drain()) + client_writer.close() def start(self): sock = socket.socket() @@ -461,12 +463,8 @@ class StreamReaderTests(test_utils.TestCase): return sock.getsockname() def handle_client_callback(self, client_reader, client_writer): - task = asyncio.Task(client_reader.readline(), loop=self.loop) - - def done(task): - client_writer.write(task.result()) - - task.add_done_callback(done) + self.loop.create_task(self.handle_client(client_reader, + client_writer)) def start_callback(self): sock = socket.socket() @@ -526,6 +524,8 @@ class StreamReaderTests(test_utils.TestCase): def handle_client(self, client_reader, client_writer): data = yield From(client_reader.readline()) client_writer.write(data) + yield From(client_writer.drain()) + client_writer.close() def start(self): self.server = self.loop.run_until_complete( @@ -534,18 +534,14 @@ class StreamReaderTests(test_utils.TestCase): loop=self.loop)) def handle_client_callback(self, client_reader, client_writer): - task = asyncio.Task(client_reader.readline(), loop=self.loop) - - def done(task): - client_writer.write(task.result()) - - task.add_done_callback(done) + self.loop.create_task(self.handle_client(client_reader, + client_writer)) def start_callback(self): - self.server = self.loop.run_until_complete( - asyncio.start_unix_server(self.handle_client_callback, - path=self.path, - loop=self.loop)) + start = asyncio.start_unix_server(self.handle_client_callback, + path=self.path, + loop=self.loop) + self.server = self.loop.run_until_complete(start) def stop(self): if self.server is not None: diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index a813834..21e003a 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -4,6 +4,7 @@ import trollius as asyncio import os import signal import sys +import warnings from trollius import BrokenPipeError, ConnectionResetError, ProcessLookupError from trollius import From, Return from trollius import base_subprocess @@ -427,6 +428,24 @@ class SubprocessMixin: # the transport was not notified yet self.assertFalse(killed) + def test_popen_error(self): + # Issue #24763: check that the subprocess transport is closed + # when BaseSubprocessTransport fails + if sys.platform == 'win32': + target = 'asyncio.windows_utils.Popen' + else: + target = 'subprocess.Popen' + with mock.patch(target) as popen: + exc = ZeroDivisionError + popen.side_effect = exc + + create = asyncio.create_subprocess_exec(sys.executable, '-c', + 'pass', loop=self.loop) + with warnings.catch_warnings(record=True) as warns: + with self.assertRaises(exc): + self.loop.run_until_complete(create) + self.assertEqual(warns, []) + if sys.platform != 'win32': # Unix diff --git a/tests/test_tasks.py b/tests/test_tasks.py index afa1190..6576ddb 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -2,8 +2,10 @@ import contextlib import functools +import io import os import re +import six import sys import types import weakref @@ -157,6 +159,37 @@ class TaskTests(test_utils.TestCase): 'function is deprecated, use ensure_'): self.assertIs(f, asyncio.async(f)) + def test_get_stack(self): + non_local = {'T': None} + + @asyncio.coroutine + def foo(): + yield From(bar()) + + @asyncio.coroutine + def bar(): + T = non_local['T'] + # test get_stack() + f = T.get_stack(limit=1) + try: + self.assertEqual(f[0].f_code.co_name, 'foo') + finally: + f = None + + # test print_stack() + file = six.StringIO() + T.print_stack(limit=1, file=file) + file.seek(0) + tb = file.read() + self.assertRegex(tb, r'foo\(\) running') + + @asyncio.coroutine + def runner(): + non_local['T'] = asyncio.ensure_future(foo(), loop=self.loop) + yield From(non_local['T']) + + self.loop.run_until_complete(runner()) + def test_task_repr(self): self.loop.set_debug(False) |
