summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@redhat.com>2015-09-08 22:55:49 +0200
committerVictor Stinner <vstinner@redhat.com>2015-09-08 22:55:49 +0200
commit0727ad2b6d49cd94ea0fb86ef08c8050208b839a (patch)
tree07179220ffd8f0851769b2d271731f2c365621ad /tests
parentf25cb291d8439d9f3d44f52811607d3fdb305d1f (diff)
parent8d79c57726e30fd19d5fadf46375853df7895516 (diff)
downloadtrollius-git-0727ad2b6d49cd94ea0fb86ef08c8050208b839a.tar.gz
Merge asyncio into trollius
Diffstat (limited to 'tests')
-rw-r--r--tests/test_queues.py96
-rw-r--r--tests/test_streams.py28
-rw-r--r--tests/test_subprocess.py19
-rw-r--r--tests/test_tasks.py33
4 files changed, 159 insertions, 17 deletions
diff --git a/tests/test_queues.py b/tests/test_queues.py
index e75ae4f..75ef988 100644
--- a/tests/test_queues.py
+++ b/tests/test_queues.py
@@ -173,7 +173,7 @@ class QueueGetTests(_QueueTestBase):
q.put_nowait(1)
waiter = asyncio.Future(loop=self.loop)
- q._putters.append((2, waiter))
+ q._putters.append(waiter)
res = self.loop.run_until_complete(q.get())
self.assertEqual(1, res)
@@ -326,6 +326,99 @@ class QueuePutTests(_QueueTestBase):
q.put_nowait(1)
self.assertEqual(1, q.get_nowait())
+ def test_get_cancel_drop_one_pending_reader(self):
+ def gen():
+ yield 0.01
+ yield 0.1
+
+ loop = self.new_test_loop(gen)
+
+ q = asyncio.Queue(loop=loop)
+
+ reader = loop.create_task(q.get())
+
+ loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
+
+ q.put_nowait(1)
+ q.put_nowait(2)
+ reader.cancel()
+
+ try:
+ loop.run_until_complete(reader)
+ except asyncio.CancelledError:
+ # try again
+ reader = loop.create_task(q.get())
+ loop.run_until_complete(reader)
+
+ result = reader.result()
+ # if we get 2, it means 1 got dropped!
+ self.assertEqual(1, result)
+
+ def test_get_cancel_drop_many_pending_readers(self):
+ def gen():
+ yield 0.01
+ yield 0.1
+
+ loop = self.new_test_loop(gen)
+ loop.set_debug(True)
+
+ q = asyncio.Queue(loop=loop)
+
+ reader1 = loop.create_task(q.get())
+ reader2 = loop.create_task(q.get())
+ reader3 = loop.create_task(q.get())
+
+ loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
+
+ q.put_nowait(1)
+ q.put_nowait(2)
+ reader1.cancel()
+
+ try:
+ loop.run_until_complete(reader1)
+ except asyncio.CancelledError:
+ pass
+
+ loop.run_until_complete(reader3)
+
+ # reader2 will receive `2`, because it was added to the
+ # queue of pending readers *before* put_nowaits were called.
+ self.assertEqual(reader2.result(), 2)
+ # reader3 will receive `1`, because reader1 was cancelled
+ # before is had a chance to execute, and `2` was already
+ # pushed to reader2 by second `put_nowait`.
+ self.assertEqual(reader3.result(), 1)
+
+ def test_put_cancel_drop(self):
+
+ def gen():
+ yield 0.01
+ yield 0.1
+
+ loop = self.new_test_loop(gen)
+ q = asyncio.Queue(1, loop=loop)
+
+ q.put_nowait(1)
+
+ # putting a second item in the queue has to block (qsize=1)
+ writer = loop.create_task(q.put(2))
+ loop.run_until_complete(asyncio.sleep(0.01, loop=loop))
+
+ value1 = q.get_nowait()
+ self.assertEqual(value1, 1)
+
+ writer.cancel()
+ try:
+ loop.run_until_complete(writer)
+ except asyncio.CancelledError:
+ # try again
+ writer = loop.create_task(q.put(2))
+ loop.run_until_complete(writer)
+
+ value2 = q.get_nowait()
+ self.assertEqual(value2, 2)
+ self.assertEqual(q.qsize(), 0)
+
def test_nonblocking_put_exception(self):
q = asyncio.Queue(maxsize=1, loop=self.loop)
q.put_nowait(1)
@@ -379,6 +472,7 @@ class QueuePutTests(_QueueTestBase):
test_utils.run_briefly(self.loop)
self.assertTrue(put_c.done())
self.assertEqual(q.get_nowait(), 'a')
+ test_utils.run_briefly(self.loop)
self.assertEqual(q.get_nowait(), 'b')
self.loop.run_until_complete(put_b)
diff --git a/tests/test_streams.py b/tests/test_streams.py
index 9ecbb66..390174c 100644
--- a/tests/test_streams.py
+++ b/tests/test_streams.py
@@ -450,6 +450,8 @@ class StreamReaderTests(test_utils.TestCase):
def handle_client(self, client_reader, client_writer):
data = yield From(client_reader.readline())
client_writer.write(data)
+ yield From(client_writer.drain())
+ client_writer.close()
def start(self):
sock = socket.socket()
@@ -461,12 +463,8 @@ class StreamReaderTests(test_utils.TestCase):
return sock.getsockname()
def handle_client_callback(self, client_reader, client_writer):
- task = asyncio.Task(client_reader.readline(), loop=self.loop)
-
- def done(task):
- client_writer.write(task.result())
-
- task.add_done_callback(done)
+ self.loop.create_task(self.handle_client(client_reader,
+ client_writer))
def start_callback(self):
sock = socket.socket()
@@ -526,6 +524,8 @@ class StreamReaderTests(test_utils.TestCase):
def handle_client(self, client_reader, client_writer):
data = yield From(client_reader.readline())
client_writer.write(data)
+ yield From(client_writer.drain())
+ client_writer.close()
def start(self):
self.server = self.loop.run_until_complete(
@@ -534,18 +534,14 @@ class StreamReaderTests(test_utils.TestCase):
loop=self.loop))
def handle_client_callback(self, client_reader, client_writer):
- task = asyncio.Task(client_reader.readline(), loop=self.loop)
-
- def done(task):
- client_writer.write(task.result())
-
- task.add_done_callback(done)
+ self.loop.create_task(self.handle_client(client_reader,
+ client_writer))
def start_callback(self):
- self.server = self.loop.run_until_complete(
- asyncio.start_unix_server(self.handle_client_callback,
- path=self.path,
- loop=self.loop))
+ start = asyncio.start_unix_server(self.handle_client_callback,
+ path=self.path,
+ loop=self.loop)
+ self.server = self.loop.run_until_complete(start)
def stop(self):
if self.server is not None:
diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py
index a813834..21e003a 100644
--- a/tests/test_subprocess.py
+++ b/tests/test_subprocess.py
@@ -4,6 +4,7 @@ import trollius as asyncio
import os
import signal
import sys
+import warnings
from trollius import BrokenPipeError, ConnectionResetError, ProcessLookupError
from trollius import From, Return
from trollius import base_subprocess
@@ -427,6 +428,24 @@ class SubprocessMixin:
# the transport was not notified yet
self.assertFalse(killed)
+ def test_popen_error(self):
+ # Issue #24763: check that the subprocess transport is closed
+ # when BaseSubprocessTransport fails
+ if sys.platform == 'win32':
+ target = 'asyncio.windows_utils.Popen'
+ else:
+ target = 'subprocess.Popen'
+ with mock.patch(target) as popen:
+ exc = ZeroDivisionError
+ popen.side_effect = exc
+
+ create = asyncio.create_subprocess_exec(sys.executable, '-c',
+ 'pass', loop=self.loop)
+ with warnings.catch_warnings(record=True) as warns:
+ with self.assertRaises(exc):
+ self.loop.run_until_complete(create)
+ self.assertEqual(warns, [])
+
if sys.platform != 'win32':
# Unix
diff --git a/tests/test_tasks.py b/tests/test_tasks.py
index afa1190..6576ddb 100644
--- a/tests/test_tasks.py
+++ b/tests/test_tasks.py
@@ -2,8 +2,10 @@
import contextlib
import functools
+import io
import os
import re
+import six
import sys
import types
import weakref
@@ -157,6 +159,37 @@ class TaskTests(test_utils.TestCase):
'function is deprecated, use ensure_'):
self.assertIs(f, asyncio.async(f))
+ def test_get_stack(self):
+ non_local = {'T': None}
+
+ @asyncio.coroutine
+ def foo():
+ yield From(bar())
+
+ @asyncio.coroutine
+ def bar():
+ T = non_local['T']
+ # test get_stack()
+ f = T.get_stack(limit=1)
+ try:
+ self.assertEqual(f[0].f_code.co_name, 'foo')
+ finally:
+ f = None
+
+ # test print_stack()
+ file = six.StringIO()
+ T.print_stack(limit=1, file=file)
+ file.seek(0)
+ tb = file.read()
+ self.assertRegex(tb, r'foo\(\) running')
+
+ @asyncio.coroutine
+ def runner():
+ non_local['T'] = asyncio.ensure_future(foo(), loop=self.loop)
+ yield From(non_local['T'])
+
+ self.loop.run_until_complete(runner())
+
def test_task_repr(self):
self.loop.set_debug(False)