| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
 | import asyncio
import unittest
from unittest import mock
from . import utils as test_utils
class TestPolicy(asyncio.AbstractEventLoopPolicy):
    """Event loop policy that builds loops through a caller-supplied factory.

    The most recent non-``None`` loop handed to ``set_event_loop()`` is kept
    in ``self.loop`` so it can be inspected after the code under test ran.
    """

    def __init__(self, loop_factory):
        # ``loop`` stays None until set_event_loop() receives a real loop.
        self.loop = None
        self.loop_factory = loop_factory

    def get_event_loop(self):
        """Always fail; asyncio.run() is not expected to call this."""
        raise RuntimeError()

    def new_event_loop(self):
        """Return a new loop produced by the stored factory."""
        make_loop = self.loop_factory
        return make_loop()

    def set_event_loop(self, loop):
        """Remember *loop*; a ``None`` argument is ignored."""
        if loop is None:
            return
        # Keep a reference so BaseTest.tearDown can check whether the
        # loop was closed.
        self.loop = loop
class BaseTest(unittest.TestCase):
    """Shared fixture for the ``asyncio.run()`` tests.

    ``setUp`` installs a ``TestPolicy`` whose loops come from ``new_loop``;
    ``tearDown`` checks that the loop recorded by the policy (if any) was
    closed and had ``shutdown_asyncgens()`` run, then restores the default
    policy.
    """

    def new_loop(self):
        """Return a ``BaseEventLoop`` with mocked I/O hooks.

        ``_process_events`` and ``_selector`` are replaced by mocks, with
        ``select()`` returning an empty tuple.  ``shutdown_asyncgens`` is
        replaced by a coroutine function that only flips the loop's
        ``shutdown_ag_run`` flag to ``True`` so ``tearDown`` can verify
        that it was awaited.
        """
        loop = asyncio.BaseEventLoop()
        loop._process_events = mock.Mock()
        loop._selector = mock.Mock()
        loop._selector.select.return_value = ()
        loop.shutdown_ag_run = False

        async def shutdown_asyncgens():
            loop.shutdown_ag_run = True

        loop.shutdown_asyncgens = shutdown_asyncgens
        return loop

    def setUp(self):
        """Install a ``TestPolicy`` that creates loops via ``new_loop``."""
        super().setUp()
        policy = TestPolicy(self.new_loop)
        asyncio.set_event_loop_policy(policy)

    def tearDown(self):
        """Verify the recorded loop was shut down, then reset the policy.

        The policy reset and the parent ``tearDown`` run in a ``finally``
        clause so that a failed assertion here cannot leave the custom
        policy installed for subsequent tests.
        """
        policy = asyncio.get_event_loop_policy()
        try:
            if policy.loop is not None:
                self.assertTrue(policy.loop.is_closed())
                self.assertTrue(policy.loop.shutdown_ag_run)
        finally:
            asyncio.set_event_loop_policy(None)
            super().tearDown()
class RunTests(BaseTest):
    """Tests for ``asyncio.run()``."""

    # NOTE: the test methods are described with comments rather than
    # docstrings so that unittest keeps reporting them by method name.

    def test_asyncio_run_return(self):
        # The value returned by the coroutine is returned by asyncio.run().
        async def main():
            await asyncio.sleep(0)
            return 42

        self.assertEqual(asyncio.run(main()), 42)

    def test_asyncio_run_raises(self):
        # An exception raised by the coroutine propagates to the caller.
        async def main():
            await asyncio.sleep(0)
            raise ValueError('spam')

        with self.assertRaisesRegex(ValueError, 'spam'):
            asyncio.run(main())

    def test_asyncio_run_only_coro(self):
        # Non-coroutine arguments are rejected.  A tuple (instead of a set)
        # keeps the sub-test order deterministic.
        for o in (1, lambda: None):
            with self.subTest(obj=o), \
                    self.assertRaisesRegex(ValueError,
                                           'a coroutine was expected'):
                asyncio.run(o)

    def test_asyncio_run_debug(self):
        # The ``debug`` argument controls the debug flag of the loop.
        async def main(expected):
            loop = asyncio.get_running_loop()
            self.assertIs(loop.get_debug(), expected)

        # Pass debug=False explicitly: when ``debug`` is left at its default
        # the loop may inherit the interpreter-wide asyncio debug mode
        # (e.g. -X dev or PYTHONASYNCIODEBUG=1), which would make this
        # check depend on the environment the tests run in.
        asyncio.run(main(False), debug=False)
        asyncio.run(main(True), debug=True)

    def test_asyncio_run_from_running_loop(self):
        # Calling asyncio.run() while a loop is already running is an error.
        async def main():
            coro = main()
            try:
                asyncio.run(coro)
            finally:
                coro.close()  # Suppress ResourceWarning

        with self.assertRaisesRegex(RuntimeError,
                                    'cannot be called from a running'):
            asyncio.run(main())

    def test_asyncio_run_cancels_hanging_tasks(self):
        # A task still pending when main() returns must be finished by the
        # time asyncio.run() returns.
        lo_task = None

        async def leftover():
            await asyncio.sleep(0.1)

        async def main():
            nonlocal lo_task
            lo_task = asyncio.create_task(leftover())
            return 123

        self.assertEqual(asyncio.run(main()), 123)
        self.assertTrue(lo_task.done())

    def test_asyncio_run_reports_hanging_tasks_errors(self):
        # A leftover task that raises while being cancelled is reported
        # through the loop's exception handler; main()'s result is kept.
        lo_task = None
        call_exc_handler_mock = mock.Mock()

        async def leftover():
            try:
                await asyncio.sleep(0.1)
            except asyncio.CancelledError:
                # Turn the cancellation into an unrelated error.
                1 / 0

        async def main():
            loop = asyncio.get_running_loop()
            loop.call_exception_handler = call_exc_handler_mock

            nonlocal lo_task
            lo_task = asyncio.create_task(leftover())
            return 123

        self.assertEqual(asyncio.run(main()), 123)
        self.assertTrue(lo_task.done())

        call_exc_handler_mock.assert_called_with({
            'message': test_utils.MockPattern(r'asyncio.run.*shutdown'),
            'task': lo_task,
            'exception': test_utils.MockInstanceOf(ZeroDivisionError)
        })

    def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self):
        # Even when main() raises and the leftover task errors out during
        # cancellation, the async generator it was iterating must end up
        # finished (no frame, not running).
        spinner = None
        lazyboy = None

        class FancyExit(Exception):
            pass

        async def fidget():
            while True:
                yield 1
                await asyncio.sleep(1)

        async def spin():
            nonlocal spinner
            spinner = fidget()
            try:
                async for the_meaning_of_life in spinner:  # NoQA
                    pass
            except asyncio.CancelledError:
                # Turn the cancellation into an unrelated error.
                1 / 0

        async def main():
            loop = asyncio.get_running_loop()
            # Silence the report for the ZeroDivisionError raised by spin().
            loop.call_exception_handler = mock.Mock()

            nonlocal lazyboy
            lazyboy = asyncio.create_task(spin())
            raise FancyExit

        with self.assertRaises(FancyExit):
            asyncio.run(main())

        self.assertTrue(lazyboy.done())

        self.assertIsNone(spinner.ag_frame)
        self.assertFalse(spinner.ag_running)
 |