diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_proactor_events.py | 6 | ||||
-rw-r--r-- | tests/test_selector_events.py | 4 | ||||
-rw-r--r-- | tests/test_streams.py | 17 |
3 files changed, 25 insertions, 2 deletions
diff --git a/tests/test_proactor_events.py b/tests/test_proactor_events.py index fcd9ab1..0c0d44d 100644 --- a/tests/test_proactor_events.py +++ b/tests/test_proactor_events.py @@ -144,6 +144,12 @@ class ProactorSocketTransportTests(test_utils.TestCase): self.assertEqual(tr._buffer, b'data') self.assertFalse(tr._loop_writing.called) + def test_write_closing(self): + transport = self.socket_transport() + transport.close() + # write() is disallowed after close() + self.assertRaises(RuntimeError, transport.write, b'data') + def test_loop_writing(self): tr = self.socket_transport() tr._buffer = bytearray(b'data') diff --git a/tests/test_selector_events.py b/tests/test_selector_events.py index f0fcdd2..80f93ec 100644 --- a/tests/test_selector_events.py +++ b/tests/test_selector_events.py @@ -1000,8 +1000,8 @@ class SelectorSocketTransportTests(test_utils.TestCase): transport = self.socket_transport() transport.close() self.assertEqual(transport._conn_lost, 1) - transport.write(b'data') - self.assertEqual(transport._conn_lost, 2) + # write() is disallowed after close() + self.assertRaises(RuntimeError, transport.write, b'data') def test_write_ready(self): data = b'data' diff --git a/tests/test_streams.py b/tests/test_streams.py index 242b377..c883ba3 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -636,6 +636,23 @@ os.close(fd) protocol = asyncio.StreamReaderProtocol(reader) self.assertIs(protocol._loop, self.loop) + def test_pause_writing_closing(self): + reader = mock.Mock() + transport = asyncio.ReadTransport() + protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop) + protocol.connection_made(transport) + transport._closing = True + self.assertRaises(RuntimeError, protocol.pause_writing) + + def test_resume_writing_closing(self): + reader = mock.Mock() + transport = asyncio.ReadTransport() + protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop) + protocol.connection_made(transport) + protocol.pause_writing() + transport._closing = True + self.assertRaises(RuntimeError, protocol.resume_writing) + if __name__ == '__main__': unittest.main() |