summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/echo3.py10
-rw-r--r--tests/test_asyncio.py141
-rw-r--r--tests/test_base_events.py219
-rw-r--r--tests/test_events.py392
-rw-r--r--tests/test_futures.py98
-rw-r--r--tests/test_locks.py165
-rw-r--r--tests/test_proactor_events.py21
-rw-r--r--tests/test_queues.py104
-rw-r--r--tests/test_selector_events.py92
-rw-r--r--tests/test_selectors.py9
-rw-r--r--tests/test_streams.py58
-rw-r--r--tests/test_subprocess.py103
-rw-r--r--tests/test_tasks.py434
-rw-r--r--tests/test_transports.py15
-rw-r--r--tests/test_unix_events.py134
-rw-r--r--tests/test_windows_events.py39
-rw-r--r--tests/test_windows_utils.py44
17 files changed, 1164 insertions, 914 deletions
diff --git a/tests/echo3.py b/tests/echo3.py
index 0644967..a009ea3 100644
--- a/tests/echo3.py
+++ b/tests/echo3.py
@@ -1,4 +1,12 @@
import os
+import sys
+
+asyncio_path = os.path.join(os.path.dirname(__file__), '..')
+asyncio_path = os.path.abspath(asyncio_path)
+
+sys.path.insert(0, asyncio_path)
+from trollius.py33_exceptions import wrap_error
+sys.path.remove(asyncio_path)
if __name__ == '__main__':
while True:
@@ -6,6 +14,6 @@ if __name__ == '__main__':
if not buf:
break
try:
- os.write(1, b'OUT:'+buf)
+ wrap_error(os.write, 1, b'OUT:'+buf)
except OSError as ex:
os.write(2, b'ERR:' + ex.__class__.__name__.encode('ascii'))
diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py
new file mode 100644
index 0000000..39d9e1a
--- /dev/null
+++ b/tests/test_asyncio.py
@@ -0,0 +1,141 @@
+from trollius import test_utils
+from trollius import From, Return
+import trollius
+import trollius.coroutines
+import unittest
+
+try:
+ import asyncio
+except ImportError:
+ from trollius.test_utils import SkipTest
+ raise SkipTest('need asyncio')
+
+
+@asyncio.coroutine
+def asyncio_noop(value):
+ yield from []
+ return (value,)
+
+@asyncio.coroutine
+def asyncio_coroutine(coro, value):
+ res = yield from coro
+ return res + (value,)
+
+@trollius.coroutine
+def trollius_noop(value):
+ yield From(None)
+ raise Return((value,))
+
+@trollius.coroutine
+def trollius_coroutine(coro, value):
+ res = yield trollius.From(coro)
+ raise trollius.Return(res + (value,))
+
+
+class AsyncioTests(test_utils.TestCase):
+ def setUp(self):
+ policy = trollius.get_event_loop_policy()
+
+ asyncio.set_event_loop_policy(policy)
+ self.addCleanup(asyncio.set_event_loop_policy, None)
+
+ self.loop = policy.new_event_loop()
+ self.addCleanup(self.loop.close)
+ policy.set_event_loop(self.loop)
+
+ def test_policy(self):
+ self.assertIs(asyncio.get_event_loop(), self.loop)
+
+ def test_asyncio(self):
+ coro = asyncio_noop("asyncio")
+ res = self.loop.run_until_complete(coro)
+ self.assertEqual(res, ("asyncio",))
+
+ def test_asyncio_in_trollius(self):
+ coro1 = asyncio_noop(1)
+ coro2 = asyncio_coroutine(coro1, 2)
+ res = self.loop.run_until_complete(trollius_coroutine(coro2, 3))
+ self.assertEqual(res, (1, 2, 3))
+
+ def test_trollius_in_asyncio(self):
+ coro1 = trollius_noop(4)
+ coro2 = trollius_coroutine(coro1, 5)
+ res = self.loop.run_until_complete(asyncio_coroutine(coro2, 6))
+ self.assertEqual(res, (4, 5, 6))
+
+ def test_step_future(self):
+ old_debug = trollius.coroutines._DEBUG
+ try:
+ def step_future():
+ future = asyncio.Future()
+ self.loop.call_soon(future.set_result, "asyncio.Future")
+ return (yield from future)
+
+ # test in release mode
+ trollius.coroutines._DEBUG = False
+ result = self.loop.run_until_complete(step_future())
+ self.assertEqual(result, "asyncio.Future")
+
+ # test in debug mode
+ trollius.coroutines._DEBUG = True
+ result = self.loop.run_until_complete(step_future())
+ self.assertEqual(result, "asyncio.Future")
+ finally:
+ trollius.coroutines._DEBUG = old_debug
+
+ def test_async(self):
+ fut = asyncio.Future()
+ self.assertIs(fut._loop, self.loop)
+
+ fut2 = trollius.async(fut)
+ self.assertIs(fut2, fut)
+ self.assertIs(fut._loop, self.loop)
+
+ def test_wrap_future(self):
+ fut = asyncio.Future()
+ self.assertIs(trollius.wrap_future(fut), fut)
+
+ def test_run_until_complete(self):
+ fut = asyncio.Future()
+ fut.set_result("ok")
+ self.assertEqual(self.loop.run_until_complete(fut),
+ "ok")
+
+ def test_coroutine_decorator(self):
+ @trollius.coroutine
+ def asyncio_future(fut):
+ return fut
+
+ fut = asyncio.Future()
+ self.loop.call_soon(fut.set_result, 'ok')
+ res = self.loop.run_until_complete(asyncio_future(fut))
+ self.assertEqual(res, "ok")
+
+ def test_as_completed(self):
+ fut = asyncio.Future()
+ fut.set_result("ok")
+
+ with self.assertRaises(TypeError):
+ for f in trollius.as_completed(fut):
+ pass
+
+ @trollius.coroutine
+ def get_results(fut):
+ results = []
+ for f in trollius.as_completed([fut]):
+ res = yield trollius.From(f)
+ results.append(res)
+ raise trollius.Return(results)
+
+ results = self.loop.run_until_complete(get_results(fut))
+ self.assertEqual(results, ["ok"])
+
+ def test_gather(self):
+ fut = asyncio.Future()
+ fut.set_result("ok")
+ results = self.loop.run_until_complete(trollius.gather(fut))
+ self.assertEqual(results, ["ok"])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/tests/test_base_events.py b/tests/test_base_events.py
index 4e5b6ca..83a1810 100644
--- a/tests/test_base_events.py
+++ b/tests/test_base_events.py
@@ -7,18 +7,22 @@ import socket
import sys
import time
import unittest
-from unittest import mock
-import asyncio
-from asyncio import base_events
-from asyncio import constants
-from asyncio import test_utils
+import trollius as asyncio
+from trollius import Return, From
+from trollius import base_events
+from trollius import constants
+from trollius import test_utils
+from trollius.py33_exceptions import BlockingIOError
+from trollius.test_utils import mock
+from trollius.time_monotonic import time_monotonic
+from trollius.test_support import assert_python_ok
try:
from test.script_helper import assert_python_ok
from test import support
except ImportError:
- from asyncio import test_support as support
- from asyncio.test_support import assert_python_ok
+ from trollius import test_support as support
+ from trollius.test_support import assert_python_ok
MOCK_ANY = mock.ANY
@@ -215,9 +219,9 @@ class BaseEventLoopTests(test_utils.TestCase):
f.cancel() # Don't complain about abandoned Future.
def test__run_once(self):
- h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
+ h1 = asyncio.TimerHandle(time_monotonic() + 5.0, lambda: True, (),
self.loop)
- h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
+ h2 = asyncio.TimerHandle(time_monotonic() + 10.0, lambda: True, (),
self.loop)
h1.cancel()
@@ -238,7 +242,7 @@ class BaseEventLoopTests(test_utils.TestCase):
self.loop.set_debug(False)
self.assertFalse(self.loop.get_debug())
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test__run_once_logging(self, m_logger):
def slow_select(timeout):
# Sleep a bit longer than a second to avoid timer resolution issues.
@@ -263,23 +267,21 @@ class BaseEventLoopTests(test_utils.TestCase):
self.assertEqual(logging.DEBUG, m_logger.log.call_args[0][0])
def test__run_once_schedule_handle(self):
- handle = None
- processed = False
+ non_local = {'handle': None, 'processed': False}
def cb(loop):
- nonlocal processed, handle
- processed = True
- handle = loop.call_soon(lambda: True)
+ non_local['processed'] = True
+ non_local['handle'] = loop.call_soon(lambda: True)
- h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
+ h = asyncio.TimerHandle(time_monotonic() - 1, cb, (self.loop,),
self.loop)
self.loop._process_events = mock.Mock()
self.loop._scheduled.append(h)
self.loop._run_once()
- self.assertTrue(processed)
- self.assertEqual([handle], list(self.loop._ready))
+ self.assertTrue(non_local['processed'])
+ self.assertEqual([non_local['handle']], list(self.loop._ready))
def test__run_once_cancelled_event_cleanup(self):
self.loop._process_events = mock.Mock()
@@ -424,7 +426,7 @@ class BaseEventLoopTests(test_utils.TestCase):
1/0
# Test call_soon (events.Handle)
- with mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('trollius.base_events.logger') as log:
fut = asyncio.Future(loop=self.loop)
self.loop.call_soon(zero_error, fut)
fut.add_done_callback(lambda fut: self.loop.stop())
@@ -434,7 +436,7 @@ class BaseEventLoopTests(test_utils.TestCase):
exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
# Test call_later (events.TimerHandle)
- with mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('trollius.base_events.logger') as log:
fut = asyncio.Future(loop=self.loop)
self.loop.call_later(0.01, zero_error, fut)
fut.add_done_callback(lambda fut: self.loop.stop())
@@ -445,18 +447,21 @@ class BaseEventLoopTests(test_utils.TestCase):
def test_default_exc_handler_coro(self):
self.loop._process_events = mock.Mock()
+ self.loop.set_debug(True)
+ asyncio.set_event_loop(self.loop)
@asyncio.coroutine
def zero_error_coro():
- yield from asyncio.sleep(0.01, loop=self.loop)
+ yield From(asyncio.sleep(0.01, loop=self.loop))
1/0
# Test Future.__del__
- with mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('trollius.base_events.logger') as log:
fut = asyncio.async(zero_error_coro(), loop=self.loop)
fut.add_done_callback(lambda *args: self.loop.stop())
self.loop.run_forever()
fut = None # Trigger Future.__del__ or futures._TracebackLogger
+ support.gc_collect()
if PY34:
# Future.__del__ in Python 3.4 logs error with
# an actual exception context
@@ -499,7 +504,7 @@ class BaseEventLoopTests(test_utils.TestCase):
mock_handler.reset_mock()
self.loop.set_exception_handler(None)
- with mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('trollius.base_events.logger') as log:
run_loop()
log.error.assert_called_with(
test_utils.MockPattern(
@@ -522,7 +527,7 @@ class BaseEventLoopTests(test_utils.TestCase):
self.loop.set_exception_handler(handler)
- with mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('trollius.base_events.logger') as log:
run_loop()
log.error.assert_called_with(
test_utils.MockPattern(
@@ -530,7 +535,7 @@ class BaseEventLoopTests(test_utils.TestCase):
exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
def test_default_exc_handler_broken(self):
- _context = None
+ contexts = []
class Loop(base_events.BaseEventLoop):
@@ -538,8 +543,7 @@ class BaseEventLoopTests(test_utils.TestCase):
_process_events = mock.Mock()
def default_exception_handler(self, context):
- nonlocal _context
- _context = context
+ contexts.append(context)
# Simulates custom buggy "default_exception_handler"
raise ValueError('spam')
@@ -552,7 +556,7 @@ class BaseEventLoopTests(test_utils.TestCase):
loop.call_soon(zero_error)
loop._run_once()
- with mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('trollius.base_events.logger') as log:
run_loop()
log.error.assert_called_with(
'Exception in default exception handler',
@@ -561,9 +565,9 @@ class BaseEventLoopTests(test_utils.TestCase):
def custom_handler(loop, context):
raise ValueError('ham')
- _context = None
+ del contexts[:]
loop.set_exception_handler(custom_handler)
- with mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('trollius.base_events.logger') as log:
run_loop()
log.error.assert_called_with(
test_utils.MockPattern('Exception in default exception.*'
@@ -572,33 +576,25 @@ class BaseEventLoopTests(test_utils.TestCase):
# Check that original context was passed to default
# exception handler.
- self.assertIn('context', _context)
- self.assertIs(type(_context['context']['exception']),
+ context = contexts[0]
+ self.assertIn('context', context)
+ self.assertIs(type(context['context']['exception']),
ZeroDivisionError)
def test_env_var_debug(self):
code = '\n'.join((
- 'import asyncio',
- 'loop = asyncio.get_event_loop()',
+ 'import trollius',
+ 'loop = trollius.get_event_loop()',
'print(loop.get_debug())'))
- # Test with -E to not fail if the unit test was run with
- # PYTHONASYNCIODEBUG set to a non-empty string
- sts, stdout, stderr = assert_python_ok('-E', '-c', code)
- self.assertEqual(stdout.rstrip(), b'False')
-
sts, stdout, stderr = assert_python_ok('-c', code,
- PYTHONASYNCIODEBUG='')
+ TROLLIUSDEBUG='')
self.assertEqual(stdout.rstrip(), b'False')
sts, stdout, stderr = assert_python_ok('-c', code,
- PYTHONASYNCIODEBUG='1')
+ TROLLIUSDEBUG='1')
self.assertEqual(stdout.rstrip(), b'True')
- sts, stdout, stderr = assert_python_ok('-E', '-c', code,
- PYTHONASYNCIODEBUG='1')
- self.assertEqual(stdout.rstrip(), b'False')
-
def test_create_task(self):
class MyTask(asyncio.Task):
pass
@@ -732,7 +728,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
- @mock.patch('asyncio.base_events.socket')
+ @mock.patch('trollius.base_events.socket')
def test_create_connection_multiple_errors(self, m_socket):
class MyProto(asyncio.Protocol):
@@ -740,36 +736,39 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
@asyncio.coroutine
def getaddrinfo(*args, **kw):
- yield from []
- return [(2, 1, 6, '', ('107.6.106.82', 80)),
- (2, 1, 6, '', ('107.6.106.82', 80))]
+ yield From(None)
+ raise Return([(2, 1, 6, '', ('107.6.106.82', 80)),
+ (2, 1, 6, '', ('107.6.106.82', 80))])
def getaddrinfo_task(*args, **kwds):
return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
- idx = -1
- errors = ['err1', 'err2']
+ non_local = {
+ 'idx': -1,
+ 'errors': ['err1', 'err2'],
+ }
def _socket(*args, **kw):
- nonlocal idx, errors
- idx += 1
- raise OSError(errors[idx])
+ non_local['idx'] += 1
+ raise socket.error(non_local['errors'][non_local['idx']])
+ m_socket.error = socket.error
m_socket.socket = _socket
self.loop.getaddrinfo = getaddrinfo_task
coro = self.loop.create_connection(MyProto, 'example.com', 80)
- with self.assertRaises(OSError) as cm:
+ with self.assertRaises(socket.error) as cm:
self.loop.run_until_complete(coro)
self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')
- @mock.patch('asyncio.base_events.socket')
+ @mock.patch('trollius.base_events.socket')
def test_create_connection_timeout(self, m_socket):
# Ensure that the socket is closed on timeout
sock = mock.Mock()
m_socket.socket.return_value = sock
+ m_socket.error = socket.error
def getaddrinfo(*args, **kw):
fut = asyncio.Future(loop=self.loop)
@@ -798,7 +797,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
def test_create_connection_no_getaddrinfo(self):
@asyncio.coroutine
def getaddrinfo(*args, **kw):
- yield from []
+ yield From(None)
def getaddrinfo_task(*args, **kwds):
return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
@@ -806,24 +805,24 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.loop.getaddrinfo = getaddrinfo_task
coro = self.loop.create_connection(MyProto, 'example.com', 80)
self.assertRaises(
- OSError, self.loop.run_until_complete, coro)
+ socket.error, self.loop.run_until_complete, coro)
def test_create_connection_connect_err(self):
@asyncio.coroutine
def getaddrinfo(*args, **kw):
- yield from []
- return [(2, 1, 6, '', ('107.6.106.82', 80))]
+ yield From(None)
+ raise Return([(2, 1, 6, '', ('107.6.106.82', 80))])
def getaddrinfo_task(*args, **kwds):
return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
self.loop.getaddrinfo = getaddrinfo_task
self.loop.sock_connect = mock.Mock()
- self.loop.sock_connect.side_effect = OSError
+ self.loop.sock_connect.side_effect = socket.error
coro = self.loop.create_connection(MyProto, 'example.com', 80)
self.assertRaises(
- OSError, self.loop.run_until_complete, coro)
+ socket.error, self.loop.run_until_complete, coro)
def test_create_connection_multiple(self):
@asyncio.coroutine
@@ -836,22 +835,23 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.loop.getaddrinfo = getaddrinfo_task
self.loop.sock_connect = mock.Mock()
- self.loop.sock_connect.side_effect = OSError
+ self.loop.sock_connect.side_effect = socket.error
coro = self.loop.create_connection(
MyProto, 'example.com', 80, family=socket.AF_INET)
- with self.assertRaises(OSError):
+ with self.assertRaises(socket.error):
self.loop.run_until_complete(coro)
- @mock.patch('asyncio.base_events.socket')
+ @mock.patch('trollius.base_events.socket')
def test_create_connection_multiple_errors_local_addr(self, m_socket):
def bind(addr):
if addr[0] == '0.0.0.1':
- err = OSError('Err')
+ err = socket.error('Err')
err.strerror = 'Err'
raise err
+ m_socket.error = socket.error
m_socket.socket.return_value.bind = bind
@asyncio.coroutine
@@ -864,12 +864,12 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.loop.getaddrinfo = getaddrinfo_task
self.loop.sock_connect = mock.Mock()
- self.loop.sock_connect.side_effect = OSError('Err2')
+ self.loop.sock_connect.side_effect = socket.error('Err2')
coro = self.loop.create_connection(
MyProto, 'example.com', 80, family=socket.AF_INET,
local_addr=(None, 8080))
- with self.assertRaises(OSError) as cm:
+ with self.assertRaises(socket.error) as cm:
self.loop.run_until_complete(coro)
self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
@@ -892,7 +892,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
MyProto, 'example.com', 80, family=socket.AF_INET,
local_addr=(None, 8080))
self.assertRaises(
- OSError, self.loop.run_until_complete, coro)
+ socket.error, self.loop.run_until_complete, coro)
def test_create_connection_ssl_server_hostname_default(self):
self.loop.getaddrinfo = mock.Mock()
@@ -905,7 +905,9 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.loop.getaddrinfo.side_effect = mock_getaddrinfo
self.loop.sock_connect = mock.Mock()
- self.loop.sock_connect.return_value = ()
+ f = asyncio.Future(loop=self.loop)
+ f.set_result(())
+ self.loop.sock_connect.return_value = f
self.loop._make_ssl_transport = mock.Mock()
class _SelectorTransportMock:
@@ -978,21 +980,20 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
def test_create_server_empty_host(self):
# if host is empty string use None instead
- host = object()
+ non_local = {'host': object()}
@asyncio.coroutine
def getaddrinfo(*args, **kw):
- nonlocal host
- host = args[0]
- yield from []
+ non_local['host'] = args[0]
+ yield From(None)
def getaddrinfo_task(*args, **kwds):
return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
self.loop.getaddrinfo = getaddrinfo_task
fut = self.loop.create_server(MyProto, '', 0)
- self.assertRaises(OSError, self.loop.run_until_complete, fut)
- self.assertIsNone(host)
+ self.assertRaises(socket.error, self.loop.run_until_complete, fut)
+ self.assertIsNone(non_local['host'])
def test_create_server_host_port_sock(self):
fut = self.loop.create_server(
@@ -1004,18 +1005,25 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.assertRaises(ValueError, self.loop.run_until_complete, fut)
def test_create_server_no_getaddrinfo(self):
- getaddrinfo = self.loop.getaddrinfo = mock.Mock()
- getaddrinfo.return_value = []
+ @asyncio.coroutine
+ def getaddrinfo(*args, **kw):
+ raise Return([])
+
+ def getaddrinfo_task(*args, **kwds):
+ return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
+
+ self.loop.getaddrinfo = getaddrinfo_task
f = self.loop.create_server(MyProto, '0.0.0.0', 0)
- self.assertRaises(OSError, self.loop.run_until_complete, f)
+ self.assertRaises(socket.error, self.loop.run_until_complete, f)
- @mock.patch('asyncio.base_events.socket')
+ @mock.patch('trollius.base_events.socket')
def test_create_server_cant_bind(self, m_socket):
- class Err(OSError):
+ class Err(socket.error):
strerror = 'error'
+ m_socket.error = socket.error
m_socket.getaddrinfo.return_value = [
(2, 1, 6, '', ('127.0.0.1', 10100))]
m_socket.getaddrinfo._is_coroutine = False
@@ -1023,18 +1031,19 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
m_sock.bind.side_effect = Err
fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
- self.assertRaises(OSError, self.loop.run_until_complete, fut)
+ self.assertRaises(socket.error, self.loop.run_until_complete, fut)
self.assertTrue(m_sock.close.called)
- @mock.patch('asyncio.base_events.socket')
+ @mock.patch('trollius.base_events.socket')
def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
+ m_socket.error = socket.error
m_socket.getaddrinfo.return_value = []
m_socket.getaddrinfo._is_coroutine = False
coro = self.loop.create_datagram_endpoint(
MyDatagramProto, local_addr=('localhost', 0))
self.assertRaises(
- OSError, self.loop.run_until_complete, coro)
+ socket.error, self.loop.run_until_complete, coro)
def test_create_datagram_endpoint_addr_error(self):
coro = self.loop.create_datagram_endpoint(
@@ -1048,29 +1057,31 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
def test_create_datagram_endpoint_connect_err(self):
self.loop.sock_connect = mock.Mock()
- self.loop.sock_connect.side_effect = OSError
+ self.loop.sock_connect.side_effect = socket.error
coro = self.loop.create_datagram_endpoint(
asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
self.assertRaises(
- OSError, self.loop.run_until_complete, coro)
+ socket.error, self.loop.run_until_complete, coro)
- @mock.patch('asyncio.base_events.socket')
+ @mock.patch('trollius.base_events.socket')
def test_create_datagram_endpoint_socket_err(self, m_socket):
+ m_socket.error = socket.error
m_socket.getaddrinfo = socket.getaddrinfo
- m_socket.socket.side_effect = OSError
+ m_socket.socket.side_effect = socket.error
coro = self.loop.create_datagram_endpoint(
asyncio.DatagramProtocol, family=socket.AF_INET)
self.assertRaises(
- OSError, self.loop.run_until_complete, coro)
+ socket.error, self.loop.run_until_complete, coro)
coro = self.loop.create_datagram_endpoint(
asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
self.assertRaises(
- OSError, self.loop.run_until_complete, coro)
+ socket.error, self.loop.run_until_complete, coro)
- @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
+ @test_utils.skipUnless(support.IPV6_ENABLED,
+ 'IPv6 not supported or enabled')
def test_create_datagram_endpoint_no_matching_family(self):
coro = self.loop.create_datagram_endpoint(
asyncio.DatagramProtocol,
@@ -1078,14 +1089,15 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.assertRaises(
ValueError, self.loop.run_until_complete, coro)
- @mock.patch('asyncio.base_events.socket')
+ @mock.patch('trollius.base_events.socket')
def test_create_datagram_endpoint_setblk_err(self, m_socket):
- m_socket.socket.return_value.setblocking.side_effect = OSError
+ m_socket.error = socket.error
+ m_socket.socket.return_value.setblocking.side_effect = socket.error
coro = self.loop.create_datagram_endpoint(
asyncio.DatagramProtocol, family=socket.AF_INET)
self.assertRaises(
- OSError, self.loop.run_until_complete, coro)
+ socket.error, self.loop.run_until_complete, coro)
self.assertTrue(
m_socket.socket.return_value.close.called)
@@ -1094,12 +1106,13 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
asyncio.DatagramProtocol)
self.assertRaises(ValueError, self.loop.run_until_complete, coro)
- @mock.patch('asyncio.base_events.socket')
+ @mock.patch('trollius.base_events.socket')
def test_create_datagram_endpoint_cant_bind(self, m_socket):
- class Err(OSError):
+ class Err(socket.error):
pass
m_socket.AF_INET6 = socket.AF_INET6
+ m_socket.error = socket.error
m_socket.getaddrinfo = socket.getaddrinfo
m_sock = m_socket.socket.return_value = mock.Mock()
m_sock.bind.side_effect = Err
@@ -1117,11 +1130,11 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.loop._accept_connection(MyProto, sock)
self.assertFalse(sock.close.called)
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_accept_connection_exception(self, m_log):
sock = mock.Mock()
sock.fileno.return_value = 10
- sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
+ sock.accept.side_effect = socket.error(errno.EMFILE, 'Too many open files')
self.loop.remove_reader = mock.Mock()
self.loop.call_later = mock.Mock()
@@ -1154,14 +1167,14 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
with self.assertRaises(TypeError):
self.loop.run_in_executor(None, func)
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_log_slow_callbacks(self, m_logger):
def stop_loop_cb(loop):
loop.stop()
@asyncio.coroutine
def stop_loop_coro(loop):
- yield from ()
+ yield From(None)
loop.stop()
asyncio.set_event_loop(self.loop)
@@ -1171,14 +1184,16 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
# slow callback
self.loop.call_soon(stop_loop_cb, self.loop)
self.loop.run_forever()
- fmt, *args = m_logger.warning.call_args[0]
+ fmt = m_logger.warning.call_args[0][0]
+ args = m_logger.warning.call_args[0][1:]
self.assertRegex(fmt % tuple(args),
"^Executing <Handle.*stop_loop_cb.*> took .* seconds$")
# slow task
asyncio.async(stop_loop_coro(self.loop), loop=self.loop)
self.loop.run_forever()
- fmt, *args = m_logger.warning.call_args[0]
+ fmt = m_logger.warning.call_args[0][0]
+ args = m_logger.warning.call_args[0][1:]
self.assertRegex(fmt % tuple(args),
"^Executing <Task.*stop_loop_coro.*> took .* seconds$")
diff --git a/tests/test_events.py b/tests/test_events.py
index 0630292..c834af9 100644
--- a/tests/test_events.py
+++ b/tests/test_events.py
@@ -1,5 +1,6 @@
"""Tests for events.py."""
+import contextlib
import functools
import gc
import io
@@ -8,28 +9,41 @@ import platform
import re
import signal
import socket
-try:
- import ssl
-except ImportError:
- ssl = None
import subprocess
import sys
import threading
-import time
import errno
import unittest
-from unittest import mock
import weakref
+try:
+ import ssl
+except ImportError:
+ ssl = None
-import asyncio
-from asyncio import proactor_events
-from asyncio import selector_events
-from asyncio import test_utils
+try:
+ import concurrent
+except ImportError:
+ concurrent = None
+
+from trollius import Return, From
+from trollius import futures
+
+import trollius as asyncio
+from trollius import compat
+from trollius import events
+from trollius import proactor_events
+from trollius import selector_events
+from trollius import test_utils
+from trollius.py33_exceptions import (wrap_error,
+ BlockingIOError, ConnectionRefusedError,
+ FileNotFoundError)
+from trollius.test_utils import mock
+from trollius.time_monotonic import time_monotonic
try:
from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR
except ImportError:
- from asyncio import test_support as support
+ from trollius import test_support as support
def data_file(filename):
@@ -94,7 +108,7 @@ class MyBaseProto(asyncio.Protocol):
class MyProto(MyBaseProto):
def connection_made(self, transport):
- super().connection_made(transport)
+ super(MyProto, self).connection_made(transport)
transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
@@ -186,7 +200,7 @@ class MySubprocessProtocol(asyncio.SubprocessProtocol):
self.transport = None
self.connected = asyncio.Future(loop=loop)
self.completed = asyncio.Future(loop=loop)
- self.disconnects = {fd: asyncio.Future(loop=loop) for fd in range(3)}
+ self.disconnects = dict((fd, futures.Future(loop=loop)) for fd in range(3))
self.data = {1: b'', 2: b''}
self.returncode = None
self.got_data = {1: asyncio.Event(loop=loop),
@@ -220,10 +234,10 @@ class MySubprocessProtocol(asyncio.SubprocessProtocol):
self.returncode = self.transport.get_returncode()
-class EventLoopTestsMixin:
+class EventLoopTestsMixin(object):
def setUp(self):
- super().setUp()
+ super(EventLoopTestsMixin, self).setUp()
self.loop = self.create_event_loop()
self.set_event_loop(self.loop)
@@ -234,12 +248,12 @@ class EventLoopTestsMixin:
self.loop.close()
gc.collect()
- super().tearDown()
+ super(EventLoopTestsMixin, self).tearDown()
def test_run_until_complete_nesting(self):
@asyncio.coroutine
def coro1():
- yield
+ yield From(None)
@asyncio.coroutine
def coro2():
@@ -262,10 +276,13 @@ class EventLoopTestsMixin:
@asyncio.coroutine
def cb():
self.loop.stop()
- yield from asyncio.sleep(0.1, loop=self.loop)
+ yield From(asyncio.sleep(0.1, loop=self.loop))
+
task = cb()
self.assertRaises(RuntimeError,
self.loop.run_until_complete, task)
+ for task in asyncio.Task.all_tasks(loop=self.loop):
+ task._log_destroy_pending = False
def test_call_later(self):
results = []
@@ -275,9 +292,9 @@ class EventLoopTestsMixin:
self.loop.stop()
self.loop.call_later(0.1, callback, 'hello world')
- t0 = time.monotonic()
+ t0 = time_monotonic()
self.loop.run_forever()
- t1 = time.monotonic()
+ t1 = time_monotonic()
self.assertEqual(results, ['hello world'])
self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
@@ -328,13 +345,14 @@ class EventLoopTestsMixin:
self.loop.run_forever()
self.assertEqual(results, ['hello', 'world'])
+ @test_utils.skipIf(concurrent is None, 'need concurrent.futures')
def test_run_in_executor(self):
def run(arg):
- return (arg, threading.get_ident())
+ return (arg, threading.current_thread().ident)
f2 = self.loop.run_in_executor(None, run, 'yo')
res, thread_id = self.loop.run_until_complete(f2)
self.assertEqual(res, 'yo')
- self.assertNotEqual(thread_id, threading.get_ident())
+ self.assertNotEqual(thread_id, threading.current_thread().ident)
def test_reader_callback(self):
r, w = test_utils.socketpair()
@@ -422,7 +440,7 @@ class EventLoopTestsMixin:
sock = socket.socket()
self._basetest_sock_client_ops(httpd, sock)
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_unix_sock_client_ops(self):
with test_utils.run_test_unix_server() as httpd:
sock = socket.socket(socket.AF_UNIX)
@@ -462,13 +480,12 @@ class EventLoopTestsMixin:
conn.close()
listener.close()
- @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
+ @test_utils.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
def test_add_signal_handler(self):
- caught = 0
+ non_local = {'caught': 0}
def my_handler():
- nonlocal caught
- caught += 1
+ non_local['caught'] += 1
# Check error behavior first.
self.assertRaises(
@@ -497,7 +514,7 @@ class EventLoopTestsMixin:
self.loop.add_signal_handler(signal.SIGINT, my_handler)
os.kill(os.getpid(), signal.SIGINT)
- test_utils.run_until(self.loop, lambda: caught)
+ test_utils.run_until(self.loop, lambda: non_local['caught'])
# Removing it should restore the default handler.
self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
@@ -506,30 +523,28 @@ class EventLoopTestsMixin:
# Removing again returns False.
self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
- @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
+ @test_utils.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
def test_signal_handling_while_selecting(self):
# Test with a signal actually arriving during a select() call.
- caught = 0
+ non_local = {'caught': 0}
def my_handler():
- nonlocal caught
- caught += 1
+ non_local['caught'] += 1
self.loop.stop()
self.loop.add_signal_handler(signal.SIGALRM, my_handler)
signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once.
self.loop.run_forever()
- self.assertEqual(caught, 1)
+ self.assertEqual(non_local['caught'], 1)
- @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
+ @test_utils.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
def test_signal_handling_args(self):
some_args = (42,)
- caught = 0
+ non_local = {'caught': 0}
def my_handler(*args):
- nonlocal caught
- caught += 1
+ non_local['caught'] += 1
self.assertEqual(args, some_args)
self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)
@@ -537,7 +552,7 @@ class EventLoopTestsMixin:
signal.setitimer(signal.ITIMER_REAL, 0.1, 0) # Send SIGALRM once.
self.loop.call_later(0.5, self.loop.stop)
self.loop.run_forever()
- self.assertEqual(caught, 1)
+ self.assertEqual(non_local['caught'], 1)
def _basetest_create_connection(self, connection_fut, check_sockname=True):
tr, pr = self.loop.run_until_complete(connection_fut)
@@ -556,7 +571,7 @@ class EventLoopTestsMixin:
lambda: MyProto(loop=self.loop), *httpd.address)
self._basetest_create_connection(conn_fut)
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_connection(self):
# Issue #20682: On Mac OS X Tiger, getsockname() returns a
# zero-length address for UNIX socket.
@@ -613,8 +628,9 @@ class EventLoopTestsMixin:
self._basetest_create_ssl_connection(conn_fut, check_sockname)
# ssl.Purpose was introduced in Python 3.4
+ #if not asyncio.BACKPORT_SSL_CONTEXT:
if hasattr(ssl, 'Purpose'):
- def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
+ def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH,
cafile=None, capath=None,
cadata=None):
"""
@@ -631,17 +647,16 @@ class EventLoopTestsMixin:
self._basetest_create_ssl_connection(conn_fut, check_sockname)
self.assertEqual(m.call_count, 1)
- # With the real ssl.create_default_context(), certificate
- # validation will fail
- with self.assertRaises(ssl.SSLError) as cm:
- conn_fut = create_connection(ssl=True)
- # Ignore the "SSL handshake failed" log in debug mode
- with test_utils.disable_logger():
- self._basetest_create_ssl_connection(conn_fut, check_sockname)
-
- self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
+ if not asyncio.BACKPORT_SSL_CONTEXT:
+ # With the real ssl.create_default_context(), certificate
+ # validation will fail
+ with self.assertRaises(ssl.SSLError) as cm:
+ conn_fut = create_connection(ssl=True)
+ # Ignore the "SSL handshake failed" log in debug mode
+ with test_utils.disable_logger():
+ self._basetest_create_ssl_connection(conn_fut, check_sockname)
- @unittest.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
def test_create_ssl_connection(self):
with test_utils.run_test_server(use_ssl=True) as httpd:
create_connection = functools.partial(
@@ -650,8 +665,8 @@ class EventLoopTestsMixin:
*httpd.address)
self._test_create_ssl_connection(httpd, create_connection)
- @unittest.skipIf(ssl is None, 'No ssl module')
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_ssl_unix_connection(self):
# Issue #20682: On Mac OS X Tiger, getsockname() returns a
# zero-length address for UNIX socket.
@@ -682,10 +697,11 @@ class EventLoopTestsMixin:
f = self.loop.create_connection(
lambda: MyProto(loop=self.loop),
*httpd.address, local_addr=httpd.address)
- with self.assertRaises(OSError) as cm:
+ with self.assertRaises(socket.error) as cm:
self.loop.run_until_complete(f)
self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
- self.assertIn(str(httpd.address), cm.exception.strerror)
+ # FIXME: address missing from the message?
+ #self.assertIn(str(httpd.address), cm.exception.strerror)
def test_create_server(self):
proto = MyProto(self.loop)
@@ -732,7 +748,7 @@ class EventLoopTestsMixin:
return server, path
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server(self):
proto = MyProto(loop=self.loop)
server, path = self._make_unix_server(lambda: proto)
@@ -760,20 +776,23 @@ class EventLoopTestsMixin:
# close server
server.close()
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server_path_socket_error(self):
proto = MyProto(loop=self.loop)
sock = socket.socket()
- with sock:
+ try:
f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
with self.assertRaisesRegex(ValueError,
'path and sock can not be specified '
'at the same time'):
self.loop.run_until_complete(f)
+ finally:
+ sock.close()
def _create_ssl_context(self, certfile, keyfile=None):
- sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- sslcontext.options |= ssl.OP_NO_SSLv2
+ sslcontext = asyncio.SSLContext(ssl.PROTOCOL_SSLv23)
+ if not asyncio.BACKPORT_SSL_CONTEXT:
+ sslcontext.options |= ssl.OP_NO_SSLv2
sslcontext.load_cert_chain(certfile, keyfile)
return sslcontext
@@ -792,7 +811,7 @@ class EventLoopTestsMixin:
sslcontext = self._create_ssl_context(certfile, keyfile)
return self._make_unix_server(factory, ssl=sslcontext)
- @unittest.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
def test_create_server_ssl(self):
proto = MyProto(loop=self.loop)
server, host, port = self._make_ssl_server(
@@ -826,8 +845,8 @@ class EventLoopTestsMixin:
# stop serving
server.close()
- @unittest.skipIf(ssl is None, 'No ssl module')
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server_ssl(self):
proto = MyProto(loop=self.loop)
server, path = self._make_ssl_unix_server(
@@ -857,13 +876,14 @@ class EventLoopTestsMixin:
# stop serving
server.close()
- @unittest.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipIf(asyncio.BACKPORT_SSL_CONTEXT, 'need ssl.SSLContext')
def test_create_server_ssl_verify_failed(self):
proto = MyProto(loop=self.loop)
server, host, port = self._make_ssl_server(
lambda: proto, SIGNED_CERTFILE)
- sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslcontext_client = asyncio.SSLContext(ssl.PROTOCOL_SSLv23)
sslcontext_client.options |= ssl.OP_NO_SSLv2
sslcontext_client.verify_mode = ssl.CERT_REQUIRED
if hasattr(sslcontext_client, 'check_hostname'):
@@ -874,21 +894,22 @@ class EventLoopTestsMixin:
ssl=sslcontext_client)
with test_utils.disable_logger():
with self.assertRaisesRegex(ssl.SSLError,
- 'certificate verify failed '):
+ 'certificate verify failed'):
self.loop.run_until_complete(f_c)
# close connection
self.assertIsNone(proto.transport)
server.close()
- @unittest.skipIf(ssl is None, 'No ssl module')
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipIf(asyncio.BACKPORT_SSL_CONTEXT, 'need ssl.SSLContext')
def test_create_unix_server_ssl_verify_failed(self):
proto = MyProto(loop=self.loop)
server, path = self._make_ssl_unix_server(
lambda: proto, SIGNED_CERTFILE)
- sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslcontext_client = asyncio.SSLContext(ssl.PROTOCOL_SSLv23)
sslcontext_client.options |= ssl.OP_NO_SSLv2
sslcontext_client.verify_mode = ssl.CERT_REQUIRED
if hasattr(sslcontext_client, 'check_hostname'):
@@ -900,51 +921,61 @@ class EventLoopTestsMixin:
server_hostname='invalid')
with test_utils.disable_logger():
with self.assertRaisesRegex(ssl.SSLError,
- 'certificate verify failed '):
+ 'certificate verify failed'):
self.loop.run_until_complete(f_c)
# close connection
self.assertIsNone(proto.transport)
server.close()
- @unittest.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
def test_create_server_ssl_match_failed(self):
proto = MyProto(loop=self.loop)
server, host, port = self._make_ssl_server(
lambda: proto, SIGNED_CERTFILE)
- sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- sslcontext_client.options |= ssl.OP_NO_SSLv2
- sslcontext_client.verify_mode = ssl.CERT_REQUIRED
- sslcontext_client.load_verify_locations(
- cafile=SIGNING_CA)
+ sslcontext_client = asyncio.SSLContext(ssl.PROTOCOL_SSLv23)
+ if not asyncio.BACKPORT_SSL_CONTEXT:
+ sslcontext_client.options |= ssl.OP_NO_SSLv2
+ sslcontext_client.verify_mode = ssl.CERT_REQUIRED
+ sslcontext_client.load_verify_locations(
+ cafile=SIGNING_CA)
if hasattr(sslcontext_client, 'check_hostname'):
sslcontext_client.check_hostname = True
+ if compat.PY3:
+ err_msg = "hostname '127.0.0.1' doesn't match 'localhost'"
+ else:
+ # http://bugs.python.org/issue22861
+ err_msg = "hostname '127.0.0.1' doesn't match u'localhost'"
+
# incorrect server_hostname
- f_c = self.loop.create_connection(MyProto, host, port,
- ssl=sslcontext_client)
- with test_utils.disable_logger():
- with self.assertRaisesRegex(
- ssl.CertificateError,
- "hostname '127.0.0.1' doesn't match 'localhost'"):
- self.loop.run_until_complete(f_c)
+ if not asyncio.BACKPORT_SSL_CONTEXT:
+ f_c = self.loop.create_connection(MyProto, host, port,
+ ssl=sslcontext_client)
+ with test_utils.disable_logger():
+ with self.assertRaisesRegex(
+ ssl.CertificateError,
+ err_msg):
+ self.loop.run_until_complete(f_c)
+
+ # close connection
+ proto.transport.close()
- # close connection
- proto.transport.close()
server.close()
- @unittest.skipIf(ssl is None, 'No ssl module')
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server_ssl_verified(self):
proto = MyProto(loop=self.loop)
server, path = self._make_ssl_unix_server(
lambda: proto, SIGNED_CERTFILE)
- sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- sslcontext_client.options |= ssl.OP_NO_SSLv2
- sslcontext_client.verify_mode = ssl.CERT_REQUIRED
- sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
+ sslcontext_client = asyncio.SSLContext(ssl.PROTOCOL_SSLv23)
+ if not asyncio.BACKPORT_SSL_CONTEXT:
+ sslcontext_client.options |= ssl.OP_NO_SSLv2
+ sslcontext_client.verify_mode = ssl.CERT_REQUIRED
+ sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
if hasattr(sslcontext_client, 'check_hostname'):
sslcontext_client.check_hostname = True
@@ -959,37 +990,40 @@ class EventLoopTestsMixin:
client.close()
server.close()
- @unittest.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
def test_create_server_ssl_verified(self):
proto = MyProto(loop=self.loop)
server, host, port = self._make_ssl_server(
lambda: proto, SIGNED_CERTFILE)
- sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- sslcontext_client.options |= ssl.OP_NO_SSLv2
- sslcontext_client.verify_mode = ssl.CERT_REQUIRED
- sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
+ sslcontext_client = asyncio.SSLContext(ssl.PROTOCOL_SSLv23)
+ if not asyncio.BACKPORT_SSL_CONTEXT:
+ sslcontext_client.options |= ssl.OP_NO_SSLv2
+ sslcontext_client.verify_mode = ssl.CERT_REQUIRED
+ sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
if hasattr(sslcontext_client, 'check_hostname'):
sslcontext_client.check_hostname = True
- # Connection succeeds with correct CA and server hostname.
- f_c = self.loop.create_connection(MyProto, host, port,
- ssl=sslcontext_client,
- server_hostname='localhost')
- client, pr = self.loop.run_until_complete(f_c)
+ if not asyncio.BACKPORT_SSL_CONTEXT:
+ # Connection succeeds with correct CA and server hostname.
+ f_c = self.loop.create_connection(MyProto, host, port,
+ ssl=sslcontext_client,
+ server_hostname='localhost')
+ client, pr = self.loop.run_until_complete(f_c)
+
+ # close connection
+ proto.transport.close()
+ client.close()
- # close connection
- proto.transport.close()
- client.close()
server.close()
def test_create_server_sock(self):
- proto = asyncio.Future(loop=self.loop)
+ non_local = {'proto': asyncio.Future(loop=self.loop)}
class TestMyProto(MyProto):
def connection_made(self, transport):
- super().connection_made(transport)
- proto.set_result(self)
+ super(TestMyProto, self).connection_made(transport)
+ non_local['proto'].set_result(self)
sock_ob = socket.socket(type=socket.SOCK_STREAM)
sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -1019,19 +1053,19 @@ class EventLoopTestsMixin:
host, port = sock.getsockname()
f = self.loop.create_server(MyProto, host=host, port=port)
- with self.assertRaises(OSError) as cm:
+ with self.assertRaises(socket.error) as cm:
self.loop.run_until_complete(f)
self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
server.close()
- @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
+ @test_utils.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
def test_create_server_dual_stack(self):
f_proto = asyncio.Future(loop=self.loop)
class TestMyProto(MyProto):
def connection_made(self, transport):
- super().connection_made(transport)
+ super(TestMyProto, self).connection_made(transport)
f_proto.set_result(self)
try_count = 0
@@ -1040,7 +1074,7 @@ class EventLoopTestsMixin:
port = support.find_unused_port()
f = self.loop.create_server(TestMyProto, host=None, port=port)
server = self.loop.run_until_complete(f)
- except OSError as ex:
+ except socket.error as ex:
if ex.errno == errno.EADDRINUSE:
try_count += 1
self.assertGreaterEqual(5, try_count)
@@ -1076,21 +1110,21 @@ class EventLoopTestsMixin:
client.connect(('127.0.0.1', port))
client.send(b'xxx')
client.close()
-
server.close()
client = socket.socket()
self.assertRaises(
- ConnectionRefusedError, client.connect, ('127.0.0.1', port))
+ ConnectionRefusedError, wrap_error, client.connect,
+ ('127.0.0.1', port))
client.close()
def test_create_datagram_endpoint(self):
class TestMyDatagramProto(MyDatagramProto):
def __init__(inner_self):
- super().__init__(loop=self.loop)
+ super(TestMyDatagramProto, inner_self).__init__(loop=self.loop)
def datagram_received(self, data, addr):
- super().datagram_received(data, addr)
+ super(TestMyDatagramProto, self).datagram_received(data, addr)
self.transport.sendto(b'resp:'+data, addr)
coro = self.loop.create_datagram_endpoint(
@@ -1142,7 +1176,7 @@ class EventLoopTestsMixin:
self.assertIsNone(loop._csock)
self.assertIsNone(loop._ssock)
- @unittest.skipUnless(sys.platform != 'win32',
+ @test_utils.skipUnless(sys.platform != 'win32',
"Don't support pipes for Windows")
def test_read_pipe(self):
proto = MyReadPipeProto(loop=self.loop)
@@ -1152,8 +1186,8 @@ class EventLoopTestsMixin:
@asyncio.coroutine
def connect():
- t, p = yield from self.loop.connect_read_pipe(
- lambda: proto, pipeobj)
+ t, p = yield From(self.loop.connect_read_pipe(
+ lambda: proto, pipeobj))
self.assertIs(p, proto)
self.assertIs(t, proto.transport)
self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
@@ -1177,7 +1211,7 @@ class EventLoopTestsMixin:
# extra info is available
self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
- @unittest.skipUnless(sys.platform != 'win32',
+ @test_utils.skipUnless(sys.platform != 'win32',
"Don't support pipes for Windows")
# select, poll and kqueue don't support character devices (PTY) on Mac OS X
# older than 10.6 (Snow Leopard)
@@ -1192,8 +1226,8 @@ class EventLoopTestsMixin:
@asyncio.coroutine
def connect():
- t, p = yield from self.loop.connect_read_pipe(lambda: proto,
- master_read_obj)
+ t, p = yield From(self.loop.connect_read_pipe(lambda: proto,
+ master_read_obj))
self.assertIs(p, proto)
self.assertIs(t, proto.transport)
self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
@@ -1217,8 +1251,8 @@ class EventLoopTestsMixin:
# extra info is available
self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
- @unittest.skipUnless(sys.platform != 'win32',
- "Don't support pipes for Windows")
+ @test_utils.skipUnless(sys.platform != 'win32',
+ "Don't support pipes for Windows")
def test_write_pipe(self):
rpipe, wpipe = os.pipe()
pipeobj = io.open(wpipe, 'wb', 1024)
@@ -1256,12 +1290,17 @@ class EventLoopTestsMixin:
self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)
- @unittest.skipUnless(sys.platform != 'win32',
+ @test_utils.skipUnless(sys.platform != 'win32',
"Don't support pipes for Windows")
def test_write_pipe_disconnect_on_close(self):
rsock, wsock = test_utils.socketpair()
rsock.setblocking(False)
- pipeobj = io.open(wsock.detach(), 'wb', 1024)
+ if hasattr(wsock, 'detach'):
+ wsock_fd = wsock.detach()
+ else:
+ # Python 2
+ wsock_fd = wsock.fileno()
+ pipeobj = io.open(wsock_fd, 'wb', 1024)
proto = MyWritePipeProto(loop=self.loop)
connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
@@ -1279,8 +1318,8 @@ class EventLoopTestsMixin:
self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)
- @unittest.skipUnless(sys.platform != 'win32',
- "Don't support pipes for Windows")
+ @test_utils.skipUnless(sys.platform != 'win32',
+ "Don't support pipes for Windows")
# select, poll and kqueue don't support character devices (PTY) on Mac OS X
# older than 10.6 (Snow Leopard)
@support.requires_mac_ver(10, 6)
@@ -1335,19 +1374,19 @@ class EventLoopTestsMixin:
def main():
try:
self.loop.call_soon(f.cancel)
- yield from f
+ yield From(f)
except asyncio.CancelledError:
res = 'cancelled'
else:
res = None
finally:
self.loop.stop()
- return res
+ raise Return(res)
- start = time.monotonic()
+ start = time_monotonic()
t = asyncio.Task(main(), loop=self.loop)
self.loop.run_forever()
- elapsed = time.monotonic() - start
+ elapsed = time_monotonic() - start
self.assertLess(elapsed, 0.1)
self.assertEqual(t.result(), 'cancelled')
@@ -1371,19 +1410,20 @@ class EventLoopTestsMixin:
@asyncio.coroutine
def wait():
loop = self.loop
- yield from asyncio.sleep(1e-2, loop=loop)
- yield from asyncio.sleep(1e-4, loop=loop)
- yield from asyncio.sleep(1e-6, loop=loop)
- yield from asyncio.sleep(1e-8, loop=loop)
- yield from asyncio.sleep(1e-10, loop=loop)
+ yield From(asyncio.sleep(1e-2, loop=loop))
+ yield From(asyncio.sleep(1e-4, loop=loop))
+ yield From(asyncio.sleep(1e-6, loop=loop))
+ yield From(asyncio.sleep(1e-8, loop=loop))
+ yield From(asyncio.sleep(1e-10, loop=loop))
self.loop.run_until_complete(wait())
- # The ideal number of call is 12, but on some platforms, the selector
+ # The ideal number of call is 22, but on some platforms, the selector
# may sleep at little bit less than timeout depending on the resolution
# of the clock used by the kernel. Tolerate a few useless calls on
# these platforms.
- self.assertLessEqual(self.loop._run_once_counter, 20,
- {'clock_resolution': self.loop._clock_resolution,
+ self.assertLessEqual(self.loop._run_once_counter, 30,
+ {'calls': self.loop._run_once_counter,
+ 'clock_resolution': self.loop._clock_resolution,
'selector': self.loop._selector.__class__.__name__})
def test_sock_connect_address(self):
@@ -1397,7 +1437,7 @@ class EventLoopTestsMixin:
for family, address in addresses:
for sock_type in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
sock = socket.socket(family, sock_type)
- with sock:
+ with contextlib.closing(sock):
sock.setblocking(False)
connect = self.loop.sock_connect(sock, address)
with self.assertRaises(ValueError) as cm:
@@ -1471,7 +1511,7 @@ class EventLoopTestsMixin:
self.loop.add_signal_handler(signal.SIGTERM, func)
-class SubprocessTestsMixin:
+class SubprocessTestsMixin(object):
def check_terminated(self, returncode):
if sys.platform == 'win32':
@@ -1598,7 +1638,7 @@ class SubprocessTestsMixin:
self.loop.run_until_complete(proto.completed)
self.check_terminated(proto.returncode)
- @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
+ @test_utils.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
def test_subprocess_send_signal(self):
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
@@ -1688,6 +1728,7 @@ class SubprocessTestsMixin:
self.loop.run_until_complete(proto.completed)
self.check_terminated(proto.returncode)
+ @test_utils.skipUnless(hasattr(os, 'setsid'), "need os.setsid()")
def test_subprocess_wait_no_same_group(self):
# start the new process in a new session
connect = self.loop.subprocess_shell(
@@ -1702,9 +1743,9 @@ class SubprocessTestsMixin:
def test_subprocess_exec_invalid_args(self):
@asyncio.coroutine
def connect(**kwds):
- yield from self.loop.subprocess_exec(
+ yield From(self.loop.subprocess_exec(
asyncio.SubprocessProtocol,
- 'pwd', **kwds)
+ 'pwd', **kwds))
with self.assertRaises(ValueError):
self.loop.run_until_complete(connect(universal_newlines=True))
@@ -1718,9 +1759,9 @@ class SubprocessTestsMixin:
def connect(cmd=None, **kwds):
if not cmd:
cmd = 'pwd'
- yield from self.loop.subprocess_shell(
+ yield From(self.loop.subprocess_shell(
asyncio.SubprocessProtocol,
- cmd, **kwds)
+ cmd, **kwds))
with self.assertRaises(ValueError):
self.loop.run_until_complete(connect(['ls', '-l']))
@@ -1780,18 +1821,18 @@ if sys.platform == 'win32':
def test_remove_fds_after_closing(self):
raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
else:
- from asyncio import selectors
+ from trollius import selectors
class UnixEventLoopTestsMixin(EventLoopTestsMixin):
def setUp(self):
- super().setUp()
+ super(UnixEventLoopTestsMixin, self).setUp()
watcher = asyncio.SafeChildWatcher()
watcher.attach_loop(self.loop)
asyncio.set_child_watcher(watcher)
def tearDown(self):
asyncio.set_child_watcher(None)
- super().tearDown()
+ super(UnixEventLoopTestsMixin, self).tearDown()
if hasattr(selectors, 'KqueueSelector'):
class KqueueEventLoopTests(UnixEventLoopTestsMixin,
@@ -1807,16 +1848,16 @@ else:
@support.requires_mac_ver(10, 9)
# Issue #20667: KqueueEventLoopTests.test_read_pty_output()
# hangs on OpenBSD 5.5
- @unittest.skipIf(sys.platform.startswith('openbsd'),
- 'test hangs on OpenBSD')
+ @test_utils.skipIf(sys.platform.startswith('openbsd'),
+ 'test hangs on OpenBSD')
def test_read_pty_output(self):
- super().test_read_pty_output()
+ super(KqueueEventLoopTests, self).test_read_pty_output()
# kqueue doesn't support character devices (PTY) on Mac OS X older
# than 10.9 (Maverick)
@support.requires_mac_ver(10, 9)
def test_write_pty(self):
- super().test_write_pty()
+ super(KqueueEventLoopTests, self).test_write_pty()
if hasattr(selectors, 'EpollSelector'):
class EPollEventLoopTests(UnixEventLoopTestsMixin,
@@ -1941,7 +1982,7 @@ class HandleTests(test_utils.TestCase):
self.loop.get_debug.return_value = True
# simple function
- create_filename = __file__
+ create_filename = sys._getframe().f_code.co_filename
create_lineno = sys._getframe().f_lineno + 1
h = asyncio.Handle(noop, (1, 2), self.loop)
filename, lineno = test_utils.get_function_source(noop)
@@ -1968,38 +2009,30 @@ class HandleTests(test_utils.TestCase):
loop.set_debug(True)
self.set_event_loop(loop)
- def check_source_traceback(h):
- lineno = sys._getframe(1).f_lineno - 1
- self.assertIsInstance(h._source_traceback, list)
- self.assertEqual(h._source_traceback[-1][:3],
- (__file__,
- lineno,
- 'test_handle_source_traceback'))
-
# call_soon
h = loop.call_soon(noop)
- check_source_traceback(h)
+ self.check_soure_traceback(h._source_traceback, -1)
# call_soon_threadsafe
h = loop.call_soon_threadsafe(noop)
- check_source_traceback(h)
+ self.check_soure_traceback(h._source_traceback, -1)
# call_later
h = loop.call_later(0, noop)
- check_source_traceback(h)
+ self.check_soure_traceback(h._source_traceback, -1)
# call_at
h = loop.call_later(0, noop)
- check_source_traceback(h)
+ self.check_soure_traceback(h._source_traceback, -1)
-class TimerTests(unittest.TestCase):
+class TimerTests(test_utils.TestCase):
def setUp(self):
self.loop = mock.Mock()
def test_hash(self):
- when = time.monotonic()
+ when = time_monotonic()
h = asyncio.TimerHandle(when, lambda: False, (),
mock.Mock())
self.assertEqual(hash(h), hash(when))
@@ -2009,7 +2042,7 @@ class TimerTests(unittest.TestCase):
return args
args = (1, 2, 3)
- when = time.monotonic()
+ when = time_monotonic()
h = asyncio.TimerHandle(when, callback, args, mock.Mock())
self.assertIs(h._callback, callback)
self.assertIs(h._args, args)
@@ -2044,7 +2077,7 @@ class TimerTests(unittest.TestCase):
self.loop.get_debug.return_value = True
# simple function
- create_filename = __file__
+ create_filename = sys._getframe().f_code.co_filename
create_lineno = sys._getframe().f_lineno + 1
h = asyncio.TimerHandle(123, noop, (), self.loop)
filename, lineno = test_utils.get_function_source(noop)
@@ -2065,7 +2098,7 @@ class TimerTests(unittest.TestCase):
def callback(*args):
return args
- when = time.monotonic()
+ when = time_monotonic()
h1 = asyncio.TimerHandle(when, callback, (), self.loop)
h2 = asyncio.TimerHandle(when, callback, (), self.loop)
@@ -2102,7 +2135,7 @@ class TimerTests(unittest.TestCase):
self.assertIs(NotImplemented, h1.__ne__(h3))
-class AbstractEventLoopTests(unittest.TestCase):
+class AbstractEventLoopTests(test_utils.TestCase):
def test_not_implemented(self):
f = mock.Mock()
@@ -2115,13 +2148,16 @@ class AbstractEventLoopTests(unittest.TestCase):
NotImplementedError, loop.stop)
self.assertRaises(
NotImplementedError, loop.is_running)
- self.assertRaises(
- NotImplementedError, loop.is_closed)
+ # skip some tests if the AbstractEventLoop class comes from asyncio
+ # and the asyncio version (python version in fact) is older than 3.4.2
+ if events.asyncio is None or sys.version_info >= (3, 4, 2):
+ self.assertRaises(
+ NotImplementedError, loop.is_closed)
+ self.assertRaises(
+ NotImplementedError, loop.create_task, None)
self.assertRaises(
NotImplementedError, loop.close)
self.assertRaises(
- NotImplementedError, loop.create_task, None)
- self.assertRaises(
NotImplementedError, loop.call_later, None, None)
self.assertRaises(
NotImplementedError, loop.call_at, f, f)
@@ -2190,7 +2226,7 @@ class AbstractEventLoopTests(unittest.TestCase):
NotImplementedError, loop.set_debug, f)
-class ProtocolsAbsTests(unittest.TestCase):
+class ProtocolsAbsTests(test_utils.TestCase):
def test_empty(self):
f = mock.Mock()
@@ -2214,7 +2250,7 @@ class ProtocolsAbsTests(unittest.TestCase):
self.assertIsNone(sp.process_exited())
-class PolicyTests(unittest.TestCase):
+class PolicyTests(test_utils.TestCase):
def test_event_loop_policy(self):
policy = asyncio.AbstractEventLoopPolicy()
@@ -2257,7 +2293,7 @@ class PolicyTests(unittest.TestCase):
policy.set_event_loop(None)
self.assertRaises(RuntimeError, policy.get_event_loop)
- @mock.patch('asyncio.events.threading.current_thread')
+ @mock.patch('trollius.events.threading.current_thread')
def test_get_event_loop_thread(self, m_current_thread):
def f():
diff --git a/tests/test_futures.py b/tests/test_futures.py
index f9c3ad2..387e552 100644
--- a/tests/test_futures.py
+++ b/tests/test_futures.py
@@ -1,19 +1,26 @@
"""Tests for futures.py."""
-import concurrent.futures
+try:
+ import concurrent.futures
+except ImportError:
+ concurrent = None
import re
import sys
import threading
import unittest
-from unittest import mock
-import asyncio
-from asyncio import test_utils
+import trollius as asyncio
+from trollius import compat
+from trollius import test_utils
+from trollius.test_utils import mock
try:
from test import support # gc_collect
except ImportError:
- from asyncio import test_support as support
+ from trollius import test_support as support
+
+def get_thread_ident():
+ return threading.current_thread().ident
def _fakefunc(f):
return f
@@ -42,10 +49,6 @@ class FutureTests(test_utils.TestCase):
f = asyncio.Future()
self.assertIs(f._loop, self.loop)
- def test_constructor_positional(self):
- # Make sure Future doesn't accept a positional argument
- self.assertRaises(TypeError, asyncio.Future, 42)
-
def test_cancel(self):
f = asyncio.Future(loop=self.loop)
self.assertTrue(f.cancel())
@@ -89,24 +92,6 @@ class FutureTests(test_utils.TestCase):
f.set_exception(RuntimeError)
self.assertIsInstance(f.exception(), RuntimeError)
- def test_yield_from_twice(self):
- f = asyncio.Future(loop=self.loop)
-
- def fixture():
- yield 'A'
- x = yield from f
- yield 'B', x
- y = yield from f
- yield 'C', y
-
- g = fixture()
- self.assertEqual(next(g), 'A') # yield 'A'.
- self.assertEqual(next(g), f) # First yield from f.
- f.set_result(42)
- self.assertEqual(next(g), ('B', 42)) # yield 'B', x.
- # The second "yield from f" does not yield f.
- self.assertEqual(next(g), ('C', 42)) # yield 'C', y.
-
def test_future_repr(self):
self.loop.set_debug(True)
f_pending_debug = asyncio.Future(loop=self.loop)
@@ -138,7 +123,8 @@ class FutureTests(test_utils.TestCase):
def func_repr(func):
filename, lineno = test_utils.get_function_source(func)
- text = '%s() at %s:%s' % (func.__qualname__, filename, lineno)
+ func_name = getattr(func, '__qualname__', func.__name__)
+ text = '%s() at %s:%s' % (func_name, filename, lineno)
return re.escape(text)
f_one_callbacks = asyncio.Future(loop=self.loop)
@@ -197,32 +183,20 @@ class FutureTests(test_utils.TestCase):
newf_cancelled._copy_state(f_cancelled)
self.assertTrue(newf_cancelled.cancelled())
- def test_iter(self):
- fut = asyncio.Future(loop=self.loop)
-
- def coro():
- yield from fut
-
- def test():
- arg1, arg2 = coro()
-
- self.assertRaises(AssertionError, test)
- fut.cancel()
-
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_tb_logger_abandoned(self, m_log):
fut = asyncio.Future(loop=self.loop)
del fut
self.assertFalse(m_log.error.called)
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_tb_logger_result_unretrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_result(42)
del fut
self.assertFalse(m_log.error.called)
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_tb_logger_result_retrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_result(42)
@@ -230,15 +204,18 @@ class FutureTests(test_utils.TestCase):
del fut
self.assertFalse(m_log.error.called)
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_tb_logger_exception_unretrieved(self, m_log):
+ self.loop.set_debug(True)
+ asyncio.set_event_loop(self.loop)
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
del fut
test_utils.run_briefly(self.loop)
+ support.gc_collect()
self.assertTrue(m_log.error.called)
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_tb_logger_exception_retrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
@@ -246,7 +223,7 @@ class FutureTests(test_utils.TestCase):
del fut
self.assertFalse(m_log.error.called)
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_tb_logger_exception_result_retrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
@@ -254,32 +231,35 @@ class FutureTests(test_utils.TestCase):
del fut
self.assertFalse(m_log.error.called)
+ @test_utils.skipIf(concurrent is None, 'need concurrent.futures')
def test_wrap_future(self):
def run(arg):
- return (arg, threading.get_ident())
+ return (arg, get_thread_ident())
ex = concurrent.futures.ThreadPoolExecutor(1)
f1 = ex.submit(run, 'oi')
f2 = asyncio.wrap_future(f1, loop=self.loop)
res, ident = self.loop.run_until_complete(f2)
self.assertIsInstance(f2, asyncio.Future)
self.assertEqual(res, 'oi')
- self.assertNotEqual(ident, threading.get_ident())
+ self.assertNotEqual(ident, get_thread_ident())
def test_wrap_future_future(self):
f1 = asyncio.Future(loop=self.loop)
f2 = asyncio.wrap_future(f1)
self.assertIs(f1, f2)
- @mock.patch('asyncio.futures.events')
+ @test_utils.skipIf(concurrent is None, 'need concurrent.futures')
+ @mock.patch('trollius.futures.events')
def test_wrap_future_use_global_loop(self, m_events):
def run(arg):
- return (arg, threading.get_ident())
+ return (arg, get_thread_ident())
ex = concurrent.futures.ThreadPoolExecutor(1)
f1 = ex.submit(run, 'oi')
f2 = asyncio.wrap_future(f1)
self.assertIs(m_events.get_event_loop.return_value, f2._loop)
+ @test_utils.skipIf(concurrent is None, 'need concurrent.futures')
def test_wrap_future_cancel(self):
f1 = concurrent.futures.Future()
f2 = asyncio.wrap_future(f1, loop=self.loop)
@@ -288,6 +268,7 @@ class FutureTests(test_utils.TestCase):
self.assertTrue(f1.cancelled())
self.assertTrue(f2.cancelled())
+ @test_utils.skipIf(concurrent is None, 'need concurrent.futures')
def test_wrap_future_cancel2(self):
f1 = concurrent.futures.Future()
f2 = asyncio.wrap_future(f1, loop=self.loop)
@@ -302,14 +283,9 @@ class FutureTests(test_utils.TestCase):
self.loop.set_debug(True)
future = asyncio.Future(loop=self.loop)
- lineno = sys._getframe().f_lineno - 1
- self.assertIsInstance(future._source_traceback, list)
- self.assertEqual(future._source_traceback[-1][:3],
- (__file__,
- lineno,
- 'test_future_source_traceback'))
-
- @mock.patch('asyncio.base_events.logger')
+ self.check_soure_traceback(future._source_traceback, -1)
+
+ @mock.patch('trollius.base_events.logger')
def check_future_exception_never_retrieved(self, debug, m_log):
self.loop.set_debug(debug)
@@ -358,12 +334,16 @@ class FutureTests(test_utils.TestCase):
r'.*\n'
r'MemoryError$'
).format(filename=re.escape(frame[0]), lineno=frame[1])
- else:
+ elif compat.PY3:
regex = (r'^Future/Task exception was never retrieved\n'
r'Traceback \(most recent call last\):\n'
r'.*\n'
r'MemoryError$'
)
+ else:
+ regex = (r'^Future/Task exception was never retrieved\n'
+ r'MemoryError$'
+ )
m_log.error.assert_called_once_with(mock.ANY, exc_info=False)
message = m_log.error.call_args[0][0]
self.assertRegex(message, re.compile(regex, re.DOTALL))
diff --git a/tests/test_locks.py b/tests/test_locks.py
index dda4577..ec7dbba 100644
--- a/tests/test_locks.py
+++ b/tests/test_locks.py
@@ -1,11 +1,12 @@
"""Tests for lock.py"""
import unittest
-from unittest import mock
import re
-import asyncio
-from asyncio import test_utils
+import trollius as asyncio
+from trollius import From, Return
+from trollius import test_utils
+from trollius.test_utils import mock
STR_RGX_REPR = (
@@ -42,7 +43,7 @@ class LockTests(test_utils.TestCase):
@asyncio.coroutine
def acquire_lock():
- yield from lock
+ yield From(lock.acquire())
self.loop.run_until_complete(acquire_lock())
self.assertTrue(repr(lock).endswith('[locked]>'))
@@ -53,7 +54,8 @@ class LockTests(test_utils.TestCase):
@asyncio.coroutine
def acquire_lock():
- return (yield from lock)
+ yield From(lock.acquire())
+ raise Return(lock)
res = self.loop.run_until_complete(acquire_lock())
@@ -71,21 +73,21 @@ class LockTests(test_utils.TestCase):
@asyncio.coroutine
def c1(result):
- if (yield from lock.acquire()):
+ if (yield From(lock.acquire())):
result.append(1)
- return True
+ raise Return(True)
@asyncio.coroutine
def c2(result):
- if (yield from lock.acquire()):
+ if (yield From(lock.acquire())):
result.append(2)
- return True
+ raise Return(True)
@asyncio.coroutine
def c3(result):
- if (yield from lock.acquire()):
+ if (yield From(lock.acquire())):
result.append(3)
- return True
+ raise Return(True)
t1 = asyncio.Task(c1(result), loop=self.loop)
t2 = asyncio.Task(c2(result), loop=self.loop)
@@ -147,22 +149,22 @@ class LockTests(test_utils.TestCase):
@asyncio.coroutine
def lockit(name, blocker):
- yield from lock.acquire()
+ yield From(lock.acquire())
try:
if blocker is not None:
- yield from blocker
+ yield From(blocker)
finally:
lock.release()
fa = asyncio.Future(loop=self.loop)
ta = asyncio.Task(lockit('A', fa), loop=self.loop)
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertTrue(lock.locked())
tb = asyncio.Task(lockit('B', None), loop=self.loop)
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual(len(lock._waiters), 1)
tc = asyncio.Task(lockit('C', None), loop=self.loop)
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual(len(lock._waiters), 2)
# Create the race and check.
@@ -170,7 +172,7 @@ class LockTests(test_utils.TestCase):
fa.set_result(None)
tb.cancel()
self.assertTrue(lock._waiters[0].cancelled())
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertFalse(lock.locked())
self.assertTrue(ta.done())
self.assertTrue(tb.cancelled())
@@ -194,7 +196,7 @@ class LockTests(test_utils.TestCase):
@asyncio.coroutine
def acquire_lock():
- return (yield from lock)
+ raise Return((yield From(lock)))
with self.loop.run_until_complete(acquire_lock()):
self.assertTrue(lock.locked())
@@ -206,9 +208,9 @@ class LockTests(test_utils.TestCase):
@asyncio.coroutine
def acquire_lock():
- return (yield from lock)
+ raise Return((yield From(lock)))
- # This spells "yield from lock" outside a generator.
+ # This spells "yield From(lock)" outside a generator.
cm = self.loop.run_until_complete(acquire_lock())
with cm:
self.assertTrue(lock.locked())
@@ -228,7 +230,7 @@ class LockTests(test_utils.TestCase):
except RuntimeError as err:
self.assertEqual(
str(err),
- '"yield from" should be used as context manager expression')
+ '"yield" should be used as context manager expression')
self.assertFalse(lock.locked())
@@ -273,30 +275,30 @@ class EventTests(test_utils.TestCase):
@asyncio.coroutine
def c1(result):
- if (yield from ev.wait()):
+ if (yield From(ev.wait())):
result.append(1)
@asyncio.coroutine
def c2(result):
- if (yield from ev.wait()):
+ if (yield From(ev.wait())):
result.append(2)
@asyncio.coroutine
def c3(result):
- if (yield from ev.wait()):
+ if (yield From(ev.wait())):
result.append(3)
t1 = asyncio.Task(c1(result), loop=self.loop)
t2 = asyncio.Task(c2(result), loop=self.loop)
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual([], result)
t3 = asyncio.Task(c3(result), loop=self.loop)
ev.set()
- test_utils.run_briefly(self.loop)
- self.assertEqual([3, 1, 2], result)
+ test_utils.run_briefly(self.loop, 2)
+ self.assertEqual([1, 2, 3], result)
self.assertTrue(t1.done())
self.assertIsNone(t1.result())
@@ -338,9 +340,9 @@ class EventTests(test_utils.TestCase):
@asyncio.coroutine
def c1(result):
- if (yield from ev.wait()):
+ if (yield From(ev.wait())):
result.append(1)
- return True
+ raise Return(True)
t = asyncio.Task(c1(result), loop=self.loop)
test_utils.run_briefly(self.loop)
@@ -386,56 +388,56 @@ class ConditionTests(test_utils.TestCase):
@asyncio.coroutine
def c1(result):
- yield from cond.acquire()
- if (yield from cond.wait()):
+ yield From(cond.acquire())
+ if (yield From(cond.wait())):
result.append(1)
- return True
+ raise Return(True)
@asyncio.coroutine
def c2(result):
- yield from cond.acquire()
- if (yield from cond.wait()):
+ yield From(cond.acquire())
+ if (yield From(cond.wait())):
result.append(2)
- return True
+ raise Return(True)
@asyncio.coroutine
def c3(result):
- yield from cond.acquire()
- if (yield from cond.wait()):
+ yield From(cond.acquire())
+ if (yield From(cond.wait())):
result.append(3)
- return True
+ raise Return(True)
t1 = asyncio.Task(c1(result), loop=self.loop)
t2 = asyncio.Task(c2(result), loop=self.loop)
t3 = asyncio.Task(c3(result), loop=self.loop)
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual([], result)
self.assertFalse(cond.locked())
self.assertTrue(self.loop.run_until_complete(cond.acquire()))
cond.notify()
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual([], result)
self.assertTrue(cond.locked())
cond.release()
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual([1], result)
self.assertTrue(cond.locked())
cond.notify(2)
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual([1], result)
self.assertTrue(cond.locked())
cond.release()
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual([1, 2], result)
self.assertTrue(cond.locked())
cond.release()
- test_utils.run_briefly(self.loop)
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual([1, 2, 3], result)
self.assertTrue(cond.locked())
@@ -475,11 +477,11 @@ class ConditionTests(test_utils.TestCase):
@asyncio.coroutine
def c1(result):
- yield from cond.acquire()
- if (yield from cond.wait_for(predicate)):
+ yield From(cond.acquire())
+ if (yield From(cond.wait_for(predicate))):
result.append(1)
cond.release()
- return True
+ raise Return(True)
t = asyncio.Task(c1(result), loop=self.loop)
@@ -520,27 +522,27 @@ class ConditionTests(test_utils.TestCase):
@asyncio.coroutine
def c1(result):
- yield from cond.acquire()
- if (yield from cond.wait()):
+ yield From(cond.acquire())
+ if (yield From(cond.wait())):
result.append(1)
cond.release()
- return True
+ raise Return(True)
@asyncio.coroutine
def c2(result):
- yield from cond.acquire()
- if (yield from cond.wait()):
+ yield From(cond.acquire())
+ if (yield From(cond.wait())):
result.append(2)
cond.release()
- return True
+ raise Return(True)
@asyncio.coroutine
def c3(result):
- yield from cond.acquire()
- if (yield from cond.wait()):
+ yield From(cond.acquire())
+ if (yield From(cond.wait())):
result.append(3)
cond.release()
- return True
+ raise Return(True)
t1 = asyncio.Task(c1(result), loop=self.loop)
t2 = asyncio.Task(c2(result), loop=self.loop)
@@ -552,14 +554,16 @@ class ConditionTests(test_utils.TestCase):
self.loop.run_until_complete(cond.acquire())
cond.notify(1)
cond.release()
- test_utils.run_briefly(self.loop)
+ # each coroutine requires 2 runs of the event loop
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual([1], result)
self.loop.run_until_complete(cond.acquire())
cond.notify(1)
cond.notify(2048)
cond.release()
- test_utils.run_briefly(self.loop)
+ # each coroutine requires 2 runs of the event loop
+ test_utils.run_briefly(self.loop, 4)
self.assertEqual([1, 2, 3], result)
self.assertTrue(t1.done())
@@ -576,19 +580,19 @@ class ConditionTests(test_utils.TestCase):
@asyncio.coroutine
def c1(result):
- yield from cond.acquire()
- if (yield from cond.wait()):
+ yield From(cond.acquire())
+ if (yield From(cond.wait())):
result.append(1)
cond.release()
- return True
+ raise Return(True)
@asyncio.coroutine
def c2(result):
- yield from cond.acquire()
- if (yield from cond.wait()):
+ yield From(cond.acquire())
+ if (yield From(cond.wait())):
result.append(2)
cond.release()
- return True
+ raise Return(True)
t1 = asyncio.Task(c1(result), loop=self.loop)
t2 = asyncio.Task(c2(result), loop=self.loop)
@@ -599,7 +603,8 @@ class ConditionTests(test_utils.TestCase):
self.loop.run_until_complete(cond.acquire())
cond.notify_all()
cond.release()
- test_utils.run_briefly(self.loop)
+ # each coroutine requires 2 runs of the event loop
+ test_utils.run_briefly(self.loop, 4)
self.assertEqual([1, 2], result)
self.assertTrue(t1.done())
@@ -636,7 +641,7 @@ class ConditionTests(test_utils.TestCase):
@asyncio.coroutine
def acquire_cond():
- return (yield from cond)
+ raise Return((yield From(cond)))
with self.loop.run_until_complete(acquire_cond()):
self.assertTrue(cond.locked())
@@ -652,7 +657,7 @@ class ConditionTests(test_utils.TestCase):
except RuntimeError as err:
self.assertEqual(
str(err),
- '"yield from" should be used as context manager expression')
+ '"yield From" should be used as context manager expression')
self.assertFalse(cond.locked())
@@ -718,7 +723,8 @@ class SemaphoreTests(test_utils.TestCase):
@asyncio.coroutine
def acquire_lock():
- return (yield from sem)
+ yield From(sem.acquire())
+ raise Return(sem)
res = self.loop.run_until_complete(acquire_lock())
@@ -743,33 +749,34 @@ class SemaphoreTests(test_utils.TestCase):
@asyncio.coroutine
def c1(result):
- yield from sem.acquire()
+ yield From(sem.acquire())
result.append(1)
- return True
+ raise Return(True)
@asyncio.coroutine
def c2(result):
- yield from sem.acquire()
+ yield From(sem.acquire())
result.append(2)
- return True
+ raise Return(True)
@asyncio.coroutine
def c3(result):
- yield from sem.acquire()
+ yield From(sem.acquire())
result.append(3)
- return True
+ raise Return(True)
@asyncio.coroutine
def c4(result):
- yield from sem.acquire()
+ yield From(sem.acquire())
result.append(4)
- return True
+ raise Return(True)
t1 = asyncio.Task(c1(result), loop=self.loop)
t2 = asyncio.Task(c2(result), loop=self.loop)
t3 = asyncio.Task(c3(result), loop=self.loop)
- test_utils.run_briefly(self.loop)
+ # each coroutine requires 2 runs of the event loop
+ test_utils.run_briefly(self.loop, 2)
self.assertEqual([1], result)
self.assertTrue(sem.locked())
self.assertEqual(2, len(sem._waiters))
@@ -829,7 +836,7 @@ class SemaphoreTests(test_utils.TestCase):
@asyncio.coroutine
def acquire_lock():
- return (yield from sem)
+ raise Return((yield From(sem)))
with self.loop.run_until_complete(acquire_lock()):
self.assertFalse(sem.locked())
@@ -849,7 +856,7 @@ class SemaphoreTests(test_utils.TestCase):
except RuntimeError as err:
self.assertEqual(
str(err),
- '"yield from" should be used as context manager expression')
+ '"yield" should be used as context manager expression')
self.assertEqual(2, sem._value)
diff --git a/tests/test_proactor_events.py b/tests/test_proactor_events.py
index 9e9b41a..9801889 100644
--- a/tests/test_proactor_events.py
+++ b/tests/test_proactor_events.py
@@ -2,14 +2,15 @@
import socket
import unittest
-from unittest import mock
-import asyncio
-from asyncio.proactor_events import BaseProactorEventLoop
-from asyncio.proactor_events import _ProactorSocketTransport
-from asyncio.proactor_events import _ProactorWritePipeTransport
-from asyncio.proactor_events import _ProactorDuplexPipeTransport
-from asyncio import test_utils
+from trollius import test_utils
+from trollius.proactor_events import BaseProactorEventLoop
+from trollius.proactor_events import _ProactorDuplexPipeTransport
+from trollius.proactor_events import _ProactorSocketTransport
+from trollius.proactor_events import _ProactorWritePipeTransport
+from trollius.py33_exceptions import ConnectionAbortedError, ConnectionResetError
+from trollius.test_utils import mock
+import trollius as asyncio
class ProactorSocketTransportTests(test_utils.TestCase):
@@ -139,7 +140,7 @@ class ProactorSocketTransportTests(test_utils.TestCase):
self.loop._proactor.send.return_value.add_done_callback.\
assert_called_with(tr._loop_writing)
- @mock.patch('asyncio.proactor_events.logger')
+ @mock.patch('trollius.proactor_events.logger')
def test_loop_writing_err(self, m_log):
err = self.loop._proactor.send.side_effect = OSError()
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
@@ -213,7 +214,7 @@ class ProactorSocketTransportTests(test_utils.TestCase):
test_utils.run_briefly(self.loop)
self.assertFalse(self.protocol.connection_lost.called)
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_fatal_error(self, m_logging):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
tr._force_close = mock.Mock()
@@ -522,7 +523,7 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
def test_process_events(self):
self.loop._process_events([])
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_create_server(self, m_log):
pf = mock.Mock()
call_soon = self.loop.call_soon = mock.Mock()
diff --git a/tests/test_queues.py b/tests/test_queues.py
index 3d4ac51..3492bd5 100644
--- a/tests/test_queues.py
+++ b/tests/test_queues.py
@@ -1,10 +1,11 @@
"""Tests for queues.py"""
import unittest
-from unittest import mock
-import asyncio
-from asyncio import test_utils
+import trollius as asyncio
+from trollius import Return, From
+from trollius import test_utils
+from trollius.test_utils import mock
class _QueueTestBase(test_utils.TestCase):
@@ -32,7 +33,7 @@ class QueueBasicTests(_QueueTestBase):
q = asyncio.Queue(loop=loop)
self.assertTrue(fn(q).startswith('<Queue'), fn(q))
- id_is_present = hex(id(q)) in fn(q)
+ id_is_present = ("%x" % id(q)) in fn(q)
self.assertEqual(expect_id, id_is_present)
@asyncio.coroutine
@@ -41,7 +42,7 @@ class QueueBasicTests(_QueueTestBase):
# Start a task that waits to get.
asyncio.Task(q.get(), loop=loop)
# Let it start waiting.
- yield from asyncio.sleep(0.1, loop=loop)
+ yield From(asyncio.sleep(0.1, loop=loop))
self.assertTrue('_getters[1]' in fn(q))
# resume q.get coroutine to finish generator
q.put_nowait(0)
@@ -55,7 +56,7 @@ class QueueBasicTests(_QueueTestBase):
# Start a task that waits to put.
asyncio.Task(q.put(2), loop=loop)
# Let it start waiting.
- yield from asyncio.sleep(0.1, loop=loop)
+ yield From(asyncio.sleep(0.1, loop=loop))
self.assertTrue('_putters[1]' in fn(q))
# resume q.put coroutine to finish generator
q.get_nowait()
@@ -127,21 +128,22 @@ class QueueBasicTests(_QueueTestBase):
@asyncio.coroutine
def putter():
for i in range(3):
- yield from q.put(i)
+ yield From(q.put(i))
have_been_put.append(i)
- return True
+ raise Return(True)
@asyncio.coroutine
def test():
t = asyncio.Task(putter(), loop=loop)
- yield from asyncio.sleep(0.01, loop=loop)
+ yield From(None) # one extra iteration for the putter coroutine
+ yield From(asyncio.sleep(0.01, loop=loop))
# The putter is blocked after putting two items.
self.assertEqual([0, 1], have_been_put)
self.assertEqual(0, q.get_nowait())
# Let the putter resume and put last item.
- yield from asyncio.sleep(0.01, loop=loop)
+ yield From(asyncio.sleep(0.01, loop=loop))
self.assertEqual([0, 1, 2], have_been_put)
self.assertEqual(1, q.get_nowait())
self.assertEqual(2, q.get_nowait())
@@ -161,7 +163,8 @@ class QueueGetTests(_QueueTestBase):
@asyncio.coroutine
def queue_get():
- return (yield from q.get())
+ result = (yield From(q.get()))
+ raise Return(result)
res = self.loop.run_until_complete(queue_get())
self.assertEqual(1, res)
@@ -189,25 +192,24 @@ class QueueGetTests(_QueueTestBase):
q = asyncio.Queue(loop=loop)
started = asyncio.Event(loop=loop)
- finished = False
+ non_local = {'finished': False}
@asyncio.coroutine
def queue_get():
- nonlocal finished
started.set()
- res = yield from q.get()
- finished = True
- return res
+ res = yield From(q.get())
+ non_local['finished'] = True
+ raise Return(res)
@asyncio.coroutine
def queue_put():
loop.call_later(0.01, q.put_nowait, 1)
queue_get_task = asyncio.Task(queue_get(), loop=loop)
- yield from started.wait()
- self.assertFalse(finished)
- res = yield from queue_get_task
- self.assertTrue(finished)
- return res
+ yield From(started.wait())
+ self.assertFalse(non_local['finished'])
+ res = yield From(queue_get_task)
+ self.assertTrue(non_local['finished'])
+ raise Return(res)
res = loop.run_until_complete(queue_put())
self.assertEqual(1, res)
@@ -237,14 +239,16 @@ class QueueGetTests(_QueueTestBase):
@asyncio.coroutine
def queue_get():
- return (yield from asyncio.wait_for(q.get(), 0.051, loop=loop))
+ result = (yield From(asyncio.wait_for(q.get(), 0.051, loop=loop)))
+ raise Return(result)
@asyncio.coroutine
def test():
get_task = asyncio.Task(queue_get(), loop=loop)
- yield from asyncio.sleep(0.01, loop=loop) # let the task start
+ yield From(asyncio.sleep(0.01, loop=loop)) # let the task start
q.put_nowait(1)
- return (yield from get_task)
+ result = (yield From(get_task))
+ raise Return(result)
self.assertEqual(1, loop.run_until_complete(test()))
self.assertAlmostEqual(0.06, loop.time())
@@ -280,12 +284,13 @@ class QueuePutTests(_QueueTestBase):
@asyncio.coroutine
def queue_put():
# No maxsize, won't block.
- yield from q.put(1)
+ yield From(q.put(1))
self.loop.run_until_complete(queue_put())
def test_blocking_put_wait(self):
+ @asyncio.coroutine
def gen():
when = yield
self.assertAlmostEqual(0.01, when)
@@ -295,24 +300,24 @@ class QueuePutTests(_QueueTestBase):
q = asyncio.Queue(maxsize=1, loop=loop)
started = asyncio.Event(loop=loop)
- finished = False
+ non_local = {'finished': False}
@asyncio.coroutine
def queue_put():
- nonlocal finished
started.set()
- yield from q.put(1)
- yield from q.put(2)
- finished = True
+ yield From(q.put(1))
+ yield From(q.put(2))
+ non_local['finished'] = True
@asyncio.coroutine
def queue_get():
- loop.call_later(0.01, q.get_nowait)
queue_put_task = asyncio.Task(queue_put(), loop=loop)
- yield from started.wait()
- self.assertFalse(finished)
- yield from queue_put_task
- self.assertTrue(finished)
+ yield From(None)
+ loop.call_later(0.01, q.get_nowait)
+ yield From(started.wait())
+ self.assertFalse(non_local['finished'])
+ yield From(queue_put_task)
+ self.assertTrue(non_local['finished'])
loop.run_until_complete(queue_get())
self.assertAlmostEqual(0.01, loop.time())
@@ -337,8 +342,8 @@ class QueuePutTests(_QueueTestBase):
q = asyncio.Queue(maxsize=1.3, loop=self.loop)
@asyncio.coroutine
def queue_put():
- yield from q.put(1)
- yield from q.put(2)
+ yield From(q.put(1))
+ yield From(q.put(2))
self.assertTrue(q.full())
self.loop.run_until_complete(queue_put())
@@ -347,12 +352,13 @@ class QueuePutTests(_QueueTestBase):
@asyncio.coroutine
def queue_put():
- yield from q.put(1)
- return True
+ yield From(q.put(1))
+ raise Return(True)
@asyncio.coroutine
def test():
- return (yield from q.get())
+ result = (yield From(q.get()))
+ raise Return(result)
t = asyncio.Task(queue_put(), loop=self.loop)
self.assertEqual(1, self.loop.run_until_complete(test()))
@@ -419,7 +425,7 @@ class JoinableQueueTests(_QueueTestBase):
for i in range(100):
q.put_nowait(i)
- accumulator = 0
+ non_local = {'accumulator': 0}
# Two workers get items from the queue and call task_done after each.
# Join the queue and assert all items have been processed.
@@ -427,11 +433,9 @@ class JoinableQueueTests(_QueueTestBase):
@asyncio.coroutine
def worker():
- nonlocal accumulator
-
while running:
- item = yield from q.get()
- accumulator += item
+ item = yield From(q.get())
+ non_local['accumulator'] += item
q.task_done()
@asyncio.coroutine
@@ -439,11 +443,11 @@ class JoinableQueueTests(_QueueTestBase):
tasks = [asyncio.Task(worker(), loop=self.loop)
for index in range(2)]
- yield from q.join()
- return tasks
+ yield From(q.join())
+ raise Return(tasks)
tasks = self.loop.run_until_complete(test())
- self.assertEqual(sum(range(100)), accumulator)
+ self.assertEqual(sum(range(100)), non_local['accumulator'])
# close running generators
running = False
@@ -459,8 +463,8 @@ class JoinableQueueTests(_QueueTestBase):
@asyncio.coroutine
def join():
- yield from q.join()
- yield from q.join()
+ yield From(q.join())
+ yield From(q.join())
self.loop.run_until_complete(join())
diff --git a/tests/test_selector_events.py b/tests/test_selector_events.py
index 8eba56c..38a8c83 100644
--- a/tests/test_selector_events.py
+++ b/tests/test_selector_events.py
@@ -1,26 +1,42 @@
"""Tests for selector_events.py"""
import errno
-import gc
-import pprint
+#import gc
+#import pprint
import socket
import sys
import unittest
-from unittest import mock
try:
import ssl
except ImportError:
ssl = None
-
-import asyncio
-from asyncio import selectors
-from asyncio import test_utils
-from asyncio.selector_events import BaseSelectorEventLoop
-from asyncio.selector_events import _SelectorTransport
-from asyncio.selector_events import _SelectorSslTransport
-from asyncio.selector_events import _SelectorSocketTransport
-from asyncio.selector_events import _SelectorDatagramTransport
-
+else:
+ HAS_SNI = getattr(ssl, 'HAS_SNI', False)
+ from trollius.py3_ssl import SSLWantReadError, SSLWantWriteError
+
+import trollius as asyncio
+from trollius.py33_exceptions import (
+ BlockingIOError, InterruptedError,
+ ConnectionResetError, ConnectionRefusedError)
+from trollius import selectors
+from trollius import test_utils
+from trollius.selector_events import BaseSelectorEventLoop
+from trollius.selector_events import _SelectorDatagramTransport
+from trollius.selector_events import _SelectorSocketTransport
+from trollius.selector_events import _SelectorSslTransport
+from trollius.selector_events import _SelectorTransport
+from trollius.selector_events import _SSL_REQUIRES_SELECT
+from trollius.test_utils import mock
+
+if sys.version_info >= (3,):
+ UNICODE_STR = 'unicode'
+else:
+ UNICODE_STR = unicode('unicode')
+ try:
+ memoryview
+ except NameError:
+ # Python 2.6
+ memoryview = buffer
MOCK_ANY = mock.ANY
@@ -51,7 +67,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
transport = self.loop._make_socket_transport(m, asyncio.Protocol())
self.assertIsInstance(transport, _SelectorSocketTransport)
- @unittest.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
def test_make_ssl_transport(self):
m = mock.Mock()
self.loop.add_reader = mock.Mock()
@@ -64,7 +80,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
m, asyncio.Protocol(), m, waiter)
self.assertIsInstance(transport, _SelectorSslTransport)
- @mock.patch('asyncio.selector_events.ssl', None)
+ @mock.patch('trollius.selector_events.ssl', None)
def test_make_ssl_transport_without_ssl_error(self):
m = mock.Mock()
self.loop.add_reader = mock.Mock()
@@ -687,7 +703,7 @@ class SelectorTransportTests(test_utils.TestCase):
self.assertFalse(self.loop.readers)
self.assertEqual(1, self.loop.remove_reader_count[7])
- @mock.patch('asyncio.log.logger.error')
+ @mock.patch('trollius.log.logger.error')
def test_fatal_error(self, m_exc):
exc = OSError()
tr = _SelectorTransport(self.loop, self.sock, self.protocol, None)
@@ -857,7 +873,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
transport.write(data)
- self.sock.send.assert_called_with(data)
+ self.sock.send.assert_called_with(b'data')
def test_write_no_data(self):
transport = _SelectorSocketTransport(
@@ -933,7 +949,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
self.loop.assert_writer(7, transport._write_ready)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)
- @mock.patch('asyncio.selector_events.logger')
+ @mock.patch('trollius.selector_events.logger')
def test_write_exception(self, m_log):
err = self.sock.send.side_effect = OSError()
@@ -1051,7 +1067,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
err,
'Fatal write error on socket transport')
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.base_events.logger')
def test_write_ready_exception_and_close(self, m_log):
self.sock.send.side_effect = OSError()
remove_writer = self.loop.remove_writer = mock.Mock()
@@ -1089,7 +1105,7 @@ class SelectorSocketTransportTests(test_utils.TestCase):
tr.close()
-@unittest.skipIf(ssl is None, 'No ssl module')
+@test_utils.skipIf(ssl is None, 'No ssl module')
class SelectorSslTransportTests(test_utils.TestCase):
def setUp(self):
@@ -1123,14 +1139,14 @@ class SelectorSslTransportTests(test_utils.TestCase):
def test_on_handshake_reader_retry(self):
self.loop.set_debug(False)
- self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError
+ self.sslsock.do_handshake.side_effect = SSLWantReadError
transport = _SelectorSslTransport(
self.loop, self.sock, self.protocol, self.sslcontext)
self.loop.assert_reader(1, transport._on_handshake, None)
def test_on_handshake_writer_retry(self):
self.loop.set_debug(False)
- self.sslsock.do_handshake.side_effect = ssl.SSLWantWriteError
+ self.sslsock.do_handshake.side_effect = SSLWantWriteError
transport = _SelectorSslTransport(
self.loop, self.sock, self.protocol, self.sslcontext)
self.loop.assert_writer(1, transport._on_handshake, None)
@@ -1198,7 +1214,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
def test_write_str(self):
transport = self._make_one()
- self.assertRaises(TypeError, transport.write, 'str')
+ self.assertRaises(TypeError, transport.write, UNICODE_STR)
def test_write_closing(self):
transport = self._make_one()
@@ -1207,7 +1223,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport.write(b'data')
self.assertEqual(transport._conn_lost, 2)
- @mock.patch('asyncio.selector_events.logger')
+ @mock.patch('trollius.selector_events.logger')
def test_write_exception(self, m_log):
transport = self._make_one()
transport._conn_lost = 1
@@ -1219,6 +1235,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport.write(b'data')
m_log.warning.assert_called_with('socket.send() raised exception.')
+ @test_utils.skipIf(_SSL_REQUIRES_SELECT, 'buggy ssl with the workaround')
def test_read_ready_recv(self):
self.sslsock.recv.return_value = b'data'
transport = self._make_one()
@@ -1240,6 +1257,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
self.loop.add_writer.assert_called_with(
transport._sock_fd, transport._write_ready)
+ @test_utils.skipIf(_SSL_REQUIRES_SELECT, 'buggy ssl with the workaround')
def test_read_ready_recv_eof(self):
self.sslsock.recv.return_value = b''
transport = self._make_one()
@@ -1248,6 +1266,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport.close.assert_called_with()
self.protocol.eof_received.assert_called_with()
+ @test_utils.skipIf(_SSL_REQUIRES_SELECT, 'buggy ssl with the workaround')
def test_read_ready_recv_conn_reset(self):
err = self.sslsock.recv.side_effect = ConnectionResetError()
transport = self._make_one()
@@ -1256,8 +1275,9 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport._read_ready()
transport._force_close.assert_called_with(err)
+ @test_utils.skipIf(_SSL_REQUIRES_SELECT, 'buggy ssl with the workaround')
def test_read_ready_recv_retry(self):
- self.sslsock.recv.side_effect = ssl.SSLWantReadError
+ self.sslsock.recv.side_effect = SSLWantReadError
transport = self._make_one()
transport._read_ready()
self.assertTrue(self.sslsock.recv.called)
@@ -1271,10 +1291,11 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport._read_ready()
self.assertFalse(self.protocol.data_received.called)
+ @test_utils.skipIf(_SSL_REQUIRES_SELECT, 'buggy ssl with the workaround')
def test_read_ready_recv_write(self):
self.loop.remove_reader = mock.Mock()
self.loop.add_writer = mock.Mock()
- self.sslsock.recv.side_effect = ssl.SSLWantWriteError
+ self.sslsock.recv.side_effect = SSLWantWriteError
transport = self._make_one()
transport._read_ready()
self.assertFalse(self.protocol.data_received.called)
@@ -1284,6 +1305,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
self.loop.add_writer.assert_called_with(
transport._sock_fd, transport._write_ready)
+ @test_utils.skipIf(_SSL_REQUIRES_SELECT, 'buggy ssl with the workaround')
def test_read_ready_recv_exc(self):
err = self.sslsock.recv.side_effect = OSError()
transport = self._make_one()
@@ -1347,7 +1369,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport = self._make_one()
transport._buffer = list_to_buffer([b'data'])
- self.sslsock.send.side_effect = ssl.SSLWantWriteError
+ self.sslsock.send.side_effect = SSLWantWriteError
transport._write_ready()
self.assertEqual(list_to_buffer([b'data']), transport._buffer)
@@ -1360,7 +1382,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport._buffer = list_to_buffer([b'data'])
self.loop.remove_writer = mock.Mock()
- self.sslsock.send.side_effect = ssl.SSLWantReadError
+ self.sslsock.send.side_effect = SSLWantReadError
transport._write_ready()
self.assertFalse(self.protocol.data_received.called)
self.assertTrue(transport._write_wants_read)
@@ -1408,7 +1430,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
self.assertEqual(tr._conn_lost, 1)
self.assertEqual(1, self.loop.remove_reader_count[1])
- @unittest.skipIf(ssl is None, 'No SSL support')
+ @test_utils.skipIf(ssl is None, 'No SSL support')
def test_server_hostname(self):
_SelectorSslTransport(
self.loop, self.sock, self.protocol, self.sslcontext,
@@ -1418,9 +1440,9 @@ class SelectorSslTransportTests(test_utils.TestCase):
server_hostname='localhost')
-class SelectorSslWithoutSslTransportTests(unittest.TestCase):
+class SelectorSslWithoutSslTransportTests(test_utils.TestCase):
- @mock.patch('asyncio.selector_events.ssl', None)
+ @mock.patch('trollius.selector_events.ssl', None)
def test_ssl_transport_requires_ssl_module(self):
Mock = mock.Mock
with self.assertRaises(RuntimeError):
@@ -1503,7 +1525,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase):
transport.sendto(data, ('0.0.0.0', 1234))
self.assertTrue(self.sock.sendto.called)
self.assertEqual(
- self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))
+ self.sock.sendto.call_args[0], (b'data', ('0.0.0.0', 1234)))
def test_sendto_no_data(self):
transport = _SelectorDatagramTransport(
@@ -1564,7 +1586,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase):
self.assertEqual(
[(b'data', ('0.0.0.0', 12345))], list(transport._buffer))
- @mock.patch('asyncio.selector_events.logger')
+ @mock.patch('trollius.selector_events.logger')
def test_sendto_exception(self, m_log):
data = b'data'
err = self.sock.sendto.side_effect = RuntimeError()
@@ -1617,7 +1639,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase):
def test_sendto_str(self):
transport = _SelectorDatagramTransport(
self.loop, self.sock, self.protocol)
- self.assertRaises(TypeError, transport.sendto, 'str', ())
+ self.assertRaises(TypeError, transport.sendto, UNICODE_STR, ())
def test_sendto_connected_addr(self):
transport = _SelectorDatagramTransport(
@@ -1720,7 +1742,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase):
self.assertFalse(transport._fatal_error.called)
self.assertTrue(self.protocol.error_received.called)
- @mock.patch('asyncio.base_events.logger.error')
+ @mock.patch('trollius.base_events.logger.error')
def test_fatal_error_connected(self, m_exc):
transport = _SelectorDatagramTransport(
self.loop, self.sock, self.protocol, ('0.0.0.0', 1))
diff --git a/tests/test_selectors.py b/tests/test_selectors.py
index 9392962..a766f9f 100644
--- a/tests/test_selectors.py
+++ b/tests/test_selectors.py
@@ -1,9 +1,10 @@
"""Tests for selectors.py."""
import unittest
-from unittest import mock
-from asyncio import selectors
+from trollius import selectors
+from trollius import test_utils
+from trollius.test_utils import mock
class FakeSelector(selectors._BaseSelectorImpl):
@@ -13,7 +14,7 @@ class FakeSelector(selectors._BaseSelectorImpl):
raise NotImplementedError
-class _SelectorMappingTests(unittest.TestCase):
+class _SelectorMappingTests(test_utils.TestCase):
def test_len(self):
s = FakeSelector()
@@ -60,7 +61,7 @@ class _SelectorMappingTests(unittest.TestCase):
self.assertEqual(1, counter)
-class BaseSelectorTests(unittest.TestCase):
+class BaseSelectorTests(test_utils.TestCase):
def test_fileobj_to_fd(self):
self.assertEqual(10, selectors._fileobj_to_fd(10))
diff --git a/tests/test_streams.py b/tests/test_streams.py
index 73a375a..4f4a684 100644
--- a/tests/test_streams.py
+++ b/tests/test_streams.py
@@ -1,18 +1,21 @@
"""Tests for streams.py."""
import gc
+import io
import os
import socket
import sys
import unittest
-from unittest import mock
try:
import ssl
except ImportError:
ssl = None
-import asyncio
-from asyncio import test_utils
+import trollius as asyncio
+from trollius import Return, From
+from trollius import compat
+from trollius import test_utils
+from trollius.test_utils import mock
class StreamReaderTests(test_utils.TestCase):
@@ -29,9 +32,9 @@ class StreamReaderTests(test_utils.TestCase):
self.loop.close()
gc.collect()
- super().tearDown()
+ super(StreamReaderTests, self).tearDown()
- @mock.patch('asyncio.streams.events')
+ @mock.patch('trollius.streams.events')
def test_ctor_global_loop(self, m_events):
stream = asyncio.StreamReader()
self.assertIs(stream._loop, m_events.get_event_loop.return_value)
@@ -53,7 +56,7 @@ class StreamReaderTests(test_utils.TestCase):
loop=self.loop)
self._basetest_open_connection(conn_fut)
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_open_unix_connection(self):
with test_utils.run_test_unix_server() as httpd:
conn_fut = asyncio.open_unix_connection(httpd.address,
@@ -72,7 +75,7 @@ class StreamReaderTests(test_utils.TestCase):
writer.close()
- @unittest.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
def test_open_connection_no_loop_ssl(self):
with test_utils.run_test_server(use_ssl=True) as httpd:
conn_fut = asyncio.open_connection(
@@ -82,8 +85,8 @@ class StreamReaderTests(test_utils.TestCase):
self._basetest_open_connection_no_loop_ssl(conn_fut)
- @unittest.skipIf(ssl is None, 'No ssl module')
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipIf(ssl is None, 'No ssl module')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_open_unix_connection_no_loop_ssl(self):
with test_utils.run_test_unix_server(use_ssl=True) as httpd:
conn_fut = asyncio.open_unix_connection(
@@ -109,7 +112,7 @@ class StreamReaderTests(test_utils.TestCase):
loop=self.loop)
self._basetest_open_connection_error(conn_fut)
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_open_unix_connection_error(self):
with test_utils.run_test_unix_server() as httpd:
conn_fut = asyncio.open_unix_connection(httpd.address,
@@ -417,7 +420,7 @@ class StreamReaderTests(test_utils.TestCase):
@asyncio.coroutine
def readline():
- yield from stream.readline()
+ yield From(stream.readline())
t1 = asyncio.Task(stream.readline(), loop=self.loop)
t2 = asyncio.Task(set_err(), loop=self.loop)
@@ -431,7 +434,7 @@ class StreamReaderTests(test_utils.TestCase):
@asyncio.coroutine
def read_a_line():
- yield from stream.readline()
+ yield From(stream.readline())
t = asyncio.Task(read_a_line(), loop=self.loop)
test_utils.run_briefly(self.loop)
@@ -452,7 +455,7 @@ class StreamReaderTests(test_utils.TestCase):
@asyncio.coroutine
def handle_client(self, client_reader, client_writer):
- data = yield from client_reader.readline()
+ data = yield From(client_reader.readline())
client_writer.write(data)
def start(self):
@@ -491,14 +494,14 @@ class StreamReaderTests(test_utils.TestCase):
@asyncio.coroutine
def client(addr):
- reader, writer = yield from asyncio.open_connection(
- *addr, loop=self.loop)
+ reader, writer = yield From(asyncio.open_connection(
+ *addr, loop=self.loop))
# send a line
writer.write(b"hello world!\n")
# read it back
- msgback = yield from reader.readline()
+ msgback = yield From(reader.readline())
writer.close()
- return msgback
+ raise Return(msgback)
# test the server variant with a coroutine as client handler
server = MyServer(self.loop)
@@ -516,7 +519,7 @@ class StreamReaderTests(test_utils.TestCase):
server.stop()
self.assertEqual(msg, b"hello world!\n")
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @test_utils.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_start_unix_server(self):
class MyServer:
@@ -528,7 +531,7 @@ class StreamReaderTests(test_utils.TestCase):
@asyncio.coroutine
def handle_client(self, client_reader, client_writer):
- data = yield from client_reader.readline()
+ data = yield From(client_reader.readline())
client_writer.write(data)
def start(self):
@@ -559,14 +562,14 @@ class StreamReaderTests(test_utils.TestCase):
@asyncio.coroutine
def client(path):
- reader, writer = yield from asyncio.open_unix_connection(
- path, loop=self.loop)
+ reader, writer = yield From(asyncio.open_unix_connection(
+ path, loop=self.loop))
# send a line
writer.write(b"hello world!\n")
# read it back
- msgback = yield from reader.readline()
+ msgback = yield From(reader.readline())
writer.close()
- return msgback
+ raise Return(msgback)
# test the server variant with a coroutine as client handler
with test_utils.unix_socket_path() as path:
@@ -586,7 +589,7 @@ class StreamReaderTests(test_utils.TestCase):
server.stop()
self.assertEqual(msg, b"hello world!\n")
- @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
+ @test_utils.skipIf(sys.platform == 'win32', "Don't have pipes")
def test_read_all_from_pipe_reader(self):
# See Tulip issue 168. This test is derived from the example
# subprocess_attach_read_pipe.py, but we configure the
@@ -603,7 +606,7 @@ os.close(fd)
rfd, wfd = os.pipe()
args = [sys.executable, '-c', code, str(wfd)]
- pipe = open(rfd, 'rb', 0)
+ pipe = io.open(rfd, 'rb', 0)
reader = asyncio.StreamReader(loop=self.loop, limit=1)
protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
transport, _ = self.loop.run_until_complete(
@@ -613,8 +616,11 @@ os.close(fd)
watcher.attach_loop(self.loop)
try:
asyncio.set_child_watcher(watcher)
+ kw = {'loop': self.loop}
+ if compat.PY3:
+ kw['pass_fds'] = set((wfd,))
proc = self.loop.run_until_complete(
- asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop))
+ asyncio.create_subprocess_exec(*args, **kw))
self.loop.run_until_complete(proc.wait())
finally:
asyncio.set_child_watcher(None)
diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py
index 08c8ac2..c65a313 100644
--- a/tests/test_subprocess.py
+++ b/tests/test_subprocess.py
@@ -1,50 +1,59 @@
+from trollius import subprocess
+from trollius import test_utils
+import trollius as asyncio
+import os
import signal
import sys
import unittest
-from unittest import mock
+from trollius import From, Return
+from trollius.test_utils import mock
+from trollius.py33_exceptions import BrokenPipeError, ConnectionResetError
-import asyncio
-from asyncio import subprocess
-from asyncio import test_utils
if sys.platform != 'win32':
- from asyncio import unix_events
+ from trollius import unix_events
+
try:
from test import support # PIPE_MAX_SIZE
except ImportError:
- from asyncio import test_support as support
+ from trollius import test_support as support
+
# Program blocking
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
# Program copying input to output
-PROGRAM_CAT = [
- sys.executable, '-c',
- ';'.join(('import sys',
- 'data = sys.stdin.buffer.read()',
- 'sys.stdout.buffer.write(data)'))]
+if sys.version_info >= (3,):
+ PROGRAM_CAT = ';'.join(('import sys',
+ 'data = sys.stdin.buffer.read()',
+ 'sys.stdout.buffer.write(data)'))
+else:
+ PROGRAM_CAT = ';'.join(('import sys',
+ 'data = sys.stdin.read()',
+ 'sys.stdout.write(data)'))
+PROGRAM_CAT = [sys.executable, '-c', PROGRAM_CAT]
-class SubprocessMixin:
+class SubprocessMixin(object):
def test_stdin_stdout(self):
args = PROGRAM_CAT
@asyncio.coroutine
def run(data):
- proc = yield from asyncio.create_subprocess_exec(
- *args,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- loop=self.loop)
+ proc = yield From(asyncio.create_subprocess_exec(
+ *args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ loop=self.loop))
# feed data
proc.stdin.write(data)
- yield from proc.stdin.drain()
+ yield From(proc.stdin.drain())
proc.stdin.close()
# get output and exitcode
- data = yield from proc.stdout.read()
- exitcode = yield from proc.wait()
- return (exitcode, data)
+ data = yield From(proc.stdout.read())
+ exitcode = yield From(proc.wait())
+ raise Return(exitcode, data)
task = run(b'some data')
task = asyncio.wait_for(task, 60.0, loop=self.loop)
@@ -57,13 +66,13 @@ class SubprocessMixin:
@asyncio.coroutine
def run(data):
- proc = yield from asyncio.create_subprocess_exec(
+ proc = yield From(asyncio.create_subprocess_exec(
*args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
- loop=self.loop)
- stdout, stderr = yield from proc.communicate(data)
- return proc.returncode, stdout
+ loop=self.loop))
+ stdout, stderr = yield From(proc.communicate(data))
+ raise Return(proc.returncode, stdout)
task = run(b'some data')
task = asyncio.wait_for(task, 60.0, loop=self.loop)
@@ -78,10 +87,14 @@ class SubprocessMixin:
exitcode = self.loop.run_until_complete(proc.wait())
self.assertEqual(exitcode, 7)
+ @test_utils.skipUnless(hasattr(os, 'setsid'), "need os.setsid()")
def test_start_new_session(self):
+ def start_new_session():
+ os.setsid()
+
# start the new process in a new session
create = asyncio.create_subprocess_shell('exit 8',
- start_new_session=True,
+ preexec_fn=start_new_session,
loop=self.loop)
proc = self.loop.run_until_complete(create)
exitcode = self.loop.run_until_complete(proc.wait())
@@ -111,9 +124,13 @@ class SubprocessMixin:
else:
self.assertEqual(-signal.SIGTERM, returncode)
- @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
+ @test_utils.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
def test_send_signal(self):
- code = 'import time; print("sleeping", flush=True); time.sleep(3600)'
+ code = '; '.join((
+ 'import sys, time',
+ 'print("sleeping")',
+ 'sys.stdout.flush()',
+ 'time.sleep(3600)'))
args = [sys.executable, '-c', code]
create = asyncio.create_subprocess_exec(*args, loop=self.loop, stdout=subprocess.PIPE)
proc = self.loop.run_until_complete(create)
@@ -121,12 +138,12 @@ class SubprocessMixin:
@asyncio.coroutine
def send_signal(proc):
# basic synchronization to wait until the program is sleeping
- line = yield from proc.stdout.readline()
+ line = yield From(proc.stdout.readline())
self.assertEqual(line, b'sleeping\n')
proc.send_signal(signal.SIGHUP)
- returncode = (yield from proc.wait())
- return returncode
+ returncode = yield From(proc.wait())
+ raise Return(returncode)
returncode = self.loop.run_until_complete(send_signal(proc))
self.assertEqual(-signal.SIGHUP, returncode)
@@ -149,7 +166,7 @@ class SubprocessMixin:
@asyncio.coroutine
def write_stdin(proc, data):
proc.stdin.write(data)
- yield from proc.stdin.drain()
+ yield From(proc.stdin.drain())
coro = write_stdin(proc, large_data)
# drain() must raise BrokenPipeError or ConnectionResetError
@@ -177,22 +194,22 @@ class SubprocessMixin:
'sys.stdout.write("x" * %s)' % size,
'sys.stdout.flush()',
))
- proc = yield from asyncio.create_subprocess_exec(
+ proc = yield From(asyncio.create_subprocess_exec(
sys.executable, '-c', code,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
limit=limit,
- loop=self.loop)
+ loop=self.loop))
stdout_transport = proc._transport.get_pipe_transport(1)
stdout_transport.pause_reading = mock.Mock()
stdout_transport.resume_reading = mock.Mock()
- stdout, stderr = yield from proc.communicate()
+ stdout, stderr = yield From(proc.communicate())
# The child process produced more than limit bytes of output,
# the stream reader transport should pause the protocol to not
# allocate too much memory.
- return (stdout, stdout_transport)
+ raise Return(stdout, stdout_transport)
# Issue #22685: Ensure that the stream reader pauses the protocol
# when the child process produces too much data
@@ -208,16 +225,16 @@ class SubprocessMixin:
@asyncio.coroutine
def len_message(message):
code = 'import sys; data = sys.stdin.read(); print(len(data))'
- proc = yield from asyncio.create_subprocess_exec(
+ proc = yield From(asyncio.create_subprocess_exec(
sys.executable, '-c', code,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
close_fds=False,
- loop=self.loop)
- stdout, stderr = yield from proc.communicate(message)
- exitcode = yield from proc.wait()
- return (stdout, exitcode)
+ loop=self.loop))
+ stdout, stderr = yield From(proc.communicate(message))
+ exitcode = yield From(proc.wait())
+ raise Return(stdout, exitcode)
output, exitcode = self.loop.run_until_complete(len_message(b'abc'))
self.assertEqual(output.rstrip(), b'3')
@@ -245,7 +262,7 @@ if sys.platform != 'win32':
policy = asyncio.get_event_loop_policy()
policy.set_child_watcher(None)
self.loop.close()
- super().tearDown()
+ super(SubprocessWatcherMixin, self).tearDown()
class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):
@@ -272,7 +289,7 @@ else:
policy = asyncio.get_event_loop_policy()
self.loop.close()
policy.set_event_loop(None)
- super().tearDown()
+ super(SubprocessProactorTests, self).tearDown()
if __name__ == '__main__':
diff --git a/tests/test_tasks.py b/tests/test_tasks.py
index 25b21dc..8d3f776 100644
--- a/tests/test_tasks.py
+++ b/tests/test_tasks.py
@@ -6,19 +6,23 @@ import sys
import types
import unittest
import weakref
-from unittest import mock
+
+import trollius as asyncio
+from trollius import From, Return
+from trollius import coroutines
+from trollius import test_support as support
+from trollius import test_utils
+from trollius.test_support import assert_python_ok
+from trollius.test_utils import mock
try:
from test import support # gc_collect
from test.script_helper import assert_python_ok
except ImportError:
- from asyncio import test_support as support
- from asyncio.test_support import assert_python_ok
-
-import asyncio
-from asyncio import coroutines
-from asyncio import test_utils
+ from trollius import test_support as support
+ from trollius.test_support import assert_python_ok
+PY33 = (sys.version_info >= (3, 3))
PY34 = (sys.version_info >= (3, 4))
PY35 = (sys.version_info >= (3, 5))
@@ -140,9 +144,14 @@ class TaskTests(test_utils.TestCase):
self.loop.set_debug(False)
@asyncio.coroutine
+ def noop():
+ yield From(None)
+ raise Return('abc')
+
+ @asyncio.coroutine
def notmuch():
- yield from []
- return 'abc'
+ yield From(noop())
+ raise Return('abc')
# test coroutine function
self.assertEqual(notmuch.__name__, 'notmuch')
@@ -156,7 +165,7 @@ class TaskTests(test_utils.TestCase):
# test coroutine object
gen = notmuch()
- if coroutines._DEBUG or PY35:
+ if PY35 or (coroutines._DEBUG and PY33):
coro_qualname = 'TaskTests.test_task_repr.<locals>.notmuch'
else:
coro_qualname = 'notmuch'
@@ -200,7 +209,7 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def notmuch():
- # notmuch() function doesn't use yield from: it will be wrapped by
+ # notmuch() function doesn't use yield: it will be wrapped by
# @coroutine decorator
return 123
@@ -213,12 +222,15 @@ class TaskTests(test_utils.TestCase):
# test coroutine object
gen = notmuch()
- if coroutines._DEBUG or PY35:
+ if PY35 or coroutines._DEBUG:
# On Python >= 3.5, generators now inherit the name of the
# function, as expected, and have a qualified name (__qualname__
# attribute).
coro_name = 'notmuch'
- coro_qualname = 'TaskTests.test_task_repr_coro_decorator.<locals>.notmuch'
+ if PY35 or (coroutines._DEBUG and PY33):
+ coro_qualname = 'TaskTests.test_task_repr_coro_decorator.<locals>.notmuch'
+ else:
+ coro_qualname = 'notmuch'
else:
# On Python < 3.5, generators inherit the name of the code, not of
# the function. See: http://bugs.python.org/issue21205
@@ -265,7 +277,8 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def wait_for(fut):
- return (yield from fut)
+ res = yield From(fut)
+ raise Return(res)
fut = asyncio.Future(loop=self.loop)
task = asyncio.Task(wait_for(fut), loop=self.loop)
@@ -279,9 +292,9 @@ class TaskTests(test_utils.TestCase):
def test_task_basics(self):
@asyncio.coroutine
def outer():
- a = yield from inner1()
- b = yield from inner2()
- return a+b
+ a = yield From(inner1())
+ b = yield From(inner2())
+ raise Return(a+b)
@asyncio.coroutine
def inner1():
@@ -305,10 +318,11 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def task():
- yield from asyncio.sleep(10.0, loop=loop)
- return 12
+ yield From(asyncio.sleep(10.0, loop=loop))
+ raise Return(12)
t = asyncio.Task(task(), loop=loop)
+ test_utils.run_briefly(loop)
loop.call_soon(t.cancel)
with self.assertRaises(asyncio.CancelledError):
loop.run_until_complete(t)
@@ -319,9 +333,9 @@ class TaskTests(test_utils.TestCase):
def test_cancel_yield(self):
@asyncio.coroutine
def task():
- yield
- yield
- return 12
+ yield From(None)
+ yield From(None)
+ raise Return(12)
t = asyncio.Task(task(), loop=self.loop)
test_utils.run_briefly(self.loop) # start coro
@@ -337,8 +351,8 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def task():
- yield from f
- return 12
+ yield From(f)
+ raise Return(12)
t = asyncio.Task(task(), loop=self.loop)
test_utils.run_briefly(self.loop) # start task
@@ -353,8 +367,8 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def task():
- yield from f
- return 12
+ yield From(f)
+ raise Return(12)
t = asyncio.Task(task(), loop=self.loop)
test_utils.run_briefly(self.loop)
@@ -375,11 +389,11 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def task():
- yield from fut1
+ yield From(fut1)
try:
- yield from fut2
+ yield From(fut2)
except asyncio.CancelledError:
- return 42
+ raise Return(42)
t = asyncio.Task(task(), loop=self.loop)
test_utils.run_briefly(self.loop)
@@ -400,13 +414,13 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def task():
- yield from fut1
+ yield From(fut1)
try:
- yield from fut2
+ yield From(fut2)
except asyncio.CancelledError:
pass
- res = yield from fut3
- return res
+ res = yield From(fut3)
+ raise Return(res)
t = asyncio.Task(task(), loop=self.loop)
test_utils.run_briefly(self.loop)
@@ -433,8 +447,8 @@ class TaskTests(test_utils.TestCase):
t.cancel()
self.assertTrue(t._must_cancel) # White-box test.
# The sleep should be cancelled immediately.
- yield from asyncio.sleep(100, loop=loop)
- return 12
+ yield From(asyncio.sleep(100, loop=loop))
+ raise Return(12)
t = asyncio.Task(task(), loop=loop)
self.assertRaises(
@@ -456,17 +470,16 @@ class TaskTests(test_utils.TestCase):
loop = self.new_test_loop(gen)
- x = 0
+ non_local = {'x': 0}
waiters = []
@asyncio.coroutine
def task():
- nonlocal x
- while x < 10:
+ while non_local['x'] < 10:
waiters.append(asyncio.sleep(0.1, loop=loop))
- yield from waiters[-1]
- x += 1
- if x == 2:
+ yield From(waiters[-1])
+ non_local['x'] += 1
+ if non_local['x'] == 3:
loop.stop()
t = asyncio.Task(task(), loop=loop)
@@ -475,7 +488,7 @@ class TaskTests(test_utils.TestCase):
self.assertEqual(str(cm.exception),
'Event loop stopped before Future completed.')
self.assertFalse(t.done())
- self.assertEqual(x, 2)
+ self.assertEqual(non_local['x'], 3)
self.assertAlmostEqual(0.3, loop.time())
# close generators
@@ -486,6 +499,7 @@ class TaskTests(test_utils.TestCase):
def test_wait_for(self):
+ @asyncio.coroutine
def gen():
when = yield
self.assertAlmostEqual(0.2, when)
@@ -495,27 +509,34 @@ class TaskTests(test_utils.TestCase):
loop = self.new_test_loop(gen)
- foo_running = None
+ non_local = {'foo_running': None}
@asyncio.coroutine
def foo():
- nonlocal foo_running
- foo_running = True
+ non_local['foo_running'] = True
try:
- yield from asyncio.sleep(0.2, loop=loop)
+ yield From(asyncio.sleep(0.2, loop=loop))
finally:
- foo_running = False
- return 'done'
+ non_local['foo_running'] = False
+ raise Return('done')
fut = asyncio.Task(foo(), loop=loop)
+ test_utils.run_briefly(loop)
with self.assertRaises(asyncio.TimeoutError):
loop.run_until_complete(asyncio.wait_for(fut, 0.1, loop=loop))
+
+ # Trollius issue #2: need to run the loop briefly to ensure that the
+ # cancellation is propagated to all tasks
+ waiter = asyncio.Future(loop=loop)
+ fut.add_done_callback(lambda f: waiter.set_result(True))
+ loop.run_until_complete(waiter)
+
self.assertTrue(fut.done())
# it should have been cancelled due to the timeout
self.assertTrue(fut.cancelled())
self.assertAlmostEqual(0.1, loop.time())
- self.assertEqual(foo_running, False)
+ self.assertEqual(non_local['foo_running'], False)
def test_wait_for_blocking(self):
loop = self.new_test_loop()
@@ -542,17 +563,24 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def foo():
- yield from asyncio.sleep(0.2, loop=loop)
- return 'done'
+ yield From(asyncio.sleep(0.2, loop=loop))
+ raise Return('done')
asyncio.set_event_loop(loop)
try:
fut = asyncio.Task(foo(), loop=loop)
+ test_utils.run_briefly(loop)
with self.assertRaises(asyncio.TimeoutError):
loop.run_until_complete(asyncio.wait_for(fut, 0.01))
finally:
asyncio.set_event_loop(None)
+ # Trollius issue #2: need to run the loop briefly to ensure that the
+ # cancellation is propagated to all tasks
+ waiter = asyncio.Future(loop=loop)
+ fut.add_done_callback(lambda f: waiter.set_result(True))
+ loop.run_until_complete(waiter)
+
self.assertAlmostEqual(0.01, loop.time())
self.assertTrue(fut.done())
self.assertTrue(fut.cancelled())
@@ -588,10 +616,10 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def foo():
- done, pending = yield from asyncio.wait([b, a], loop=loop)
+ done, pending = yield From(asyncio.wait([b, a], loop=loop))
self.assertEqual(done, set([a, b]))
self.assertEqual(pending, set())
- return 42
+ raise Return(42)
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
self.assertEqual(res, 42)
@@ -618,10 +646,10 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def foo():
- done, pending = yield from asyncio.wait([b, a])
+ done, pending = yield From(asyncio.wait([b, a]))
self.assertEqual(done, set([a, b]))
self.assertEqual(pending, set())
- return 42
+ raise Return(42)
asyncio.set_event_loop(loop)
res = loop.run_until_complete(
@@ -642,7 +670,7 @@ class TaskTests(test_utils.TestCase):
done, pending = self.loop.run_until_complete(task)
self.assertFalse(pending)
- self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
+ self.assertEqual(set(f.result() for f in done), set(('test', 'spam')))
def test_wait_errors(self):
self.assertRaises(
@@ -676,8 +704,8 @@ class TaskTests(test_utils.TestCase):
loop=loop)
done, pending = loop.run_until_complete(task)
- self.assertEqual({b}, done)
- self.assertEqual({a}, pending)
+ self.assertEqual(set((b,)), done)
+ self.assertEqual(set((a,)), pending)
self.assertFalse(a.done())
self.assertTrue(b.done())
self.assertIsNone(b.result())
@@ -693,12 +721,12 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def coro1():
- yield
+ yield From(None)
@asyncio.coroutine
def coro2():
- yield
- yield
+ yield From(None)
+ yield From(None)
a = asyncio.Task(coro1(), loop=self.loop)
b = asyncio.Task(coro2(), loop=self.loop)
@@ -708,7 +736,7 @@ class TaskTests(test_utils.TestCase):
loop=self.loop)
done, pending = self.loop.run_until_complete(task)
- self.assertEqual({a, b}, done)
+ self.assertEqual(set((a, b)), done)
self.assertTrue(a.done())
self.assertIsNone(a.result())
self.assertTrue(b.done())
@@ -737,8 +765,8 @@ class TaskTests(test_utils.TestCase):
loop=loop)
done, pending = loop.run_until_complete(task)
- self.assertEqual({b}, done)
- self.assertEqual({a}, pending)
+ self.assertEqual(set((b,)), done)
+ self.assertEqual(set((a,)), pending)
self.assertAlmostEqual(0, loop.time())
# move forward to close generator
@@ -761,7 +789,7 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def exc():
- yield from asyncio.sleep(0.01, loop=loop)
+ yield From(asyncio.sleep(0.01, loop=loop))
raise ZeroDivisionError('err')
b = asyncio.Task(exc(), loop=loop)
@@ -769,8 +797,8 @@ class TaskTests(test_utils.TestCase):
loop=loop)
done, pending = loop.run_until_complete(task)
- self.assertEqual({b}, done)
- self.assertEqual({a}, pending)
+ self.assertEqual(set((b,)), done)
+ self.assertEqual(set((a,)), pending)
self.assertAlmostEqual(0.01, loop.time())
# move forward to close generator
@@ -792,14 +820,14 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def sleeper():
- yield from asyncio.sleep(0.15, loop=loop)
+ yield From(asyncio.sleep(0.15, loop=loop))
raise ZeroDivisionError('really')
b = asyncio.Task(sleeper(), loop=loop)
@asyncio.coroutine
def foo():
- done, pending = yield from asyncio.wait([b, a], loop=loop)
+ done, pending = yield From(asyncio.wait([b, a], loop=loop))
self.assertEqual(len(done), 2)
self.assertEqual(pending, set())
errors = set(f for f in done if f.exception() is not None)
@@ -829,8 +857,8 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def foo():
- done, pending = yield from asyncio.wait([b, a], timeout=0.11,
- loop=loop)
+ done, pending = yield From(asyncio.wait([b, a], timeout=0.11,
+ loop=loop))
self.assertEqual(done, set([a]))
self.assertEqual(pending, set([b]))
@@ -880,17 +908,16 @@ class TaskTests(test_utils.TestCase):
# disable "slow callback" warning
loop.slow_callback_duration = 1.0
completed = set()
- time_shifted = False
+ non_local = {'time_shifted': False}
@asyncio.coroutine
def sleeper(dt, x):
- nonlocal time_shifted
- yield from asyncio.sleep(dt, loop=loop)
+ yield From(asyncio.sleep(dt, loop=loop))
completed.add(x)
- if not time_shifted and 'a' in completed and 'b' in completed:
- time_shifted = True
+ if not non_local['time_shifted'] and 'a' in completed and 'b' in completed:
+ non_local['time_shifted'] = True
loop.advance_time(0.14)
- return x
+ raise Return(x)
a = sleeper(0.01, 'a')
b = sleeper(0.01, 'b')
@@ -900,8 +927,8 @@ class TaskTests(test_utils.TestCase):
def foo():
values = []
for f in asyncio.as_completed([b, c, a], loop=loop):
- values.append((yield from f))
- return values
+ values.append((yield From(f)))
+ raise Return(values)
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
self.assertAlmostEqual(0.15, loop.time())
@@ -933,11 +960,11 @@ class TaskTests(test_utils.TestCase):
if values:
loop.advance_time(0.02)
try:
- v = yield from f
+ v = yield From(f)
values.append((1, v))
except asyncio.TimeoutError as exc:
values.append((2, exc))
- return values
+ raise Return(values)
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
self.assertEqual(len(res), 2, res)
@@ -964,7 +991,7 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def foo():
for f in asyncio.as_completed([a], timeout=1, loop=loop):
- v = yield from f
+ v = yield From(f)
self.assertEqual(v, 'a')
loop.run_until_complete(asyncio.Task(foo(), loop=loop))
@@ -980,7 +1007,7 @@ class TaskTests(test_utils.TestCase):
a = asyncio.sleep(0.05, 'a', loop=loop)
b = asyncio.sleep(0.10, 'b', loop=loop)
- fs = {a, b}
+ fs = set((a, b))
futs = list(asyncio.as_completed(fs, loop=loop))
self.assertEqual(len(futs), 2)
@@ -1005,12 +1032,12 @@ class TaskTests(test_utils.TestCase):
a = asyncio.sleep(0.05, 'a', loop=loop)
b = asyncio.sleep(0.05, 'b', loop=loop)
- fs = {a, b}
+ fs = set((a, b))
futs = list(asyncio.as_completed(fs, loop=loop))
self.assertEqual(len(futs), 2)
waiter = asyncio.wait(futs, loop=loop)
done, pending = loop.run_until_complete(waiter)
- self.assertEqual(set(f.result() for f in done), {'a', 'b'})
+ self.assertEqual(set(f.result() for f in done), set(('a', 'b')))
def test_as_completed_duplicate_coroutines(self):
@@ -1024,13 +1051,13 @@ class TaskTests(test_utils.TestCase):
c = coro('ham')
for f in asyncio.as_completed([c, c, coro('spam')],
loop=self.loop):
- result.append((yield from f))
- return result
+ result.append((yield From(f)))
+ raise Return(result)
fut = asyncio.Task(runner(), loop=self.loop)
self.loop.run_until_complete(fut)
result = fut.result()
- self.assertEqual(set(result), {'ham', 'spam'})
+ self.assertEqual(set(result), set(('ham', 'spam')))
self.assertEqual(len(result), 2)
def test_sleep(self):
@@ -1046,9 +1073,9 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def sleeper(dt, arg):
- yield from asyncio.sleep(dt/2, loop=loop)
- res = yield from asyncio.sleep(dt/2, arg, loop=loop)
- return res
+ yield From(asyncio.sleep(dt/2, loop=loop))
+ res = yield From(asyncio.sleep(dt/2, arg, loop=loop))
+ raise Return(res)
t = asyncio.Task(sleeper(0.1, 'yeah'), loop=loop)
loop.run_until_complete(t)
@@ -1068,22 +1095,21 @@ class TaskTests(test_utils.TestCase):
t = asyncio.Task(asyncio.sleep(10.0, 'yeah', loop=loop),
loop=loop)
- handle = None
+ non_local = {'handle': None}
orig_call_later = loop.call_later
def call_later(delay, callback, *args):
- nonlocal handle
- handle = orig_call_later(delay, callback, *args)
- return handle
+ non_local['handle'] = orig_call_later(delay, callback, *args)
+ return non_local['handle']
loop.call_later = call_later
test_utils.run_briefly(loop)
- self.assertFalse(handle._cancelled)
+ self.assertFalse(non_local['handle']._cancelled)
t.cancel()
test_utils.run_briefly(loop)
- self.assertTrue(handle._cancelled)
+ self.assertTrue(non_local['handle']._cancelled)
def test_task_cancel_sleeping_task(self):
@@ -1098,18 +1124,18 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def sleep(dt):
- yield from asyncio.sleep(dt, loop=loop)
+ yield From(asyncio.sleep(dt, loop=loop))
@asyncio.coroutine
def doit():
sleeper = asyncio.Task(sleep(5000), loop=loop)
loop.call_later(0.1, sleeper.cancel)
try:
- yield from sleeper
+ yield From(sleeper)
except asyncio.CancelledError:
- return 'cancelled'
+ raise Return('cancelled')
else:
- return 'slept in'
+ raise Return('slept in')
doer = doit()
self.assertEqual(loop.run_until_complete(doer), 'cancelled')
@@ -1120,7 +1146,7 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def coro():
- yield from fut
+ yield From(fut)
task = asyncio.Task(coro(), loop=self.loop)
test_utils.run_briefly(self.loop)
@@ -1148,9 +1174,9 @@ class TaskTests(test_utils.TestCase):
def test_step_result(self):
@asyncio.coroutine
def notmuch():
- yield None
- yield 1
- return 'ko'
+ yield From(None)
+ yield From(1)
+ raise Return('ko')
self.assertRaises(
RuntimeError, self.loop.run_until_complete, notmuch())
@@ -1161,19 +1187,18 @@ class TaskTests(test_utils.TestCase):
class Fut(asyncio.Future):
def __init__(self, *args, **kwds):
self.cb_added = False
- super().__init__(*args, **kwds)
+ super(Fut, self).__init__(*args, **kwds)
def add_done_callback(self, fn):
self.cb_added = True
- super().add_done_callback(fn)
+ super(Fut, self).add_done_callback(fn)
fut = Fut(loop=self.loop)
- result = None
+ non_local = {'result': None}
@asyncio.coroutine
def wait_for_future():
- nonlocal result
- result = yield from fut
+ non_local['result'] = yield From(fut)
t = asyncio.Task(wait_for_future(), loop=self.loop)
test_utils.run_briefly(self.loop)
@@ -1182,7 +1207,7 @@ class TaskTests(test_utils.TestCase):
res = object()
fut.set_result(res)
test_utils.run_briefly(self.loop)
- self.assertIs(res, result)
+ self.assertIs(res, non_local['result'])
self.assertTrue(t.done())
self.assertIsNone(t.result())
@@ -1208,24 +1233,24 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def sleeper():
- yield from asyncio.sleep(10, loop=loop)
+ yield From(asyncio.sleep(10, loop=loop))
base_exc = BaseException()
@asyncio.coroutine
def notmutch():
try:
- yield from sleeper()
+ yield From(sleeper())
except asyncio.CancelledError:
raise base_exc
task = asyncio.Task(notmutch(), loop=loop)
- test_utils.run_briefly(loop)
+ test_utils.run_briefly(loop, 2)
task.cancel()
self.assertFalse(task.done())
- self.assertRaises(BaseException, test_utils.run_briefly, loop)
+ self.assertRaises(BaseException, test_utils.run_briefly, loop, 2)
self.assertTrue(task.done())
self.assertFalse(task.cancelled())
@@ -1246,37 +1271,6 @@ class TaskTests(test_utils.TestCase):
yield
self.assertTrue(asyncio.iscoroutinefunction(fn2))
- def test_yield_vs_yield_from(self):
- fut = asyncio.Future(loop=self.loop)
-
- @asyncio.coroutine
- def wait_for_future():
- yield fut
-
- task = wait_for_future()
- with self.assertRaises(RuntimeError):
- self.loop.run_until_complete(task)
-
- self.assertFalse(fut.done())
-
- def test_yield_vs_yield_from_generator(self):
- @asyncio.coroutine
- def coro():
- yield
-
- @asyncio.coroutine
- def wait_for_future():
- gen = coro()
- try:
- yield gen
- finally:
- gen.close()
-
- task = wait_for_future()
- self.assertRaises(
- RuntimeError,
- self.loop.run_until_complete, task)
-
def test_coroutine_non_gen_function(self):
@asyncio.coroutine
def func():
@@ -1327,7 +1321,7 @@ class TaskTests(test_utils.TestCase):
@asyncio.coroutine
def coro1(loop):
self.assertTrue(asyncio.Task.current_task(loop=loop) is task1)
- yield from fut1
+ yield From(fut1)
self.assertTrue(asyncio.Task.current_task(loop=loop) is task1)
fut2.set_result(True)
@@ -1335,7 +1329,7 @@ class TaskTests(test_utils.TestCase):
def coro2(loop):
self.assertTrue(asyncio.Task.current_task(loop=loop) is task2)
fut1.set_result(True)
- yield from fut2
+ yield From(fut2)
self.assertTrue(asyncio.Task.current_task(loop=loop) is task2)
task1 = asyncio.Task(coro1(self.loop), loop=self.loop)
@@ -1350,54 +1344,50 @@ class TaskTests(test_utils.TestCase):
def test_yield_future_passes_cancel(self):
# Cancelling outer() cancels inner() cancels waiter.
- proof = 0
+ non_local = {'proof': 0}
waiter = asyncio.Future(loop=self.loop)
@asyncio.coroutine
def inner():
- nonlocal proof
try:
- yield from waiter
+ yield From(waiter)
except asyncio.CancelledError:
- proof += 1
+ non_local['proof'] += 1
raise
else:
self.fail('got past sleep() in inner()')
@asyncio.coroutine
def outer():
- nonlocal proof
try:
- yield from inner()
+ yield From(inner())
except asyncio.CancelledError:
- proof += 100 # Expect this path.
+ non_local['proof'] += 100 # Expect this path.
else:
- proof += 10
+ non_local['proof'] += 10
f = asyncio.async(outer(), loop=self.loop)
test_utils.run_briefly(self.loop)
f.cancel()
self.loop.run_until_complete(f)
- self.assertEqual(proof, 101)
+ self.assertEqual(non_local['proof'], 101)
self.assertTrue(waiter.cancelled())
def test_yield_wait_does_not_shield_cancel(self):
# Cancelling outer() makes wait() return early, leaves inner()
# running.
- proof = 0
+ non_local = {'proof': 0}
waiter = asyncio.Future(loop=self.loop)
@asyncio.coroutine
def inner():
- nonlocal proof
- yield from waiter
- proof += 1
+ yield From(waiter)
+ non_local['proof'] += 1
@asyncio.coroutine
def outer():
- nonlocal proof
- d, p = yield from asyncio.wait([inner()], loop=self.loop)
- proof += 100
+ d, p = yield From(asyncio.wait([inner()], loop=self.loop))
+ non_local['proof'] += 100
f = asyncio.async(outer(), loop=self.loop)
test_utils.run_briefly(self.loop)
@@ -1406,7 +1396,7 @@ class TaskTests(test_utils.TestCase):
asyncio.CancelledError, self.loop.run_until_complete, f)
waiter.set_result(None)
test_utils.run_briefly(self.loop)
- self.assertEqual(proof, 1)
+ self.assertEqual(non_local['proof'], 1)
def test_shield_result(self):
inner = asyncio.Future(loop=self.loop)
@@ -1440,20 +1430,18 @@ class TaskTests(test_utils.TestCase):
def test_shield_effect(self):
# Cancelling outer() does not affect inner().
- proof = 0
+ non_local = {'proof': 0}
waiter = asyncio.Future(loop=self.loop)
@asyncio.coroutine
def inner():
- nonlocal proof
- yield from waiter
- proof += 1
+ yield From(waiter)
+ non_local['proof'] += 1
@asyncio.coroutine
def outer():
- nonlocal proof
- yield from asyncio.shield(inner(), loop=self.loop)
- proof += 100
+ yield From(asyncio.shield(inner(), loop=self.loop))
+ non_local['proof'] += 100
f = asyncio.async(outer(), loop=self.loop)
test_utils.run_briefly(self.loop)
@@ -1462,7 +1450,7 @@ class TaskTests(test_utils.TestCase):
self.loop.run_until_complete(f)
waiter.set_result(None)
test_utils.run_briefly(self.loop)
- self.assertEqual(proof, 1)
+ self.assertEqual(non_local['proof'], 1)
def test_shield_gather(self):
child1 = asyncio.Future(loop=self.loop)
@@ -1531,7 +1519,7 @@ class TaskTests(test_utils.TestCase):
def coro():
# The actual coroutine.
self.assertTrue(gen.gi_running)
- yield from fut
+ yield From(fut)
# A completed Future used to run the coroutine.
fut = asyncio.Future(loop=self.loop)
@@ -1573,13 +1561,15 @@ class TaskTests(test_utils.TestCase):
try:
@asyncio.coroutine
def t1():
- return (yield from t2())
+ res = yield From(t2())
+ raise Return(res)
@asyncio.coroutine
def t2():
f = asyncio.Future(loop=self.loop)
asyncio.Task(t3(f), loop=self.loop)
- return (yield from f)
+ res = yield From(f)
+ raise Return(res)
@asyncio.coroutine
def t3(f):
@@ -1594,7 +1584,7 @@ class TaskTests(test_utils.TestCase):
def test_yield_from_corowrapper_send(self):
def foo():
a = yield
- return a
+ raise Return(a)
def call(arg):
cw = asyncio.coroutines.CoroWrapper(foo(), foo)
@@ -1602,7 +1592,8 @@ class TaskTests(test_utils.TestCase):
try:
cw.send(arg)
except StopIteration as ex:
- return ex.args[0]
+ ex.raised = True
+ return ex.value
else:
raise AssertionError('StopIteration was expected')
@@ -1611,18 +1602,19 @@ class TaskTests(test_utils.TestCase):
def test_corowrapper_weakref(self):
wd = weakref.WeakValueDictionary()
- def foo(): yield from []
+ def foo():
+ yield From(None)
cw = asyncio.coroutines.CoroWrapper(foo(), foo)
wd['cw'] = cw # Would fail without __weakref__ slot.
cw.gen = None # Suppress warning from __del__.
- @unittest.skipUnless(PY34,
- 'need python 3.4 or later')
+ @test_utils.skipUnless(PY34,
+ 'need python 3.4 or later')
def test_log_destroyed_pending_task(self):
@asyncio.coroutine
def kill_me(loop):
future = asyncio.Future(loop=loop)
- yield from future
+ yield From(future)
# at this point, the only reference to kill_me() task is
# the Task._wakeup() method in future._callbacks
raise Exception("code never reached")
@@ -1634,7 +1626,7 @@ class TaskTests(test_utils.TestCase):
# schedule the task
coro = kill_me(self.loop)
task = asyncio.async(coro, loop=self.loop)
- self.assertEqual(asyncio.Task.all_tasks(loop=self.loop), {task})
+ self.assertEqual(asyncio.Task.all_tasks(loop=self.loop), set((task,)))
# execute the task so it waits for future
self.loop._run_once()
@@ -1658,7 +1650,7 @@ class TaskTests(test_utils.TestCase):
})
mock_handler.reset_mock()
- @mock.patch('asyncio.coroutines.logger')
+ @mock.patch('trollius.coroutines.logger')
def test_coroutine_never_yielded(self, m_log):
debug = asyncio.coroutines._DEBUG
try:
@@ -1669,7 +1661,7 @@ class TaskTests(test_utils.TestCase):
finally:
asyncio.coroutines._DEBUG = debug
- tb_filename = __file__
+ tb_filename = sys._getframe().f_code.co_filename
tb_lineno = sys._getframe().f_lineno + 2
# create a coroutine object but don't use it
coro_noop()
@@ -1678,12 +1670,13 @@ class TaskTests(test_utils.TestCase):
self.assertTrue(m_log.error.called)
message = m_log.error.call_args[0][0]
func_filename, func_lineno = test_utils.get_function_source(coro_noop)
+ coro_name = getattr(coro_noop, '__qualname__', coro_noop.__name__)
regex = (r'^<CoroWrapper %s\(\) .* at %s:%s, .*> was never yielded from\n'
r'Coroutine object created at \(most recent call last\):\n'
r'.*\n'
r' File "%s", line %s, in test_coroutine_never_yielded\n'
r' coro_noop\(\)$'
- % (re.escape(coro_noop.__qualname__),
+ % (re.escape(coro_name),
re.escape(func_filename), func_lineno,
re.escape(tb_filename), tb_lineno))
@@ -1693,14 +1686,23 @@ class TaskTests(test_utils.TestCase):
self.loop.set_debug(True)
task = asyncio.Task(coroutine_function(), loop=self.loop)
- lineno = sys._getframe().f_lineno - 1
- self.assertIsInstance(task._source_traceback, list)
- self.assertEqual(task._source_traceback[-1][:3],
- (__file__,
- lineno,
- 'test_task_source_traceback'))
+ self.check_soure_traceback(task._source_traceback, -1)
self.loop.run_until_complete(task)
+ def test_coroutine_class(self):
+ # Trollius issue #9
+ self.loop.set_debug(True)
+
+ class MyClass(object):
+ def __call__(self):
+ return 7
+
+ obj = MyClass()
+ coro_func = asyncio.coroutine(obj)
+ coro_obj = coro_func()
+ res = self.loop.run_until_complete(coro_obj)
+ self.assertEqual(res, 7)
+
class GatherTestsBase:
@@ -1776,30 +1778,19 @@ class GatherTestsBase:
aio_path = os.path.dirname(os.path.dirname(asyncio.__file__))
code = '\n'.join((
- 'import asyncio.coroutines',
- 'print(asyncio.coroutines._DEBUG)'))
-
- # Test with -E to not fail if the unit test was run with
- # PYTHONASYNCIODEBUG set to a non-empty string
- sts, stdout, stderr = assert_python_ok('-E', '-c', code,
- PYTHONPATH=aio_path)
- self.assertEqual(stdout.rstrip(), b'False')
+ 'import trollius.coroutines',
+ 'print(trollius.coroutines._DEBUG)'))
sts, stdout, stderr = assert_python_ok('-c', code,
- PYTHONASYNCIODEBUG='',
+ TROLLIUSDEBUG='',
PYTHONPATH=aio_path)
self.assertEqual(stdout.rstrip(), b'False')
sts, stdout, stderr = assert_python_ok('-c', code,
- PYTHONASYNCIODEBUG='1',
+ TROLLIUSDEBUG='1',
PYTHONPATH=aio_path)
self.assertEqual(stdout.rstrip(), b'True')
- sts, stdout, stderr = assert_python_ok('-E', '-c', code,
- PYTHONASYNCIODEBUG='1',
- PYTHONPATH=aio_path)
- self.assertEqual(stdout.rstrip(), b'False')
-
class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
@@ -1888,7 +1879,7 @@ class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
def setUp(self):
- super().setUp()
+ super(CoroutineGatherTests, self).setUp()
asyncio.set_event_loop(self.one_loop)
def wrap_futures(self, *futures):
@@ -1896,7 +1887,8 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
for fut in futures:
@asyncio.coroutine
def coro(fut=fut):
- return (yield from fut)
+ result = (yield From(fut))
+ raise Return(result)
coros.append(coro())
return coros
@@ -1928,44 +1920,42 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
def test_cancellation_broadcast(self):
# Cancelling outer() cancels all children.
- proof = 0
+ non_local = {'proof': 0}
waiter = asyncio.Future(loop=self.one_loop)
@asyncio.coroutine
def inner():
- nonlocal proof
- yield from waiter
- proof += 1
+ yield From(waiter)
+ non_local['proof'] += 1
child1 = asyncio.async(inner(), loop=self.one_loop)
child2 = asyncio.async(inner(), loop=self.one_loop)
- gatherer = None
+ non_local['gatherer'] = None
@asyncio.coroutine
def outer():
- nonlocal proof, gatherer
- gatherer = asyncio.gather(child1, child2, loop=self.one_loop)
- yield from gatherer
- proof += 100
+ non_local['gatherer'] = asyncio.gather(child1, child2, loop=self.one_loop)
+ yield From(non_local['gatherer'])
+ non_local['proof'] += 100
f = asyncio.async(outer(), loop=self.one_loop)
test_utils.run_briefly(self.one_loop)
self.assertTrue(f.cancel())
with self.assertRaises(asyncio.CancelledError):
self.one_loop.run_until_complete(f)
- self.assertFalse(gatherer.cancel())
+ self.assertFalse(non_local['gatherer'].cancel())
self.assertTrue(waiter.cancelled())
self.assertTrue(child1.cancelled())
self.assertTrue(child2.cancelled())
test_utils.run_briefly(self.one_loop)
- self.assertEqual(proof, 0)
+ self.assertEqual(non_local['proof'], 0)
def test_exception_marking(self):
# Test for the first line marked "Mark exception retrieved."
@asyncio.coroutine
def inner(f):
- yield from f
+ yield From(f)
raise RuntimeError('should not be ignored')
a = asyncio.Future(loop=self.one_loop)
@@ -1973,7 +1963,7 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
@asyncio.coroutine
def outer():
- yield from asyncio.gather(inner(a), inner(b), loop=self.one_loop)
+ yield From(asyncio.gather(inner(a), inner(b), loop=self.one_loop))
f = asyncio.async(outer(), loop=self.one_loop)
test_utils.run_briefly(self.one_loop)
diff --git a/tests/test_transports.py b/tests/test_transports.py
index 3b6e3d6..42f7729 100644
--- a/tests/test_transports.py
+++ b/tests/test_transports.py
@@ -1,13 +1,20 @@
"""Tests for transports.py."""
import unittest
-from unittest import mock
-import asyncio
-from asyncio import transports
+import trollius as asyncio
+from trollius import test_utils
+from trollius import transports
+from trollius.test_utils import mock
+try:
+ memoryview
+except NameError:
+ # Python 2.6
+ memoryview = buffer
-class TransportTests(unittest.TestCase):
+
+class TransportTests(test_utils.TestCase):
def test_ctor_extra_is_none(self):
transport = asyncio.Transport()
diff --git a/tests/test_unix_events.py b/tests/test_unix_events.py
index b6ad018..d6ec243 100644
--- a/tests/test_unix_events.py
+++ b/tests/test_unix_events.py
@@ -1,11 +1,12 @@
"""Tests for unix_events.py."""
import collections
-import gc
+import contextlib
+#import gc
import errno
import io
import os
-import pprint
+#import pprint
import signal
import socket
import stat
@@ -13,22 +14,23 @@ import sys
import tempfile
import threading
import unittest
-from unittest import mock
if sys.platform == 'win32':
raise unittest.SkipTest('UNIX only')
-import asyncio
-from asyncio import log
-from asyncio import test_utils
-from asyncio import unix_events
+import trollius as asyncio
+from trollius import log
+from trollius import test_utils
+from trollius import unix_events
+from trollius.py33_exceptions import BlockingIOError, ChildProcessError
+from trollius.test_utils import mock
MOCK_ANY = mock.ANY
-@unittest.skipUnless(signal, 'Signals are not supported')
+@test_utils.skipUnless(signal, 'Signals are not supported')
class SelectorEventLoopSignalTests(test_utils.TestCase):
def setUp(self):
@@ -53,7 +55,7 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.loop._handle_signal(signal.NSIG + 1)
self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)
- @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('trollius.unix_events.signal')
def test_add_signal_handler_setup_error(self, m_signal):
m_signal.NSIG = signal.NSIG
m_signal.set_wakeup_fd.side_effect = ValueError
@@ -63,13 +65,13 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.loop.add_signal_handler,
signal.SIGINT, lambda: True)
- @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('trollius.unix_events.signal')
def test_add_signal_handler_coroutine_error(self, m_signal):
m_signal.NSIG = signal.NSIG
@asyncio.coroutine
def simple_coroutine():
- yield from []
+ yield None
# callback must not be a coroutine function
coro_func = simple_coroutine
@@ -81,7 +83,7 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.loop.add_signal_handler,
signal.SIGINT, func)
- @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('trollius.unix_events.signal')
def test_add_signal_handler(self, m_signal):
m_signal.NSIG = signal.NSIG
@@ -91,7 +93,7 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.assertIsInstance(h, asyncio.Handle)
self.assertEqual(h._callback, cb)
- @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('trollius.unix_events.signal')
def test_add_signal_handler_install_error(self, m_signal):
m_signal.NSIG = signal.NSIG
@@ -109,8 +111,8 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.loop.add_signal_handler,
signal.SIGINT, lambda: True)
- @mock.patch('asyncio.unix_events.signal')
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.unix_events.signal')
+ @mock.patch('trollius.base_events.logger')
def test_add_signal_handler_install_error2(self, m_logging, m_signal):
m_signal.NSIG = signal.NSIG
@@ -126,8 +128,8 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.assertFalse(m_logging.info.called)
self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
- @mock.patch('asyncio.unix_events.signal')
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.unix_events.signal')
+ @mock.patch('trollius.base_events.logger')
def test_add_signal_handler_install_error3(self, m_logging, m_signal):
class Err(OSError):
errno = errno.EINVAL
@@ -141,7 +143,7 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.assertFalse(m_logging.info.called)
self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
- @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('trollius.unix_events.signal')
def test_remove_signal_handler(self, m_signal):
m_signal.NSIG = signal.NSIG
@@ -154,7 +156,7 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.assertEqual(
(signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])
- @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('trollius.unix_events.signal')
def test_remove_signal_handler_2(self, m_signal):
m_signal.NSIG = signal.NSIG
m_signal.SIGINT = signal.SIGINT
@@ -171,8 +173,8 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
(signal.SIGINT, m_signal.default_int_handler),
m_signal.signal.call_args[0])
- @mock.patch('asyncio.unix_events.signal')
- @mock.patch('asyncio.base_events.logger')
+ @mock.patch('trollius.unix_events.signal')
+ @mock.patch('trollius.base_events.logger')
def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
m_signal.NSIG = signal.NSIG
self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
@@ -182,7 +184,7 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.loop.remove_signal_handler(signal.SIGHUP)
self.assertTrue(m_logging.info)
- @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('trollius.unix_events.signal')
def test_remove_signal_handler_error(self, m_signal):
m_signal.NSIG = signal.NSIG
self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
@@ -192,7 +194,7 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.assertRaises(
OSError, self.loop.remove_signal_handler, signal.SIGHUP)
- @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('trollius.unix_events.signal')
def test_remove_signal_handler_error2(self, m_signal):
m_signal.NSIG = signal.NSIG
self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
@@ -204,7 +206,7 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
self.assertRaises(
RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
- @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('trollius.unix_events.signal')
def test_close(self, m_signal):
m_signal.NSIG = signal.NSIG
@@ -221,8 +223,8 @@ class SelectorEventLoopSignalTests(test_utils.TestCase):
m_signal.set_wakeup_fd.assert_called_once_with(-1)
-@unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
- 'UNIX Sockets are not supported')
+@test_utils.skipUnless(hasattr(socket, 'AF_UNIX'),
+ 'UNIX Sockets are not supported')
class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
def setUp(self):
@@ -233,7 +235,7 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
with test_utils.unix_socket_path() as path:
sock = socket.socket(socket.AF_UNIX)
sock.bind(path)
- with sock:
+ with contextlib.closing(sock):
coro = self.loop.create_unix_server(lambda: None, path)
with self.assertRaisesRegex(OSError,
'Address.*is already in use'):
@@ -261,18 +263,19 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
def test_create_unix_server_path_inetsock(self):
sock = socket.socket()
- with sock:
+ with contextlib.closing(sock):
coro = self.loop.create_unix_server(lambda: None, path=None,
sock=sock)
with self.assertRaisesRegex(ValueError,
'A UNIX Domain Socket was expected'):
self.loop.run_until_complete(coro)
- @mock.patch('asyncio.unix_events.socket')
+ @mock.patch('trollius.unix_events.socket')
def test_create_unix_server_bind_error(self, m_socket):
# Ensure that the socket is closed on any bind error
sock = mock.Mock()
m_socket.socket.return_value = sock
+ m_socket.error = socket.error
sock.bind.side_effect = OSError
coro = self.loop.create_unix_server(lambda: None, path="/test")
@@ -324,7 +327,7 @@ class UnixReadPipeTransportTests(test_utils.TestCase):
self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
- blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
+ blocking_patcher = mock.patch('trollius.unix_events._set_nonblocking')
blocking_patcher.start()
self.addCleanup(blocking_patcher.stop)
@@ -383,7 +386,7 @@ class UnixReadPipeTransportTests(test_utils.TestCase):
test_utils.run_briefly(self.loop)
self.assertFalse(self.protocol.data_received.called)
- @mock.patch('asyncio.log.logger.error')
+ @mock.patch('trollius.log.logger.error')
@mock.patch('os.read')
def test__read_ready_error(self, m_read, m_logexc):
tr = unix_events._UnixReadPipeTransport(
@@ -487,7 +490,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
- blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
+ blocking_patcher = mock.patch('trollius.unix_events._set_nonblocking')
blocking_patcher.start()
self.addCleanup(blocking_patcher.stop)
@@ -573,7 +576,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer)
- @mock.patch('asyncio.unix_events.logger')
+ @mock.patch('trollius.unix_events.logger')
@mock.patch('os.write')
def test_write_err(self, m_write, m_log):
tr = unix_events._UnixWritePipeTransport(
@@ -673,7 +676,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer)
- @mock.patch('asyncio.log.logger.error')
+ @mock.patch('trollius.log.logger.error')
@mock.patch('os.write')
def test__write_ready_err(self, m_write, m_logexc):
tr = unix_events._UnixWritePipeTransport(
@@ -795,7 +798,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
self.assertFalse(self.protocol.connection_lost.called)
-class AbstractChildWatcherTests(unittest.TestCase):
+class AbstractChildWatcherTests(test_utils.TestCase):
def test_not_implemented(self):
f = mock.Mock()
@@ -814,7 +817,7 @@ class AbstractChildWatcherTests(unittest.TestCase):
NotImplementedError, watcher.__exit__, f, f, f)
-class BaseChildWatcherTests(unittest.TestCase):
+class BaseChildWatcherTests(test_utils.TestCase):
def test_not_implemented(self):
f = mock.Mock()
@@ -884,19 +887,27 @@ class ChildWatcherTestsMixin:
def waitpid_mocks(func):
def wrapped_func(self):
+ exit_stack = []
+
def patch(target, wrapper):
- return mock.patch(target, wraps=wrapper,
- new_callable=mock.Mock)
-
- with patch('os.WTERMSIG', self.WTERMSIG) as m_WTERMSIG, \
- patch('os.WEXITSTATUS', self.WEXITSTATUS) as m_WEXITSTATUS, \
- patch('os.WIFSIGNALED', self.WIFSIGNALED) as m_WIFSIGNALED, \
- patch('os.WIFEXITED', self.WIFEXITED) as m_WIFEXITED, \
- patch('os.waitpid', self.waitpid) as m_waitpid:
+ m = mock.patch(target, wraps=wrapper)
+ exit_stack.append(m)
+ return m.__enter__()
+
+ m_waitpid = patch('os.waitpid', self.waitpid)
+ m_WIFEXITED = patch('os.WIFEXITED', self.WIFEXITED)
+ m_WIFSIGNALED = patch('os.WIFSIGNALED', self.WIFSIGNALED)
+ m_WEXITSTATUS = patch('os.WEXITSTATUS', self.WEXITSTATUS)
+ m_WTERMSIG = patch('os.WTERMSIG', self.WTERMSIG)
+ try:
func(self, WaitPidMocks(m_waitpid,
m_WIFEXITED, m_WIFSIGNALED,
m_WEXITSTATUS, m_WTERMSIG,
))
+ finally:
+ for obj in reversed(exit_stack):
+ obj.__exit__(None, None, None)
+
return wrapped_func
@waitpid_mocks
@@ -1369,17 +1380,18 @@ class ChildWatcherTestsMixin:
callback1 = mock.Mock()
callback2 = mock.Mock()
- with self.ignore_warnings, self.watcher:
- self.running = True
- # child 1 terminates
- self.add_zombie(591, 7)
- # an unknown child terminates
- self.add_zombie(593, 17)
+ with self.ignore_warnings:
+ with self.watcher:
+ self.running = True
+ # child 1 terminates
+ self.add_zombie(591, 7)
+ # an unknown child terminates
+ self.add_zombie(593, 17)
- self.watcher._sig_chld()
+ self.watcher._sig_chld()
- self.watcher.add_child_handler(591, callback1)
- self.watcher.add_child_handler(592, callback2)
+ self.watcher.add_child_handler(591, callback1)
+ self.watcher.add_child_handler(592, callback2)
callback1.assert_called_once_with(591, 7)
self.assertFalse(callback2.called)
@@ -1398,15 +1410,15 @@ class ChildWatcherTestsMixin:
self.loop = self.new_test_loop()
patch = mock.patch.object
- with patch(old_loop, "remove_signal_handler") as m_old_remove, \
- patch(self.loop, "add_signal_handler") as m_new_add:
+ with patch(old_loop, "remove_signal_handler") as m_old_remove:
+ with patch(self.loop, "add_signal_handler") as m_new_add:
- self.watcher.attach_loop(self.loop)
+ self.watcher.attach_loop(self.loop)
- m_old_remove.assert_called_once_with(
- signal.SIGCHLD)
- m_new_add.assert_called_once_with(
- signal.SIGCHLD, self.watcher._sig_chld)
+ m_old_remove.assert_called_once_with(
+ signal.SIGCHLD)
+ m_new_add.assert_called_once_with(
+ signal.SIGCHLD, self.watcher._sig_chld)
# child terminates
self.running = False
@@ -1518,7 +1530,7 @@ class FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
return asyncio.FastChildWatcher()
-class PolicyTests(unittest.TestCase):
+class PolicyTests(test_utils.TestCase):
def create_policy(self):
return asyncio.DefaultEventLoopPolicy()
diff --git a/tests/test_windows_events.py b/tests/test_windows_events.py
index b4d9398..c4c562b 100644
--- a/tests/test_windows_events.py
+++ b/tests/test_windows_events.py
@@ -1,16 +1,17 @@
+from trollius import test_utils
import os
import sys
import unittest
if sys.platform != 'win32':
- raise unittest.SkipTest('Windows only')
+ raise test_utils.SkipTest('Windows only')
-import _winapi
-
-import asyncio
-from asyncio import _overlapped
-from asyncio import test_utils
-from asyncio import windows_events
+import trollius as asyncio
+from trollius import Return, From
+from trollius import _overlapped
+from trollius import py33_winapi as _winapi
+from trollius import windows_events
+from trollius.py33_exceptions import PermissionError, FileNotFoundError
class UpperProto(asyncio.Protocol):
@@ -57,38 +58,38 @@ class ProactorTests(test_utils.TestCase):
ADDRESS = r'\\.\pipe\_test_pipe-%s' % os.getpid()
with self.assertRaises(FileNotFoundError):
- yield from self.loop.create_pipe_connection(
- asyncio.Protocol, ADDRESS)
+ yield From(self.loop.create_pipe_connection(
+ asyncio.Protocol, ADDRESS))
- [server] = yield from self.loop.start_serving_pipe(
- UpperProto, ADDRESS)
+ [server] = yield From(self.loop.start_serving_pipe(
+ UpperProto, ADDRESS))
self.assertIsInstance(server, windows_events.PipeServer)
clients = []
for i in range(5):
stream_reader = asyncio.StreamReader(loop=self.loop)
protocol = asyncio.StreamReaderProtocol(stream_reader)
- trans, proto = yield from self.loop.create_pipe_connection(
- lambda: protocol, ADDRESS)
+ trans, proto = yield From(self.loop.create_pipe_connection(
+ lambda: protocol, ADDRESS))
self.assertIsInstance(trans, asyncio.Transport)
self.assertEqual(protocol, proto)
clients.append((stream_reader, trans))
for i, (r, w) in enumerate(clients):
- w.write('lower-{}\n'.format(i).encode())
+ w.write('lower-{0}\n'.format(i).encode())
for i, (r, w) in enumerate(clients):
- response = yield from r.readline()
- self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
+ response = yield From(r.readline())
+ self.assertEqual(response, 'LOWER-{0}\n'.format(i).encode())
w.close()
server.close()
with self.assertRaises(FileNotFoundError):
- yield from self.loop.create_pipe_connection(
- asyncio.Protocol, ADDRESS)
+ yield From(self.loop.create_pipe_connection(
+ asyncio.Protocol, ADDRESS))
- return 'done'
+ raise Return('done')
def test_wait_for_handle(self):
event = _overlapped.CreateEvent(None, True, False, None)
diff --git a/tests/test_windows_utils.py b/tests/test_windows_utils.py
index b957949..f40f863 100644
--- a/tests/test_windows_utils.py
+++ b/tests/test_windows_utils.py
@@ -3,20 +3,21 @@
import socket
import sys
import unittest
-from unittest import mock
+if sys.platform != 'win32':
+ from trollius.test_utils import SkipTest
+ raise SkipTest('Windows only')
+
+from trollius import _overlapped
+from trollius import py33_winapi as _winapi
+from trollius import test_utils
+from trollius import windows_utils
+from trollius.test_support import IPV6_ENABLED
+from trollius.test_utils import mock
try:
from test import support # gc_collect, IPV6_ENABLED
except ImportError:
- from asyncio import test_support as support
-
-if sys.platform != 'win32':
- raise unittest.SkipTest('Windows only')
-
-import _winapi
-
-from asyncio import windows_utils
-from asyncio import _overlapped
+ from trollius import test_support as support
class WinsocketpairTests(unittest.TestCase):
@@ -31,14 +32,15 @@ class WinsocketpairTests(unittest.TestCase):
ssock, csock = windows_utils.socketpair()
self.check_winsocketpair(ssock, csock)
- @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
+ @test_utils.skipUnless(support.IPV6_ENABLED,
+ 'IPv6 not supported or enabled')
def test_winsocketpair_ipv6(self):
ssock, csock = windows_utils.socketpair(family=socket.AF_INET6)
self.check_winsocketpair(ssock, csock)
- @unittest.skipIf(hasattr(socket, 'socketpair'),
- 'socket.socketpair is available')
- @mock.patch('asyncio.windows_utils.socket')
+ @test_utils.skipIf(hasattr(socket, 'socketpair'),
+ 'socket.socketpair is available')
+ @mock.patch('trollius.windows_utils.socket')
def test_winsocketpair_exc(self, m_socket):
m_socket.AF_INET = socket.AF_INET
m_socket.SOCK_STREAM = socket.SOCK_STREAM
@@ -56,9 +58,9 @@ class WinsocketpairTests(unittest.TestCase):
self.assertRaises(ValueError,
windows_utils.socketpair, proto=1)
- @unittest.skipIf(hasattr(socket, 'socketpair'),
- 'socket.socketpair is available')
- @mock.patch('asyncio.windows_utils.socket')
+ @test_utils.skipIf(hasattr(socket, 'socketpair'),
+ 'socket.socketpair is available')
+ @mock.patch('trollius.windows_utils.socket')
def test_winsocketpair_close(self, m_socket):
m_socket.AF_INET = socket.AF_INET
m_socket.SOCK_STREAM = socket.SOCK_STREAM
@@ -84,7 +86,7 @@ class PipeTests(unittest.TestCase):
ERROR_IO_INCOMPLETE = 996
try:
ov1.getresult()
- except OSError as e:
+ except WindowsError as e:
self.assertEqual(e.winerror, ERROR_IO_INCOMPLETE)
else:
raise RuntimeError('expected ERROR_IO_INCOMPLETE')
@@ -94,15 +96,15 @@ class PipeTests(unittest.TestCase):
self.assertEqual(ov2.error, 0)
ov2.WriteFile(h2, b"hello")
- self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})
+ self.assertIn(ov2.error, set((0, _winapi.ERROR_IO_PENDING)))
- res = _winapi.WaitForMultipleObjects([ov2.event], False, 100)
+ res = _winapi.WaitForSingleObject(ov2.event, 100)
self.assertEqual(res, _winapi.WAIT_OBJECT_0)
self.assertFalse(ov1.pending)
self.assertEqual(ov1.error, ERROR_IO_INCOMPLETE)
self.assertFalse(ov2.pending)
- self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})
+ self.assertIn(ov2.error, set((0, _winapi.ERROR_IO_PENDING)))
self.assertEqual(ov1.getresult(), b"hello")
finally:
_winapi.CloseHandle(h1)