summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYury Selivanov <yury@edgedb.com>2019-09-29 22:30:17 -0700
committerGitHub <noreply@github.com>2019-09-29 22:30:17 -0700
commit1c19d656a79a00f58361ceb61c0a6d1faf90c686 (patch)
tree3547b3ebcb63442c1621a09e3c347e1d034d657a
parent21f24ead90c22d0e2c2ebf14a64b37d99de54b33 (diff)
downloadcpython-git-1c19d656a79a00f58361ceb61c0a6d1faf90c686.tar.gz
bpo-38242: Revert "bpo-36889: Merge asyncio streams (GH-13251)" (#16482) (#16485)
See https://bugs.python.org/issue38242 for more details
-rw-r--r--Doc/library/asyncio-api-index.rst36
-rw-r--r--Doc/library/asyncio-eventloop.rst3
-rw-r--r--Doc/library/asyncio-protocol.rst4
-rw-r--r--Doc/library/asyncio-stream.rst563
-rw-r--r--Lib/asyncio/__init__.py38
-rw-r--r--Lib/asyncio/streams.py1252
-rw-r--r--Lib/asyncio/subprocess.py64
-rw-r--r--Lib/test/test_asyncio/test_buffered_proto.py7
-rw-r--r--Lib/test/test_asyncio/test_pep492.py8
-rw-r--r--Lib/test/test_asyncio/test_streams.py1132
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py12
-rw-r--r--Lib/test/test_asyncio/test_windows_events.py12
-rw-r--r--Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst1
13 files changed, 379 insertions, 2753 deletions
diff --git a/Doc/library/asyncio-api-index.rst b/Doc/library/asyncio-api-index.rst
index 716cf09dc9..d5b5659abc 100644
--- a/Doc/library/asyncio-api-index.rst
+++ b/Doc/library/asyncio-api-index.rst
@@ -132,47 +132,23 @@ High-level APIs to work with network IO.
:widths: 50 50
:class: full-width-table
- * - ``await`` :func:`connect`
- - Establish a TCP connection to send and receive data.
-
* - ``await`` :func:`open_connection`
- - Establish a TCP connection. (Deprecated in favor of :func:`connect`)
-
- * - ``await`` :func:`connect_unix`
- - Establish a Unix socket connection to send and receive data.
+ - Establish a TCP connection.
* - ``await`` :func:`open_unix_connection`
- - Establish a Unix socket connection. (Deprecated in favor of :func:`connect_unix`)
-
- * - :class:`StreamServer`
- - Start a TCP server.
+ - Establish a Unix socket connection.
* - ``await`` :func:`start_server`
- - Start a TCP server. (Deprecated in favor of :class:`StreamServer`)
-
- * - :class:`UnixStreamServer`
- - Start a Unix socket server.
+ - Start a TCP server.
* - ``await`` :func:`start_unix_server`
- - Start a Unix socket server. (Deprecated in favor of :class:`UnixStreamServer`)
-
- * - :func:`connect_read_pipe`
- - Establish a connection to :term:`file-like object <file object>` *pipe*
- to receive data.
-
- * - :func:`connect_write_pipe`
- - Establish a connection to :term:`file-like object <file object>` *pipe*
- to send data.
-
- * - :class:`Stream`
- - Stream is a single object combining APIs of :class:`StreamReader` and
- :class:`StreamWriter`.
+ - Start a Unix socket server.
* - :class:`StreamReader`
- - High-level async/await object to receive network data. (Deprecated in favor of :class:`Stream`)
+ - High-level async/await object to receive network data.
* - :class:`StreamWriter`
- - High-level async/await object to send network data. (Deprecated in favor of :class:`Stream`)
+ - High-level async/await object to send network data.
.. rubric:: Examples
diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst
index 8f7974be66..f763fd5f03 100644
--- a/Doc/library/asyncio-eventloop.rst
+++ b/Doc/library/asyncio-eventloop.rst
@@ -1625,7 +1625,8 @@ Wait until a file descriptor received some data using the
:meth:`loop.create_connection` method.
* Another similar :ref:`example <asyncio_example_create_connection-streams>`
- using the high-level :func:`asyncio.connect` function and streams.
+ using the high-level :func:`asyncio.open_connection` function
+ and streams.
.. _asyncio_example_unix_signals:
diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst
index cb0317ea20..67ca121081 100644
--- a/Doc/library/asyncio-protocol.rst
+++ b/Doc/library/asyncio-protocol.rst
@@ -809,7 +809,7 @@ data, and waits until the connection is closed::
.. seealso::
The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
- example uses the high-level :func:`asyncio.connect` function.
+ example uses the high-level :func:`asyncio.open_connection` function.
.. _asyncio-udp-echo-server-protocol:
@@ -978,7 +978,7 @@ Wait until a socket receives data using the
The :ref:`register an open socket to wait for data using streams
<asyncio_example_create_connection-streams>` example uses high-level streams
- created by the :func:`asyncio.connect` function in a coroutine.
+ created by the :func:`open_connection` function in a coroutine.
.. _asyncio_example_subprocess_proto:
diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst
index feebd227eb..471e6e9099 100644
--- a/Doc/library/asyncio-stream.rst
+++ b/Doc/library/asyncio-stream.rst
@@ -18,12 +18,19 @@ streams::
import asyncio
async def tcp_echo_client(message):
- async with asyncio.connect('127.0.0.1', 8888) as stream:
- print(f'Send: {message!r}')
- await stream.write(message.encode())
+ reader, writer = await asyncio.open_connection(
+ '127.0.0.1', 8888)
- data = await stream.read(100)
- print(f'Received: {data.decode()!r}')
+ print(f'Send: {message!r}')
+ writer.write(message.encode())
+ await writer.drain()
+
+ data = await reader.read(100)
+ print(f'Received: {data.decode()!r}')
+
+ print('Close the connection')
+ writer.close()
+ await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
@@ -37,31 +44,6 @@ The following top-level asyncio functions can be used to create
and work with streams:
-.. coroutinefunction:: connect(host=None, port=None, \*, \
- limit=2**16, ssl=None, family=0, \
- proto=0, flags=0, sock=None, local_addr=None, \
- server_hostname=None, ssl_handshake_timeout=None, \
- happy_eyeballs_delay=None, interleave=None)
-
- Connect to TCP socket on *host* : *port* address and return a :class:`Stream`
- object of mode :attr:`StreamMode.READWRITE`.
-
- *limit* determines the buffer size limit used by the returned :class:`Stream`
- instance. By default the *limit* is set to 64 KiB.
-
- The rest of the arguments are passed directly to :meth:`loop.create_connection`.
-
- The function can be used with ``await`` to get a connected stream::
-
- stream = await asyncio.connect('127.0.0.1', 8888)
-
- The function can also be used as an async context manager::
-
- async with asyncio.connect('127.0.0.1', 8888) as stream:
- ...
-
- .. versionadded:: 3.8
-
.. coroutinefunction:: open_connection(host=None, port=None, \*, \
loop=None, limit=None, ssl=None, family=0, \
proto=0, flags=0, sock=None, local_addr=None, \
@@ -87,12 +69,8 @@ and work with streams:
The *ssl_handshake_timeout* parameter.
- .. deprecated-removed:: 3.8 3.10
-
- `open_connection()` is deprecated in favor of :func:`connect`.
-
.. coroutinefunction:: start_server(client_connected_cb, host=None, \
- port=None, \*, loop=None, limit=2**16, \
+ port=None, \*, loop=None, limit=None, \
family=socket.AF_UNSPEC, \
flags=socket.AI_PASSIVE, sock=None, \
backlog=100, ssl=None, reuse_address=None, \
@@ -124,60 +102,9 @@ and work with streams:
The *ssl_handshake_timeout* and *start_serving* parameters.
- .. deprecated-removed:: 3.8 3.10
-
- `start_server()` is deprecated if favor of :class:`StreamServer`
-
-.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16)
-
- Takes a :term:`file-like object <file object>` *pipe* to return a
- :class:`Stream` object of the mode :attr:`StreamMode.READ` that has
- similar API of :class:`StreamReader`. It can also be used as an async context manager.
-
- *limit* determines the buffer size limit used by the returned :class:`Stream`
- instance. By default the limit is set to 64 KiB.
-
- .. versionadded:: 3.8
-
-.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16)
-
- Takes a :term:`file-like object <file object>` *pipe* to return a
- :class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has
- similar API of :class:`StreamWriter`. It can also be used as an async context manager.
-
- *limit* determines the buffer size limit used by the returned :class:`Stream`
- instance. By default the limit is set to 64 KiB.
-
- .. versionadded:: 3.8
.. rubric:: Unix Sockets
-.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \
- sock=None, server_hostname=None, \
- ssl_handshake_timeout=None)
-
- Establish a Unix socket connection to socket with *path* address and
- return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE`
- that can be used as a reader and a writer.
-
- *limit* determines the buffer size limit used by the returned :class:`Stream`
- instance. By default the *limit* is set to 64 KiB.
-
- The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`.
-
- The function can be used with ``await`` to get a connected stream::
-
- stream = await asyncio.connect_unix('/tmp/example.sock')
-
- The function can also be used as an async context manager::
-
- async with asyncio.connect_unix('/tmp/example.sock') as stream:
- ...
-
- .. availability:: Unix.
-
- .. versionadded:: 3.8
-
.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
limit=None, ssl=None, sock=None, \
server_hostname=None, ssl_handshake_timeout=None)
@@ -199,10 +126,6 @@ and work with streams:
The *path* parameter can now be a :term:`path-like object`
- .. deprecated-removed:: 3.8 3.10
-
- ``open_unix_connection()`` is deprecated if favor of :func:`connect_unix`.
-
.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
\*, loop=None, limit=None, sock=None, \
@@ -225,349 +148,6 @@ and work with streams:
The *path* parameter can now be a :term:`path-like object`.
- .. deprecated-removed:: 3.8 3.10
-
- ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`.
-
-
----------
-
-StreamServer
-============
-
-.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \
- limit=2**16, family=socket.AF_UNSPEC, \
- flags=socket.AI_PASSIVE, sock=None, backlog=100, \
- ssl=None, reuse_address=None, reuse_port=None, \
- ssl_handshake_timeout=None, shutdown_timeout=60)
-
- The *client_connected_cb* callback is called whenever a new client
- connection is established. It receives a :class:`Stream` object of the
- mode :attr:`StreamMode.READWRITE`.
-
- *client_connected_cb* can be a plain callable or a
- :ref:`coroutine function <coroutine>`; if it is a coroutine function,
- it will be automatically scheduled as a :class:`Task`.
-
- *limit* determines the buffer size limit used by the
- returned :class:`Stream` instance. By default the *limit*
- is set to 64 KiB.
-
- The rest of the arguments are passed directly to
- :meth:`loop.create_server`.
-
- .. coroutinemethod:: start_serving()
-
- Binds to the given host and port to start the server.
-
- .. coroutinemethod:: serve_forever()
-
- Start accepting connections until the coroutine is cancelled.
- Cancellation of ``serve_forever`` task causes the server
- to be closed.
-
- This method can be called if the server is already accepting
- connections. Only one ``serve_forever`` task can exist per
- one *Server* object.
-
- .. method:: is_serving()
-
- Returns ``True`` if the server is bound and currently serving.
-
- .. method:: bind()
-
- Bind the server to the given *host* and *port*. This method is
- automatically called during ``__aenter__`` when :class:`StreamServer` is
- used as an async context manager.
-
- .. method:: is_bound()
-
- Return ``True`` if the server is bound.
-
- .. coroutinemethod:: abort()
-
- Closes the connection and cancels all pending tasks.
-
- .. coroutinemethod:: close()
-
- Closes the connection. This method is automatically called during
- ``__aexit__`` when :class:`StreamServer` is used as an async context
- manager.
-
- .. attribute:: sockets
-
- Returns a tuple of socket objects the server is bound to.
-
- .. versionadded:: 3.8
-
-
-UnixStreamServer
-================
-
-.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \
- limit=2**16, sock=None, backlog=100, \
- ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60)
-
- The *client_connected_cb* callback is called whenever a new client
- connection is established. It receives a :class:`Stream` object of the
- mode :attr:`StreamMode.READWRITE`.
-
- *client_connected_cb* can be a plain callable or a
- :ref:`coroutine function <coroutine>`; if it is a coroutine function,
- it will be automatically scheduled as a :class:`Task`.
-
- *limit* determines the buffer size limit used by the
- returned :class:`Stream` instance. By default the *limit*
- is set to 64 KiB.
-
- The rest of the arguments are passed directly to
- :meth:`loop.create_unix_server`.
-
- .. coroutinemethod:: start_serving()
-
- Binds to the given host and port to start the server.
-
- .. method:: is_serving()
-
- Returns ``True`` if the server is bound and currently serving.
-
- .. method:: bind()
-
- Bind the server to the given *host* and *port*. This method is
- automatically called during ``__aenter__`` when :class:`UnixStreamServer` is
- used as an async context manager.
-
- .. method:: is_bound()
-
- Return ``True`` if the server is bound.
-
- .. coroutinemethod:: abort()
-
- Closes the connection and cancels all pending tasks.
-
- .. coroutinemethod:: close()
-
- Closes the connection. This method is automatically called during
- ``__aexit__`` when :class:`UnixStreamServer` is used as an async context
- manager.
-
- .. attribute:: sockets
-
- Returns a tuple of socket objects the server is bound to.
-
- .. availability:: Unix.
-
- .. versionadded:: 3.8
-
-Stream
-======
-
-.. class:: Stream
-
- Represents a Stream object that provides APIs to read and write data
- to the IO stream . It includes the API provided by :class:`StreamReader`
- and :class:`StreamWriter`. It can also be used as :term:`asynchronous iterator`
- where :meth:`readline` is used. It raises :exc:`StopAsyncIteration` when
- :meth:`readline` returns empty data.
-
- Do not instantiate *Stream* objects directly; use API like :func:`connect`
- and :class:`StreamServer` instead.
-
- .. versionadded:: 3.8
-
- .. attribute:: mode
-
- Returns the mode of the stream which is a :class:`StreamMode` value. It could
- be one of the below:
-
- * :attr:`StreamMode.READ` - Connection can receive data.
- * :attr:`StreamMode.WRITE` - Connection can send data.
- * :attr:`StreamMode.READWRITE` - Connection can send and receive data.
-
- .. coroutinemethod:: abort()
-
- Aborts the connection immediately, without waiting for the send buffer to drain.
-
- .. method:: at_eof()
-
- Return ``True`` if the buffer is empty.
-
- .. method:: can_write_eof()
-
- Return *True* if the underlying transport supports
- the :meth:`write_eof` method, *False* otherwise.
-
- .. method:: close()
-
- The method closes the stream and the underlying socket.
-
- It is possible to directly await on the `close()` method::
-
- await stream.close()
-
- The ``await`` pauses the current coroutine until the stream and the underlying
- socket are closed (and SSL shutdown is performed for a secure connection).
-
- .. coroutinemethod:: drain()
-
- Wait until it is appropriate to resume writing to the stream.
- Example::
-
- stream.write(data)
- await stream.drain()
-
- This is a flow control method that interacts with the underlying
- IO write buffer. When the size of the buffer reaches
- the high watermark, *drain()* blocks until the size of the
- buffer is drained down to the low watermark and writing can
- be resumed. When there is nothing to wait for, the :meth:`drain`
- returns immediately.
-
- .. deprecated:: 3.8
-
- It is recommended to directly await on the `write()` method instead::
-
- await stream.write(data)
-
- .. method:: get_extra_info(name, default=None)
-
- Access optional transport information; see
- :meth:`BaseTransport.get_extra_info` for details.
-
- .. method:: is_closing()
-
- Return ``True`` if the stream is closed or in the process of
- being closed.
-
- .. coroutinemethod:: read(n=-1)
-
- Read up to *n* bytes. If *n* is not provided, or set to ``-1``,
- read until EOF and return all read bytes.
-
- If EOF was received and the internal buffer is empty,
- return an empty ``bytes`` object.
-
- .. coroutinemethod:: readexactly(n)
-
- Read exactly *n* bytes.
-
- Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
- can be read. Use the :attr:`IncompleteReadError.partial`
- attribute to get the partially read data.
-
- .. coroutinemethod:: readline()
-
- Read one line, where "line" is a sequence of bytes
- ending with ``\n``.
-
- If EOF is received and ``\n`` was not found, the method
- returns partially read data.
-
- If EOF is received and the internal buffer is empty,
- return an empty ``bytes`` object.
-
- .. coroutinemethod:: readuntil(separator=b'\\n')
-
- Read data from the stream until *separator* is found.
-
- On success, the data and separator will be removed from the
- internal buffer (consumed). Returned data will include the
- separator at the end.
-
- If the amount of data read exceeds the configured stream limit, a
- :exc:`LimitOverrunError` exception is raised, and the data
- is left in the internal buffer and can be read again.
-
- If EOF is reached before the complete separator is found,
- an :exc:`IncompleteReadError` exception is raised, and the internal
- buffer is reset. The :attr:`IncompleteReadError.partial` attribute
- may contain a portion of the separator.
-
- .. coroutinemethod:: sendfile(file, offset=0, count=None, *, fallback=True)
-
- Sends a *file* over the stream using an optimized syscall if available.
-
- For other parameters meaning please see :meth:`AbstractEventloop.sendfile`.
-
- .. coroutinemethod:: start_tls(sslcontext, *, server_hostname=None, \
- ssl_handshake_timeout=None)
-
- Upgrades the existing transport-based connection to TLS.
-
- For other parameters meaning please see :meth:`AbstractEventloop.start_tls`.
-
- .. coroutinemethod:: wait_closed()
-
- Wait until the stream is closed.
-
- Should be called after :meth:`close` to wait until the underlying
- connection is closed.
-
- .. coroutinemethod:: write(data)
-
- Write *data* to the underlying socket; wait until the data is sent, e.g.::
-
- await stream.write(data)
-
- .. method:: write(data)
-
- The method attempts to write the *data* to the underlying socket immediately.
- If that fails, the data is queued in an internal write buffer until it can be
- sent. :meth:`drain` can be used to flush the underlying buffer once writing is
- available::
-
- stream.write(data)
- await stream.drain()
-
- .. deprecated:: 3.8
-
- It is recommended to directly await on the `write()` method instead::
-
- await stream.write(data)
-
- .. method:: writelines(data)
-
- The method writes a list (or any iterable) of bytes to the underlying socket
- immediately.
- If that fails, the data is queued in an internal write buffer until it can be
- sent.
-
- It is possible to directly await on the `writelines()` method::
-
- await stream.writelines(lines)
-
- The ``await`` pauses the current coroutine until the data is written to the
- socket.
-
- .. method:: write_eof()
-
- Close the write end of the stream after the buffered write
- data is flushed.
-
-
-StreamMode
-==========
-
-.. class:: StreamMode
-
- A subclass of :class:`enum.Flag` that defines a set of values that can be
- used to determine the ``mode`` of :class:`Stream` objects.
-
- .. data:: READ
-
- The stream object is readable and provides the API of :class:`StreamReader`.
-
- .. data:: WRITE
-
- The stream object is writeable and provides the API of :class:`StreamWriter`.
-
- .. data:: READWRITE
-
- The stream object is readable and writeable and provides the API of both
- :class:`StreamReader` and :class:`StreamWriter`.
-
- .. versionadded:: 3.8
-
StreamReader
============
@@ -629,7 +209,8 @@ StreamReader
.. method:: at_eof()
- Return ``True`` if the buffer is empty.
+ Return ``True`` if the buffer is empty and :meth:`feed_eof`
+ was called.
StreamWriter
@@ -650,22 +231,11 @@ StreamWriter
If that fails, the data is queued in an internal write buffer until it can be
sent.
- Starting with Python 3.8, it is possible to directly await on the `write()`
- method::
-
- await stream.write(data)
-
- The ``await`` pauses the current coroutine until the data is written to the
- socket.
-
- Below is an equivalent code that works with Python <= 3.7::
+ The method should be used along with the ``drain()`` method::
stream.write(data)
await stream.drain()
- .. versionchanged:: 3.8
- Support ``await stream.write(...)`` syntax.
-
.. method:: writelines(data)
The method writes a list (or any iterable) of bytes to the underlying socket
@@ -673,42 +243,20 @@ StreamWriter
If that fails, the data is queued in an internal write buffer until it can be
sent.
- Starting with Python 3.8, it is possible to directly await on the `writelines()`
- method::
-
- await stream.writelines(lines)
-
- The ``await`` pauses the current coroutine until the data is written to the
- socket.
-
- Below is an equivalent code that works with Python <= 3.7::
+ The method should be used along with the ``drain()`` method::
stream.writelines(lines)
await stream.drain()
- .. versionchanged:: 3.8
- Support ``await stream.writelines()`` syntax.
-
.. method:: close()
The method closes the stream and the underlying socket.
- Starting with Python 3.8, it is possible to directly await on the `close()`
- method::
-
- await stream.close()
-
- The ``await`` pauses the current coroutine until the stream and the underlying
- socket are closed (and SSL shutdown is performed for a secure connection).
-
- Below is an equivalent code that works with Python <= 3.7::
+ The method should be used along with the ``wait_closed()`` method::
stream.close()
await stream.wait_closed()
- .. versionchanged:: 3.8
- Support ``await stream.close()`` syntax.
-
.. method:: can_write_eof()
Return *True* if the underlying transport supports
@@ -768,17 +316,22 @@ Examples
TCP echo client using streams
-----------------------------
-TCP echo client using the :func:`asyncio.connect` function::
+TCP echo client using the :func:`asyncio.open_connection` function::
import asyncio
async def tcp_echo_client(message):
- async with asyncio.connect('127.0.0.1', 8888) as stream:
- print(f'Send: {message!r}')
- await stream.write(message.encode())
+ reader, writer = await asyncio.open_connection(
+ '127.0.0.1', 8888)
+
+ print(f'Send: {message!r}')
+ writer.write(message.encode())
+
+ data = await reader.read(100)
+ print(f'Received: {data.decode()!r}')
- data = await stream.read(100)
- print(f'Received: {data.decode()!r}')
+ print('Close the connection')
+ writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
@@ -794,28 +347,32 @@ TCP echo client using the :func:`asyncio.connect` function::
TCP echo server using streams
-----------------------------
-TCP echo server using the :class:`asyncio.StreamServer` class::
+TCP echo server using the :func:`asyncio.start_server` function::
import asyncio
- async def handle_echo(stream):
- data = await stream.read(100)
+ async def handle_echo(reader, writer):
+ data = await reader.read(100)
message = data.decode()
- addr = stream.get_extra_info('peername')
+ addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
- await stream.write(data)
+ writer.write(data)
+ await writer.drain()
print("Close the connection")
- await stream.close()
+ writer.close()
async def main():
- async with asyncio.StreamServer(
- handle_echo, '127.0.0.1', 8888) as server:
- addr = server.sockets[0].getsockname()
- print(f'Serving on {addr}')
+ server = await asyncio.start_server(
+ handle_echo, '127.0.0.1', 8888)
+
+ addr = server.sockets[0].getsockname()
+ print(f'Serving on {addr}')
+
+ async with server:
await server.serve_forever()
asyncio.run(main())
@@ -839,9 +396,11 @@ Simple example querying HTTP headers of the URL passed on the command line::
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
- stream = await asyncio.connect(url.hostname, 443, ssl=True)
+ reader, writer = await asyncio.open_connection(
+ url.hostname, 443, ssl=True)
else:
- stream = await asyncio.connect(url.hostname, 80)
+ reader, writer = await asyncio.open_connection(
+ url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
@@ -849,14 +408,18 @@ Simple example querying HTTP headers of the URL passed on the command line::
f"\r\n"
)
- stream.write(query.encode('latin-1'))
- while (line := await stream.readline()):
+ writer.write(query.encode('latin-1'))
+ while True:
+ line = await reader.readline()
+ if not line:
+ break
+
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
- await stream.close()
+ writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
@@ -877,7 +440,7 @@ Register an open socket to wait for data using streams
------------------------------------------------------
Coroutine waiting until a socket receives data using the
-:func:`asyncio.connect` function::
+:func:`open_connection` function::
import asyncio
import socket
@@ -891,15 +454,17 @@ Coroutine waiting until a socket receives data using the
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
- async with asyncio.connect(sock=rsock) as stream:
- # Simulate the reception of data from the network
- loop.call_soon(wsock.send, 'abc'.encode())
+ reader, writer = await asyncio.open_connection(sock=rsock)
+
+ # Simulate the reception of data from the network
+ loop.call_soon(wsock.send, 'abc'.encode())
- # Wait for data
- data = await stream.read(100)
+ # Wait for data
+ data = await reader.read(100)
- # Got data, we are done: close the socket
- print("Received:", data.decode())
+ # Got data, we are done: close the socket
+ print("Received:", data.decode())
+ writer.close()
# Close the second socket
wsock.close()
diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py
index a6a29dbfec..28c2e2c429 100644
--- a/Lib/asyncio/__init__.py
+++ b/Lib/asyncio/__init__.py
@@ -3,7 +3,6 @@
# flake8: noqa
import sys
-import warnings
# This relies on each of the submodules having an __all__ variable.
from .base_events import *
@@ -44,40 +43,3 @@ if sys.platform == 'win32': # pragma: no cover
else:
from .unix_events import * # pragma: no cover
__all__ += unix_events.__all__
-
-
-__all__ += ('StreamReader', 'StreamWriter', 'StreamReaderProtocol') # deprecated
-
-
-def __getattr__(name):
- global StreamReader, StreamWriter, StreamReaderProtocol
- if name == 'StreamReader':
- warnings.warn("StreamReader is deprecated since Python 3.8 "
- "in favor of Stream, and scheduled for removal "
- "in Python 3.10",
- DeprecationWarning,
- stacklevel=2)
- from .streams import StreamReader as sr
- StreamReader = sr
- return StreamReader
- if name == 'StreamWriter':
- warnings.warn("StreamWriter is deprecated since Python 3.8 "
- "in favor of Stream, and scheduled for removal "
- "in Python 3.10",
- DeprecationWarning,
- stacklevel=2)
- from .streams import StreamWriter as sw
- StreamWriter = sw
- return StreamWriter
- if name == 'StreamReaderProtocol':
- warnings.warn("Using asyncio internal class StreamReaderProtocol "
- "is deprecated since Python 3.8 "
- " and scheduled for removal "
- "in Python 3.10",
- DeprecationWarning,
- stacklevel=2)
- from .streams import StreamReaderProtocol as srp
- StreamReaderProtocol = srp
- return StreamReaderProtocol
-
- raise AttributeError(f"module {__name__} has no attribute {name}")
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 20c642a001..795530e6f6 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -1,19 +1,14 @@
__all__ = (
- 'Stream', 'StreamMode',
- 'open_connection', 'start_server',
- 'connect', 'connect_read_pipe', 'connect_write_pipe',
- 'StreamServer')
+ 'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
+ 'open_connection', 'start_server')
-import enum
import socket
import sys
import warnings
import weakref
if hasattr(socket, 'AF_UNIX'):
- __all__ += ('open_unix_connection', 'start_unix_server',
- 'connect_unix',
- 'UnixStreamServer')
+ __all__ += ('open_unix_connection', 'start_unix_server')
from . import coroutines
from . import events
@@ -21,155 +16,12 @@ from . import exceptions
from . import format_helpers
from . import protocols
from .log import logger
-from . import tasks
+from .tasks import sleep
_DEFAULT_LIMIT = 2 ** 16 # 64 KiB
-class StreamMode(enum.Flag):
- READ = enum.auto()
- WRITE = enum.auto()
- READWRITE = READ | WRITE
-
-
-def _ensure_can_read(mode):
- if not mode & StreamMode.READ:
- raise RuntimeError("The stream is write-only")
-
-
-def _ensure_can_write(mode):
- if not mode & StreamMode.WRITE:
- raise RuntimeError("The stream is read-only")
-
-
-class _ContextManagerHelper:
- __slots__ = ('_awaitable', '_result')
-
- def __init__(self, awaitable):
- self._awaitable = awaitable
- self._result = None
-
- def __await__(self):
- return self._awaitable.__await__()
-
- async def __aenter__(self):
- ret = await self._awaitable
- result = await ret.__aenter__()
- self._result = result
- return result
-
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- return await self._result.__aexit__(exc_type, exc_val, exc_tb)
-
-
-def connect(host=None, port=None, *,
- limit=_DEFAULT_LIMIT,
- ssl=None, family=0, proto=0,
- flags=0, sock=None, local_addr=None,
- server_hostname=None,
- ssl_handshake_timeout=None,
- happy_eyeballs_delay=None, interleave=None):
- """Connect to TCP socket on *host* : *port* address to send and receive data.
-
- *limit* determines the buffer size limit used by the returned `Stream`
- instance. By default the *limit* is set to 64 KiB.
-
- The rest of the arguments are passed directly to `loop.create_connection()`.
- """
- # Design note:
- # Don't use decorator approach but explicit non-async
- # function to fail fast and explicitly
- # if passed arguments don't match the function signature
- return _ContextManagerHelper(_connect(host, port, limit,
- ssl, family, proto,
- flags, sock, local_addr,
- server_hostname,
- ssl_handshake_timeout,
- happy_eyeballs_delay,
- interleave))
-
-
-async def _connect(host, port,
- limit,
- ssl, family, proto,
- flags, sock, local_addr,
- server_hostname,
- ssl_handshake_timeout,
- happy_eyeballs_delay, interleave):
- loop = events.get_running_loop()
- stream = Stream(mode=StreamMode.READWRITE,
- limit=limit,
- loop=loop,
- _asyncio_internal=True)
- await loop.create_connection(
- lambda: _StreamProtocol(stream, loop=loop,
- _asyncio_internal=True),
- host, port,
- ssl=ssl, family=family, proto=proto,
- flags=flags, sock=sock, local_addr=local_addr,
- server_hostname=server_hostname,
- ssl_handshake_timeout=ssl_handshake_timeout,
- happy_eyeballs_delay=happy_eyeballs_delay, interleave=interleave)
- return stream
-
-
-def connect_read_pipe(pipe, *, limit=_DEFAULT_LIMIT):
- """Establish a connection to a file-like object *pipe* to receive data.
-
- Takes a file-like object *pipe* to return a Stream object of the mode
- StreamMode.READ that has similar API of StreamReader. It can also be used
- as an async context manager.
- """
-
- # Design note:
- # Don't use decorator approach but explicit non-async
- # function to fail fast and explicitly
- # if passed arguments don't match the function signature
- return _ContextManagerHelper(_connect_read_pipe(pipe, limit))
-
-
-async def _connect_read_pipe(pipe, limit):
- loop = events.get_running_loop()
- stream = Stream(mode=StreamMode.READ,
- limit=limit,
- loop=loop,
- _asyncio_internal=True)
- await loop.connect_read_pipe(
- lambda: _StreamProtocol(stream, loop=loop,
- _asyncio_internal=True),
- pipe)
- return stream
-
-
-def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT):
- """Establish a connection to a file-like object *pipe* to send data.
-
- Takes a file-like object *pipe* to return a Stream object of the mode
- StreamMode.WRITE that has similar API of StreamWriter. It can also be used
- as an async context manager.
- """
-
- # Design note:
- # Don't use decorator approach but explicit non-async
- # function to fail fast and explicitly
- # if passed arguments don't match the function signature
- return _ContextManagerHelper(_connect_write_pipe(pipe, limit))
-
-
-async def _connect_write_pipe(pipe, limit):
- loop = events.get_running_loop()
- stream = Stream(mode=StreamMode.WRITE,
- limit=limit,
- loop=loop,
- _asyncio_internal=True)
- await loop.connect_write_pipe(
- lambda: _StreamProtocol(stream, loop=loop,
- _asyncio_internal=True),
- pipe)
- return stream
-
-
async def open_connection(host=None, port=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""A wrapper for create_connection() returning a (reader, writer) pair.
@@ -189,11 +41,6 @@ async def open_connection(host=None, port=None, *,
StreamReaderProtocol classes, just copy the code -- there's
really nothing special here except some convenience.)
"""
- warnings.warn("open_connection() is deprecated since Python 3.8 "
- "in favor of connect(), and scheduled for removal "
- "in Python 3.10",
- DeprecationWarning,
- stacklevel=2)
if loop is None:
loop = events.get_event_loop()
else:
@@ -201,7 +48,7 @@ async def open_connection(host=None, port=None, *,
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
reader = StreamReader(limit=limit, loop=loop)
- protocol = StreamReaderProtocol(reader, loop=loop, _asyncio_internal=True)
+ protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_connection(
lambda: protocol, host, port, **kwds)
writer = StreamWriter(transport, protocol, reader, loop)
@@ -231,11 +78,6 @@ async def start_server(client_connected_cb, host=None, port=None, *,
The return value is the same as loop.create_server(), i.e. a
Server object which can be used to stop the service.
"""
- warnings.warn("start_server() is deprecated since Python 3.8 "
- "in favor of StreamServer(), and scheduled for removal "
- "in Python 3.10",
- DeprecationWarning,
- stacklevel=2)
if loop is None:
loop = events.get_event_loop()
else:
@@ -246,201 +88,18 @@ async def start_server(client_connected_cb, host=None, port=None, *,
def factory():
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, client_connected_cb,
- loop=loop,
- _asyncio_internal=True)
+ loop=loop)
return protocol
return await loop.create_server(factory, host, port, **kwds)
-class _BaseStreamServer:
- # Design notes.
- # StreamServer and UnixStreamServer are exposed as FINAL classes,
- # not function factories.
- # async with serve(host, port) as server:
- # server.start_serving()
- # looks ugly.
- # The class doesn't provide API for enumerating connected streams
- # It can be a subject for improvements in Python 3.9
-
- _server_impl = None
-
- def __init__(self, client_connected_cb,
- /,
- limit=_DEFAULT_LIMIT,
- shutdown_timeout=60,
- _asyncio_internal=False):
- if not _asyncio_internal:
- raise RuntimeError("_ServerStream is a private asyncio class")
- self._client_connected_cb = client_connected_cb
- self._limit = limit
- self._loop = events.get_running_loop()
- self._streams = {}
- self._shutdown_timeout = shutdown_timeout
-
- def __init_subclass__(cls):
- if not cls.__module__.startswith('asyncio.'):
- raise TypeError(f"asyncio.{cls.__name__} "
- "class cannot be inherited from")
-
- async def bind(self):
- if self._server_impl is not None:
- return
- self._server_impl = await self._bind()
-
- def is_bound(self):
- return self._server_impl is not None
-
- @property
- def sockets(self):
- # multiple value for socket bound to both IPv4 and IPv6 families
- if self._server_impl is None:
- return ()
- return self._server_impl.sockets
-
- def is_serving(self):
- if self._server_impl is None:
- return False
- return self._server_impl.is_serving()
-
- async def start_serving(self):
- await self.bind()
- await self._server_impl.start_serving()
-
- async def serve_forever(self):
- await self.start_serving()
- await self._server_impl.serve_forever()
-
- async def close(self):
- if self._server_impl is None:
- return
- self._server_impl.close()
- streams = list(self._streams.keys())
- active_tasks = list(self._streams.values())
- if streams:
- await tasks.wait([stream.close() for stream in streams])
- await self._server_impl.wait_closed()
- self._server_impl = None
- await self._shutdown_active_tasks(active_tasks)
-
- async def abort(self):
- if self._server_impl is None:
- return
- self._server_impl.close()
- streams = list(self._streams.keys())
- active_tasks = list(self._streams.values())
- if streams:
- await tasks.wait([stream.abort() for stream in streams])
- await self._server_impl.wait_closed()
- self._server_impl = None
- await self._shutdown_active_tasks(active_tasks)
-
- async def __aenter__(self):
- await self.bind()
- return self
-
- async def __aexit__(self, exc_type, exc_value, exc_tb):
- await self.close()
-
- def _attach(self, stream, task):
- self._streams[stream] = task
-
- def _detach(self, stream, task):
- del self._streams[stream]
-
- async def _shutdown_active_tasks(self, active_tasks):
- if not active_tasks:
- return
- # NOTE: tasks finished with exception are reported
- # by the Task.__del__() method.
- done, pending = await tasks.wait(active_tasks,
- timeout=self._shutdown_timeout)
- if not pending:
- return
- for task in pending:
- task.cancel()
- done, pending = await tasks.wait(pending,
- timeout=self._shutdown_timeout)
- for task in pending:
- self._loop.call_exception_handler({
- "message": (f'{task!r} ignored cancellation request '
- f'from a closing {self!r}'),
- "stream_server": self
- })
-
- def __repr__(self):
- ret = [f'{self.__class__.__name__}']
- if self.is_serving():
- ret.append('serving')
- if self.sockets:
- ret.append(f'sockets={self.sockets!r}')
- return '<' + ' '.join(ret) + '>'
-
- def __del__(self, _warn=warnings.warn):
- if self._server_impl is not None:
- _warn(f"unclosed stream server {self!r}",
- ResourceWarning, source=self)
- self._server_impl.close()
-
-
-class StreamServer(_BaseStreamServer):
-
- def __init__(self, client_connected_cb, /, host=None, port=None, *,
- limit=_DEFAULT_LIMIT,
- family=socket.AF_UNSPEC,
- flags=socket.AI_PASSIVE, sock=None, backlog=100,
- ssl=None, reuse_address=None, reuse_port=None,
- ssl_handshake_timeout=None,
- shutdown_timeout=60):
- super().__init__(client_connected_cb,
- limit=limit,
- shutdown_timeout=shutdown_timeout,
- _asyncio_internal=True)
- self._host = host
- self._port = port
- self._family = family
- self._flags = flags
- self._sock = sock
- self._backlog = backlog
- self._ssl = ssl
- self._reuse_address = reuse_address
- self._reuse_port = reuse_port
- self._ssl_handshake_timeout = ssl_handshake_timeout
-
- async def _bind(self):
- def factory():
- protocol = _ServerStreamProtocol(self,
- self._limit,
- self._client_connected_cb,
- loop=self._loop,
- _asyncio_internal=True)
- return protocol
- return await self._loop.create_server(
- factory,
- self._host,
- self._port,
- start_serving=False,
- family=self._family,
- flags=self._flags,
- sock=self._sock,
- backlog=self._backlog,
- ssl=self._ssl,
- reuse_address=self._reuse_address,
- reuse_port=self._reuse_port,
- ssl_handshake_timeout=self._ssl_handshake_timeout)
-
-
if hasattr(socket, 'AF_UNIX'):
# UNIX Domain Sockets are supported on this platform
async def open_unix_connection(path=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
- warnings.warn("open_unix_connection() is deprecated since Python 3.8 "
- "in favor of connect_unix(), and scheduled for removal "
- "in Python 3.10",
- DeprecationWarning,
- stacklevel=2)
if loop is None:
loop = events.get_event_loop()
else:
@@ -448,62 +107,15 @@ if hasattr(socket, 'AF_UNIX'):
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
reader = StreamReader(limit=limit, loop=loop)
- protocol = StreamReaderProtocol(reader, loop=loop,
- _asyncio_internal=True)
+ protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_unix_connection(
lambda: protocol, path, **kwds)
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
-
- def connect_unix(path=None, *,
- limit=_DEFAULT_LIMIT,
- ssl=None, sock=None,
- server_hostname=None,
- ssl_handshake_timeout=None):
- """Similar to `connect()` but works with UNIX Domain Sockets."""
- # Design note:
- # Don't use decorator approach but explicit non-async
- # function to fail fast and explicitly
- # if passed arguments don't match the function signature
- return _ContextManagerHelper(_connect_unix(path,
- limit,
- ssl, sock,
- server_hostname,
- ssl_handshake_timeout))
-
-
- async def _connect_unix(path,
- limit,
- ssl, sock,
- server_hostname,
- ssl_handshake_timeout):
- """Similar to `connect()` but works with UNIX Domain Sockets."""
- loop = events.get_running_loop()
- stream = Stream(mode=StreamMode.READWRITE,
- limit=limit,
- loop=loop,
- _asyncio_internal=True)
- await loop.create_unix_connection(
- lambda: _StreamProtocol(stream,
- loop=loop,
- _asyncio_internal=True),
- path,
- ssl=ssl,
- sock=sock,
- server_hostname=server_hostname,
- ssl_handshake_timeout=ssl_handshake_timeout)
- return stream
-
-
async def start_unix_server(client_connected_cb, path=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `start_server` but works with UNIX Domain Sockets."""
- warnings.warn("start_unix_server() is deprecated since Python 3.8 "
- "in favor of UnixStreamServer(), and scheduled "
- "for removal in Python 3.10",
- DeprecationWarning,
- stacklevel=2)
if loop is None:
loop = events.get_event_loop()
else:
@@ -514,48 +126,11 @@ if hasattr(socket, 'AF_UNIX'):
def factory():
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, client_connected_cb,
- loop=loop,
- _asyncio_internal=True)
+ loop=loop)
return protocol
return await loop.create_unix_server(factory, path, **kwds)
- class UnixStreamServer(_BaseStreamServer):
-
- def __init__(self, client_connected_cb, /, path=None, *,
- limit=_DEFAULT_LIMIT,
- sock=None,
- backlog=100,
- ssl=None,
- ssl_handshake_timeout=None,
- shutdown_timeout=60):
- super().__init__(client_connected_cb,
- limit=limit,
- shutdown_timeout=shutdown_timeout,
- _asyncio_internal=True)
- self._path = path
- self._sock = sock
- self._backlog = backlog
- self._ssl = ssl
- self._ssl_handshake_timeout = ssl_handshake_timeout
-
- async def _bind(self):
- def factory():
- protocol = _ServerStreamProtocol(self,
- self._limit,
- self._client_connected_cb,
- loop=self._loop,
- _asyncio_internal=True)
- return protocol
- return await self._loop.create_unix_server(
- factory,
- self._path,
- start_serving=False,
- sock=self._sock,
- backlog=self._backlog,
- ssl=self._ssl,
- ssl_handshake_timeout=self._ssl_handshake_timeout)
-
class FlowControlMixin(protocols.Protocol):
"""Reusable flow control logic for StreamWriter.drain().
@@ -567,20 +142,11 @@ class FlowControlMixin(protocols.Protocol):
StreamWriter.drain() must wait for _drain_helper() coroutine.
"""
- def __init__(self, loop=None, *, _asyncio_internal=False):
+ def __init__(self, loop=None):
if loop is None:
self._loop = events.get_event_loop()
else:
self._loop = loop
- if not _asyncio_internal:
- # NOTE:
- # Avoid inheritance from FlowControlMixin
- # Copy-paste the code to your project
- # if you need flow control helpers
- warnings.warn(f"{self.__class__} should be instaniated "
- "by asyncio internals only, "
- "please avoid its creation from user code",
- DeprecationWarning)
self._paused = False
self._drain_waiter = None
self._connection_lost = False
@@ -634,8 +200,6 @@ class FlowControlMixin(protocols.Protocol):
raise NotImplementedError
-# begin legacy stream APIs
-
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
"""Helper class to adapt between Protocol and StreamReader.
@@ -645,47 +209,103 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
call inappropriate methods of the protocol.)
"""
- def __init__(self, stream_reader, client_connected_cb=None, loop=None,
- *, _asyncio_internal=False):
- super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
- self._stream_reader = stream_reader
+ _source_traceback = None
+
+ def __init__(self, stream_reader, client_connected_cb=None, loop=None):
+ super().__init__(loop=loop)
+ if stream_reader is not None:
+ self._stream_reader_wr = weakref.ref(stream_reader,
+ self._on_reader_gc)
+ self._source_traceback = stream_reader._source_traceback
+ else:
+ self._stream_reader_wr = None
+ if client_connected_cb is not None:
+ # This is a stream created by the `create_server()` function.
+ # Keep a strong reference to the reader until a connection
+ # is established.
+ self._strong_reader = stream_reader
+ self._reject_connection = False
self._stream_writer = None
+ self._transport = None
self._client_connected_cb = client_connected_cb
self._over_ssl = False
self._closed = self._loop.create_future()
+ def _on_reader_gc(self, wr):
+ transport = self._transport
+ if transport is not None:
+ # connection_made was called
+ context = {
+ 'message': ('An open stream object is being garbage '
+ 'collected; call "stream.close()" explicitly.')
+ }
+ if self._source_traceback:
+ context['source_traceback'] = self._source_traceback
+ self._loop.call_exception_handler(context)
+ transport.abort()
+ else:
+ self._reject_connection = True
+ self._stream_reader_wr = None
+
+ @property
+ def _stream_reader(self):
+ if self._stream_reader_wr is None:
+ return None
+ return self._stream_reader_wr()
+
def connection_made(self, transport):
- self._stream_reader.set_transport(transport)
+ if self._reject_connection:
+ context = {
+ 'message': ('An open stream was garbage collected prior to '
+ 'establishing network connection; '
+ 'call "stream.close()" explicitly.')
+ }
+ if self._source_traceback:
+ context['source_traceback'] = self._source_traceback
+ self._loop.call_exception_handler(context)
+ transport.abort()
+ return
+ self._transport = transport
+ reader = self._stream_reader
+ if reader is not None:
+ reader.set_transport(transport)
self._over_ssl = transport.get_extra_info('sslcontext') is not None
if self._client_connected_cb is not None:
self._stream_writer = StreamWriter(transport, self,
- self._stream_reader,
+ reader,
self._loop)
- res = self._client_connected_cb(self._stream_reader,
+ res = self._client_connected_cb(reader,
self._stream_writer)
if coroutines.iscoroutine(res):
self._loop.create_task(res)
+ self._strong_reader = None
def connection_lost(self, exc):
- if self._stream_reader is not None:
+ reader = self._stream_reader
+ if reader is not None:
if exc is None:
- self._stream_reader.feed_eof()
+ reader.feed_eof()
else:
- self._stream_reader.set_exception(exc)
+ reader.set_exception(exc)
if not self._closed.done():
if exc is None:
self._closed.set_result(None)
else:
self._closed.set_exception(exc)
super().connection_lost(exc)
- self._stream_reader = None
+ self._stream_reader_wr = None
self._stream_writer = None
+ self._transport = None
def data_received(self, data):
- self._stream_reader.feed_data(data)
+ reader = self._stream_reader
+ if reader is not None:
+ reader.feed_data(data)
def eof_received(self):
- self._stream_reader.feed_eof()
+ reader = self._stream_reader
+ if reader is not None:
+ reader.feed_eof()
if self._over_ssl:
# Prevent a warning in SSLProtocol.eof_received:
# "returning true from eof_received()
@@ -693,6 +313,9 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
return False
return True
+ def _get_close_waiter(self, stream):
+ return self._closed
+
def __del__(self):
# Prevent reports about unhandled exceptions.
# Better than self._closed._log_traceback = False hack
@@ -718,6 +341,8 @@ class StreamWriter:
assert reader is None or isinstance(reader, StreamReader)
self._reader = reader
self._loop = loop
+ self._complete_fut = self._loop.create_future()
+ self._complete_fut.set_result(None)
def __repr__(self):
info = [self.__class__.__name__, f'transport={self._transport!r}']
@@ -748,7 +373,7 @@ class StreamWriter:
return self._transport.is_closing()
async def wait_closed(self):
- await self._protocol._closed
+ await self._protocol._get_close_waiter(self)
def get_extra_info(self, name, default=None):
return self._transport.get_extra_info(name, default)
@@ -766,561 +391,24 @@ class StreamWriter:
if exc is not None:
raise exc
if self._transport.is_closing():
+ # Wait for protocol.connection_lost() call
+ # Raise connection closing error if any,
+ # ConnectionResetError otherwise
# Yield to the event loop so connection_lost() may be
# called. Without this, _drain_helper() would return
# immediately, and code that calls
# write(...); await drain()
# in a loop would never call connection_lost(), so it
# would not see an error when the socket is closed.
- await tasks.sleep(0, loop=self._loop)
+ await sleep(0)
await self._protocol._drain_helper()
class StreamReader:
- def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
- # The line length limit is a security feature;
- # it also doubles as half the buffer limit.
-
- if limit <= 0:
- raise ValueError('Limit cannot be <= 0')
-
- self._limit = limit
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- self._buffer = bytearray()
- self._eof = False # Whether we're done.
- self._waiter = None # A future used by _wait_for_data()
- self._exception = None
- self._transport = None
- self._paused = False
-
- def __repr__(self):
- info = ['StreamReader']
- if self._buffer:
- info.append(f'{len(self._buffer)} bytes')
- if self._eof:
- info.append('eof')
- if self._limit != _DEFAULT_LIMIT:
- info.append(f'limit={self._limit}')
- if self._waiter:
- info.append(f'waiter={self._waiter!r}')
- if self._exception:
- info.append(f'exception={self._exception!r}')
- if self._transport:
- info.append(f'transport={self._transport!r}')
- if self._paused:
- info.append('paused')
- return '<{}>'.format(' '.join(info))
-
- def exception(self):
- return self._exception
-
- def set_exception(self, exc):
- self._exception = exc
-
- waiter = self._waiter
- if waiter is not None:
- self._waiter = None
- if not waiter.cancelled():
- waiter.set_exception(exc)
-
- def _wakeup_waiter(self):
- """Wakeup read*() functions waiting for data or EOF."""
- waiter = self._waiter
- if waiter is not None:
- self._waiter = None
- if not waiter.cancelled():
- waiter.set_result(None)
-
- def set_transport(self, transport):
- assert self._transport is None, 'Transport already set'
- self._transport = transport
-
- def _maybe_resume_transport(self):
- if self._paused and len(self._buffer) <= self._limit:
- self._paused = False
- self._transport.resume_reading()
-
- def feed_eof(self):
- self._eof = True
- self._wakeup_waiter()
-
- def at_eof(self):
- """Return True if the buffer is empty and 'feed_eof' was called."""
- return self._eof and not self._buffer
-
- def feed_data(self, data):
- assert not self._eof, 'feed_data after feed_eof'
-
- if not data:
- return
-
- self._buffer.extend(data)
- self._wakeup_waiter()
-
- if (self._transport is not None and
- not self._paused and
- len(self._buffer) > 2 * self._limit):
- try:
- self._transport.pause_reading()
- except NotImplementedError:
- # The transport can't be paused.
- # We'll just have to buffer all data.
- # Forget the transport so we don't keep trying.
- self._transport = None
- else:
- self._paused = True
-
- async def _wait_for_data(self, func_name):
- """Wait until feed_data() or feed_eof() is called.
-
- If stream was paused, automatically resume it.
- """
- # StreamReader uses a future to link the protocol feed_data() method
- # to a read coroutine. Running two read coroutines at the same time
- # would have an unexpected behaviour. It would not possible to know
- # which coroutine would get the next data.
- if self._waiter is not None:
- raise RuntimeError(
- f'{func_name}() called while another coroutine is '
- f'already waiting for incoming data')
-
- assert not self._eof, '_wait_for_data after EOF'
-
- # Waiting for data while paused will make deadlock, so prevent it.
- # This is essential for readexactly(n) for case when n > self._limit.
- if self._paused:
- self._paused = False
- self._transport.resume_reading()
-
- self._waiter = self._loop.create_future()
- try:
- await self._waiter
- finally:
- self._waiter = None
-
- async def readline(self):
- """Read chunk of data from the stream until newline (b'\n') is found.
-
- On success, return chunk that ends with newline. If only partial
- line can be read due to EOF, return incomplete line without
- terminating newline. When EOF was reached while no bytes read, empty
- bytes object is returned.
-
- If limit is reached, ValueError will be raised. In that case, if
- newline was found, complete line including newline will be removed
- from internal buffer. Else, internal buffer will be cleared. Limit is
- compared against part of the line without newline.
-
- If stream was paused, this function will automatically resume it if
- needed.
- """
- sep = b'\n'
- seplen = len(sep)
- try:
- line = await self.readuntil(sep)
- except exceptions.IncompleteReadError as e:
- return e.partial
- except exceptions.LimitOverrunError as e:
- if self._buffer.startswith(sep, e.consumed):
- del self._buffer[:e.consumed + seplen]
- else:
- self._buffer.clear()
- self._maybe_resume_transport()
- raise ValueError(e.args[0])
- return line
-
- async def readuntil(self, separator=b'\n'):
- """Read data from the stream until ``separator`` is found.
-
- On success, the data and separator will be removed from the
- internal buffer (consumed). Returned data will include the
- separator at the end.
-
- Configured stream limit is used to check result. Limit sets the
- maximal length of data that can be returned, not counting the
- separator.
-
- If an EOF occurs and the complete separator is still not found,
- an IncompleteReadError exception will be raised, and the internal
- buffer will be reset. The IncompleteReadError.partial attribute
- may contain the separator partially.
-
- If the data cannot be read because of over limit, a
- LimitOverrunError exception will be raised, and the data
- will be left in the internal buffer, so it can be read again.
- """
- seplen = len(separator)
- if seplen == 0:
- raise ValueError('Separator should be at least one-byte string')
-
- if self._exception is not None:
- raise self._exception
-
- # Consume whole buffer except last bytes, which length is
- # one less than seplen. Let's check corner cases with
- # separator='SEPARATOR':
- # * we have received almost complete separator (without last
- # byte). i.e buffer='some textSEPARATO'. In this case we
- # can safely consume len(separator) - 1 bytes.
- # * last byte of buffer is first byte of separator, i.e.
- # buffer='abcdefghijklmnopqrS'. We may safely consume
- # everything except that last byte, but this require to
- # analyze bytes of buffer that match partial separator.
- # This is slow and/or require FSM. For this case our
- # implementation is not optimal, since require rescanning
- # of data that is known to not belong to separator. In
- # real world, separator will not be so long to notice
- # performance problems. Even when reading MIME-encoded
- # messages :)
-
- # `offset` is the number of bytes from the beginning of the buffer
- # where there is no occurrence of `separator`.
- offset = 0
-
- # Loop until we find `separator` in the buffer, exceed the buffer size,
- # or an EOF has happened.
- while True:
- buflen = len(self._buffer)
-
- # Check if we now have enough data in the buffer for `separator` to
- # fit.
- if buflen - offset >= seplen:
- isep = self._buffer.find(separator, offset)
-
- if isep != -1:
- # `separator` is in the buffer. `isep` will be used later
- # to retrieve the data.
- break
-
- # see upper comment for explanation.
- offset = buflen + 1 - seplen
- if offset > self._limit:
- raise exceptions.LimitOverrunError(
- 'Separator is not found, and chunk exceed the limit',
- offset)
-
- # Complete message (with full separator) may be present in buffer
- # even when EOF flag is set. This may happen when the last chunk
- # adds data which makes separator be found. That's why we check for
- # EOF *ater* inspecting the buffer.
- if self._eof:
- chunk = bytes(self._buffer)
- self._buffer.clear()
- raise exceptions.IncompleteReadError(chunk, None)
-
- # _wait_for_data() will resume reading if stream was paused.
- await self._wait_for_data('readuntil')
-
- if isep > self._limit:
- raise exceptions.LimitOverrunError(
- 'Separator is found, but chunk is longer than limit', isep)
-
- chunk = self._buffer[:isep + seplen]
- del self._buffer[:isep + seplen]
- self._maybe_resume_transport()
- return bytes(chunk)
-
- async def read(self, n=-1):
- """Read up to `n` bytes from the stream.
-
- If n is not provided, or set to -1, read until EOF and return all read
- bytes. If the EOF was received and the internal buffer is empty, return
- an empty bytes object.
-
- If n is zero, return empty bytes object immediately.
-
- If n is positive, this function try to read `n` bytes, and may return
- less or equal bytes than requested, but at least one byte. If EOF was
- received before any byte is read, this function returns empty byte
- object.
-
- Returned value is not limited with limit, configured at stream
- creation.
-
- If stream was paused, this function will automatically resume it if
- needed.
- """
-
- if self._exception is not None:
- raise self._exception
-
- if n == 0:
- return b''
-
- if n < 0:
- # This used to just loop creating a new waiter hoping to
- # collect everything in self._buffer, but that would
- # deadlock if the subprocess sends more than self.limit
- # bytes. So just call self.read(self._limit) until EOF.
- blocks = []
- while True:
- block = await self.read(self._limit)
- if not block:
- break
- blocks.append(block)
- return b''.join(blocks)
-
- if not self._buffer and not self._eof:
- await self._wait_for_data('read')
-
- # This will work right even if buffer is less than n bytes
- data = bytes(self._buffer[:n])
- del self._buffer[:n]
-
- self._maybe_resume_transport()
- return data
-
- async def readexactly(self, n):
- """Read exactly `n` bytes.
-
- Raise an IncompleteReadError if EOF is reached before `n` bytes can be
- read. The IncompleteReadError.partial attribute of the exception will
- contain the partial read bytes.
-
- if n is zero, return empty bytes object.
-
- Returned value is not limited with limit, configured at stream
- creation.
-
- If stream was paused, this function will automatically resume it if
- needed.
- """
- if n < 0:
- raise ValueError('readexactly size can not be less than zero')
-
- if self._exception is not None:
- raise self._exception
-
- if n == 0:
- return b''
-
- while len(self._buffer) < n:
- if self._eof:
- incomplete = bytes(self._buffer)
- self._buffer.clear()
- raise exceptions.IncompleteReadError(incomplete, n)
-
- await self._wait_for_data('readexactly')
-
- if len(self._buffer) == n:
- data = bytes(self._buffer)
- self._buffer.clear()
- else:
- data = bytes(self._buffer[:n])
- del self._buffer[:n]
- self._maybe_resume_transport()
- return data
-
- def __aiter__(self):
- return self
-
- async def __anext__(self):
- val = await self.readline()
- if val == b'':
- raise StopAsyncIteration
- return val
-
-
-# end legacy stream APIs
-
-
-class _BaseStreamProtocol(FlowControlMixin, protocols.Protocol):
- """Helper class to adapt between Protocol and StreamReader.
-
- (This is a helper class instead of making StreamReader itself a
- Protocol subclass, because the StreamReader has other potential
- uses, and to prevent the user of the StreamReader to accidentally
- call inappropriate methods of the protocol.)
- """
-
- _stream = None # initialized in derived classes
-
- def __init__(self, loop=None,
- *, _asyncio_internal=False):
- super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
- self._transport = None
- self._over_ssl = False
- self._closed = self._loop.create_future()
-
- def connection_made(self, transport):
- self._transport = transport
- self._over_ssl = transport.get_extra_info('sslcontext') is not None
-
- def connection_lost(self, exc):
- stream = self._stream
- if stream is not None:
- if exc is None:
- stream._feed_eof()
- else:
- stream._set_exception(exc)
- if not self._closed.done():
- if exc is None:
- self._closed.set_result(None)
- else:
- self._closed.set_exception(exc)
- super().connection_lost(exc)
- self._transport = None
-
- def data_received(self, data):
- stream = self._stream
- if stream is not None:
- stream._feed_data(data)
-
- def eof_received(self):
- stream = self._stream
- if stream is not None:
- stream._feed_eof()
- if self._over_ssl:
- # Prevent a warning in SSLProtocol.eof_received:
- # "returning true from eof_received()
- # has no effect when using ssl"
- return False
- return True
-
- def _get_close_waiter(self, stream):
- return self._closed
-
- def __del__(self):
- # Prevent reports about unhandled exceptions.
- # Better than self._closed._log_traceback = False hack
- closed = self._get_close_waiter(self._stream)
- if closed.done() and not closed.cancelled():
- closed.exception()
-
-
-class _StreamProtocol(_BaseStreamProtocol):
- _source_traceback = None
-
- def __init__(self, stream, loop=None,
- *, _asyncio_internal=False):
- super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
- self._source_traceback = stream._source_traceback
- self._stream_wr = weakref.ref(stream, self._on_gc)
- self._reject_connection = False
-
- def _on_gc(self, wr):
- transport = self._transport
- if transport is not None:
- # connection_made was called
- context = {
- 'message': ('An open stream object is being garbage '
- 'collected; call "stream.close()" explicitly.')
- }
- if self._source_traceback:
- context['source_traceback'] = self._source_traceback
- self._loop.call_exception_handler(context)
- transport.abort()
- else:
- self._reject_connection = True
- self._stream_wr = None
-
- @property
- def _stream(self):
- if self._stream_wr is None:
- return None
- return self._stream_wr()
-
- def connection_made(self, transport):
- if self._reject_connection:
- context = {
- 'message': ('An open stream was garbage collected prior to '
- 'establishing network connection; '
- 'call "stream.close()" explicitly.')
- }
- if self._source_traceback:
- context['source_traceback'] = self._source_traceback
- self._loop.call_exception_handler(context)
- transport.abort()
- return
- super().connection_made(transport)
- stream = self._stream
- if stream is None:
- return
- stream._set_transport(transport)
- stream._protocol = self
-
- def connection_lost(self, exc):
- super().connection_lost(exc)
- self._stream_wr = None
-
-
-class _ServerStreamProtocol(_BaseStreamProtocol):
- def __init__(self, server, limit, client_connected_cb, loop=None,
- *, _asyncio_internal=False):
- super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
- assert self._closed
- self._client_connected_cb = client_connected_cb
- self._limit = limit
- self._server = server
- self._task = None
-
- def connection_made(self, transport):
- super().connection_made(transport)
- stream = Stream(mode=StreamMode.READWRITE,
- transport=transport,
- protocol=self,
- limit=self._limit,
- loop=self._loop,
- is_server_side=True,
- _asyncio_internal=True)
- self._stream = stream
- # If self._client_connected_cb(self._stream) fails
- # the exception is logged by transport
- self._task = self._loop.create_task(
- self._client_connected_cb(self._stream))
- self._server._attach(stream, self._task)
-
- def connection_lost(self, exc):
- super().connection_lost(exc)
- self._server._detach(self._stream, self._task)
- self._stream = None
-
-
-class _OptionalAwait:
- # The class doesn't create a coroutine
- # if not awaited
- # It prevents "coroutine is never awaited" message
-
- __slots___ = ('_method',)
-
- def __init__(self, method):
- self._method = method
-
- def __await__(self):
- return self._method().__await__()
-
-
-class Stream:
- """Wraps a Transport.
-
- This exposes write(), writelines(), [can_]write_eof(),
- get_extra_info() and close(). It adds drain() which returns an
- optional Future on which you can wait for flow control. It also
- adds a transport property which references the Transport
- directly.
- """
-
_source_traceback = None
- def __init__(self, mode, *,
- transport=None,
- protocol=None,
- loop=None,
- limit=_DEFAULT_LIMIT,
- is_server_side=False,
- _asyncio_internal=False):
- if not _asyncio_internal:
- raise RuntimeError(f"{self.__class__} should be instantiated "
- "by asyncio internals only")
- self._mode = mode
- self._transport = transport
- self._protocol = protocol
- self._is_server_side = is_server_side
-
+ def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
# The line length limit is a security feature;
# it also doubles as half the buffer limit.
@@ -1336,17 +424,14 @@ class Stream:
self._eof = False # Whether we're done.
self._waiter = None # A future used by _wait_for_data()
self._exception = None
+ self._transport = None
self._paused = False
- self._complete_fut = self._loop.create_future()
- self._complete_fut.set_result(None)
-
if self._loop.get_debug():
self._source_traceback = format_helpers.extract_stack(
sys._getframe(1))
def __repr__(self):
- info = [self.__class__.__name__]
- info.append(f'mode={self._mode}')
+ info = ['StreamReader']
if self._buffer:
info.append(f'{len(self._buffer)} bytes')
if self._eof:
@@ -1363,127 +448,10 @@ class Stream:
info.append('paused')
return '<{}>'.format(' '.join(info))
- @property
- def mode(self):
- return self._mode
-
- def is_server_side(self):
- return self._is_server_side
-
- @property
- def transport(self):
- warnings.warn("Stream.transport attribute is deprecated "
- "since Python 3.8 and is scheduled for removal in 3.10; "
- "it is an internal API",
- DeprecationWarning,
- stacklevel=2)
- return self._transport
-
- def write(self, data):
- _ensure_can_write(self._mode)
- self._transport.write(data)
- return self._fast_drain()
-
- def writelines(self, data):
- _ensure_can_write(self._mode)
- self._transport.writelines(data)
- return self._fast_drain()
-
- def _fast_drain(self):
- # The helper tries to use fast-path to return already existing
- # complete future object if underlying transport is not paused
- # and actual waiting for writing resume is not needed
- exc = self.exception()
- if exc is not None:
- fut = self._loop.create_future()
- fut.set_exception(exc)
- return fut
- if not self._transport.is_closing():
- if self._protocol._connection_lost:
- fut = self._loop.create_future()
- fut.set_exception(ConnectionResetError('Connection lost'))
- return fut
- if not self._protocol._paused:
- # fast path, the stream is not paused
- # no need to wait for resume signal
- return self._complete_fut
- return _OptionalAwait(self.drain)
-
- def write_eof(self):
- _ensure_can_write(self._mode)
- return self._transport.write_eof()
-
- def can_write_eof(self):
- if not self._mode.is_write():
- return False
- return self._transport.can_write_eof()
-
- def close(self):
- self._transport.close()
- return _OptionalAwait(self.wait_closed)
-
- def is_closing(self):
- return self._transport.is_closing()
-
- async def abort(self):
- self._transport.abort()
- await self.wait_closed()
-
- async def wait_closed(self):
- await self._protocol._get_close_waiter(self)
-
- def get_extra_info(self, name, default=None):
- return self._transport.get_extra_info(name, default)
-
- async def drain(self):
- """Flush the write buffer.
-
- The intended use is to write
-
- w.write(data)
- await w.drain()
- """
- _ensure_can_write(self._mode)
- exc = self.exception()
- if exc is not None:
- raise exc
- if self._transport.is_closing():
- # Wait for protocol.connection_lost() call
- # Raise connection closing error if any,
- # ConnectionResetError otherwise
- await tasks.sleep(0)
- await self._protocol._drain_helper()
-
- async def sendfile(self, file, offset=0, count=None, *, fallback=True):
- await self.drain() # check for stream mode and exceptions
- return await self._loop.sendfile(self._transport, file,
- offset, count, fallback=fallback)
-
- async def start_tls(self, sslcontext, *,
- server_hostname=None,
- ssl_handshake_timeout=None):
- await self.drain() # check for stream mode and exceptions
- transport = await self._loop.start_tls(
- self._transport, self._protocol, sslcontext,
- server_side=self._is_server_side,
- server_hostname=server_hostname,
- ssl_handshake_timeout=ssl_handshake_timeout)
- self._transport = transport
- self._protocol._transport = transport
- self._protocol._over_ssl = True
-
def exception(self):
return self._exception
def set_exception(self, exc):
- warnings.warn("Stream.set_exception() is deprecated "
- "since Python 3.8 and is scheduled for removal in 3.10; "
- "it is an internal API",
- DeprecationWarning,
- stacklevel=2)
- self._set_exception(exc)
-
- def _set_exception(self, exc):
self._exception = exc
waiter = self._waiter
@@ -1501,16 +469,6 @@ class Stream:
waiter.set_result(None)
def set_transport(self, transport):
- warnings.warn("Stream.set_transport() is deprecated "
- "since Python 3.8 and is scheduled for removal in 3.10; "
- "it is an internal API",
- DeprecationWarning,
- stacklevel=2)
- self._set_transport(transport)
-
- def _set_transport(self, transport):
- if transport is self._transport:
- return
assert self._transport is None, 'Transport already set'
self._transport = transport
@@ -1520,14 +478,6 @@ class Stream:
self._transport.resume_reading()
def feed_eof(self):
- warnings.warn("Stream.feed_eof() is deprecated "
- "since Python 3.8 and is scheduled for removal in 3.10; "
- "it is an internal API",
- DeprecationWarning,
- stacklevel=2)
- self._feed_eof()
-
- def _feed_eof(self):
self._eof = True
self._wakeup_waiter()
@@ -1536,15 +486,6 @@ class Stream:
return self._eof and not self._buffer
def feed_data(self, data):
- warnings.warn("Stream.feed_data() is deprecated "
- "since Python 3.8 and is scheduled for removal in 3.10; "
- "it is an internal API",
- DeprecationWarning,
- stacklevel=2)
- self._feed_data(data)
-
- def _feed_data(self, data):
- _ensure_can_read(self._mode)
assert not self._eof, 'feed_data after feed_eof'
if not data:
@@ -1610,7 +551,6 @@ class Stream:
If stream was paused, this function will automatically resume it if
needed.
"""
- _ensure_can_read(self._mode)
sep = b'\n'
seplen = len(sep)
try:
@@ -1646,7 +586,6 @@ class Stream:
LimitOverrunError exception will be raised, and the data
will be left in the internal buffer, so it can be read again.
"""
- _ensure_can_read(self._mode)
seplen = len(separator)
if seplen == 0:
raise ValueError('Separator should be at least one-byte string')
@@ -1738,7 +677,6 @@ class Stream:
If stream was paused, this function will automatically resume it if
needed.
"""
- _ensure_can_read(self._mode)
if self._exception is not None:
raise self._exception
@@ -1784,7 +722,6 @@ class Stream:
If stream was paused, this function will automatically resume it if
needed.
"""
- _ensure_can_read(self._mode)
if n < 0:
raise ValueError('readexactly size can not be less than zero')
@@ -1812,7 +749,6 @@ class Stream:
return data
def __aiter__(self):
- _ensure_can_read(self._mode)
return self
async def __anext__(self):
@@ -1820,9 +756,3 @@ class Stream:
if val == b'':
raise StopAsyncIteration
return val
-
- async def __aenter__(self):
- return self
-
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- await self.close()
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index ce504b8b0c..c9506b1583 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -19,16 +19,14 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
protocols.SubprocessProtocol):
"""Like StreamReaderProtocol, but for a subprocess."""
- def __init__(self, limit, loop, *, _asyncio_internal=False):
- super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
+ def __init__(self, limit, loop):
+ super().__init__(loop=loop)
self._limit = limit
self.stdin = self.stdout = self.stderr = None
self._transport = None
self._process_exited = False
self._pipe_fds = []
self._stdin_closed = self._loop.create_future()
- self._stdout_closed = self._loop.create_future()
- self._stderr_closed = self._loop.create_future()
def __repr__(self):
info = [self.__class__.__name__]
@@ -42,35 +40,27 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
def connection_made(self, transport):
self._transport = transport
+
stdout_transport = transport.get_pipe_transport(1)
if stdout_transport is not None:
- self.stdout = streams.Stream(mode=streams.StreamMode.READ,
- transport=stdout_transport,
- protocol=self,
- limit=self._limit,
- loop=self._loop,
- _asyncio_internal=True)
- self.stdout._set_transport(stdout_transport)
+ self.stdout = streams.StreamReader(limit=self._limit,
+ loop=self._loop)
+ self.stdout.set_transport(stdout_transport)
self._pipe_fds.append(1)
stderr_transport = transport.get_pipe_transport(2)
if stderr_transport is not None:
- self.stderr = streams.Stream(mode=streams.StreamMode.READ,
- transport=stderr_transport,
- protocol=self,
- limit=self._limit,
- loop=self._loop,
- _asyncio_internal=True)
- self.stderr._set_transport(stderr_transport)
+ self.stderr = streams.StreamReader(limit=self._limit,
+ loop=self._loop)
+ self.stderr.set_transport(stderr_transport)
self._pipe_fds.append(2)
stdin_transport = transport.get_pipe_transport(0)
if stdin_transport is not None:
- self.stdin = streams.Stream(mode=streams.StreamMode.WRITE,
- transport=stdin_transport,
- protocol=self,
- loop=self._loop,
- _asyncio_internal=True)
+ self.stdin = streams.StreamWriter(stdin_transport,
+ protocol=self,
+ reader=None,
+ loop=self._loop)
def pipe_data_received(self, fd, data):
if fd == 1:
@@ -80,7 +70,7 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
else:
reader = None
if reader is not None:
- reader._feed_data(data)
+ reader.feed_data(data)
def pipe_connection_lost(self, fd, exc):
if fd == 0:
@@ -101,9 +91,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader = None
if reader is not None:
if exc is None:
- reader._feed_eof()
+ reader.feed_eof()
else:
- reader._set_exception(exc)
+ reader.set_exception(exc)
if fd in self._pipe_fds:
self._pipe_fds.remove(fd)
@@ -121,20 +111,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
def _get_close_waiter(self, stream):
if stream is self.stdin:
return self._stdin_closed
- elif stream is self.stdout:
- return self._stdout_closed
- elif stream is self.stderr:
- return self._stderr_closed
class Process:
- def __init__(self, transport, protocol, loop, *, _asyncio_internal=False):
- if not _asyncio_internal:
- warnings.warn(f"{self.__class__} should be instaniated "
- "by asyncio internals only, "
- "please avoid its creation from user code",
- DeprecationWarning)
-
+ def __init__(self, transport, protocol, loop):
self._transport = transport
self._protocol = protocol
self._loop = loop
@@ -232,13 +212,12 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
)
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
- loop=loop,
- _asyncio_internal=True)
+ loop=loop)
transport, protocol = await loop.subprocess_shell(
protocol_factory,
cmd, stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
- return Process(transport, protocol, loop, _asyncio_internal=True)
+ return Process(transport, protocol, loop)
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
@@ -253,11 +232,10 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
stacklevel=2
)
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
- loop=loop,
- _asyncio_internal=True)
+ loop=loop)
transport, protocol = await loop.subprocess_exec(
protocol_factory,
program, *args,
stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
- return Process(transport, protocol, loop, _asyncio_internal=True)
+ return Process(transport, protocol, loop)
diff --git a/Lib/test/test_asyncio/test_buffered_proto.py b/Lib/test/test_asyncio/test_buffered_proto.py
index b1531fb934..f24e363ebf 100644
--- a/Lib/test/test_asyncio/test_buffered_proto.py
+++ b/Lib/test/test_asyncio/test_buffered_proto.py
@@ -58,10 +58,9 @@ class BaseTestBufferedProtocol(func_tests.FunctionalTestCaseMixin):
writer.close()
await writer.wait_closed()
- with self.assertWarns(DeprecationWarning):
- srv = self.loop.run_until_complete(
- asyncio.start_server(
- on_server_client, '127.0.0.1', 0))
+ srv = self.loop.run_until_complete(
+ asyncio.start_server(
+ on_server_client, '127.0.0.1', 0))
addr = srv.sockets[0].getsockname()
self.loop.run_until_complete(
diff --git a/Lib/test/test_asyncio/test_pep492.py b/Lib/test/test_asyncio/test_pep492.py
index 58a6094442..a1f27dd572 100644
--- a/Lib/test/test_asyncio/test_pep492.py
+++ b/Lib/test/test_asyncio/test_pep492.py
@@ -95,11 +95,9 @@ class StreamReaderTests(BaseTest):
def test_readline(self):
DATA = b'line1\nline2\nline3'
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(DATA)
- stream._feed_eof()
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(DATA)
+ stream.feed_eof()
async def reader():
data = []
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index 6325ee3983..b9413ab35f 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -1,8 +1,6 @@
"""Tests for streams.py."""
-import contextlib
import gc
-import io
import os
import queue
import pickle
@@ -18,7 +16,6 @@ except ImportError:
ssl = None
import asyncio
-from asyncio.streams import _StreamProtocol, _ensure_can_read, _ensure_can_write
from test.test_asyncio import utils as test_utils
@@ -26,24 +23,6 @@ def tearDownModule():
asyncio.set_event_loop_policy(None)
-class StreamModeTests(unittest.TestCase):
- def test__ensure_can_read_ok(self):
- self.assertIsNone(_ensure_can_read(asyncio.StreamMode.READ))
- self.assertIsNone(_ensure_can_read(asyncio.StreamMode.READWRITE))
-
- def test__ensure_can_read_fail(self):
- with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
- _ensure_can_read(asyncio.StreamMode.WRITE)
-
- def test__ensure_can_write_ok(self):
- self.assertIsNone(_ensure_can_write(asyncio.StreamMode.WRITE))
- self.assertIsNone(_ensure_can_write(asyncio.StreamMode.READWRITE))
-
- def test__ensure_can_write_fail(self):
- with self.assertRaisesRegex(RuntimeError, "The stream is read-only"):
- _ensure_can_write(asyncio.StreamMode.READ)
-
-
class StreamTests(test_utils.TestCase):
DATA = b'line1\nline2\nline3\n'
@@ -63,8 +42,7 @@ class StreamTests(test_utils.TestCase):
@mock.patch('asyncio.streams.events')
def test_ctor_global_loop(self, m_events):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader()
self.assertIs(stream._loop, m_events.get_event_loop.return_value)
def _basetest_open_connection(self, open_connection_fut):
@@ -100,8 +78,7 @@ class StreamTests(test_utils.TestCase):
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
try:
with self.assertWarns(DeprecationWarning):
- reader, writer = self.loop.run_until_complete(
- open_connection_fut)
+ reader, writer = self.loop.run_until_complete(open_connection_fut)
finally:
asyncio.set_event_loop(None)
writer.write(b'GET / HTTP/1.0\r\n\r\n')
@@ -161,27 +138,21 @@ class StreamTests(test_utils.TestCase):
self._basetest_open_connection_error(conn_fut)
def test_feed_empty_data(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
- stream._feed_data(b'')
+ stream.feed_data(b'')
self.assertEqual(b'', stream._buffer)
def test_feed_nonempty_data(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
- stream._feed_data(self.DATA)
+ stream.feed_data(self.DATA)
self.assertEqual(self.DATA, stream._buffer)
def test_read_zero(self):
# Read zero bytes.
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(self.DATA)
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(self.DATA)
data = self.loop.run_until_complete(stream.read(0))
self.assertEqual(b'', data)
@@ -189,13 +160,11 @@ class StreamTests(test_utils.TestCase):
def test_read(self):
# Read bytes.
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
read_task = self.loop.create_task(stream.read(30))
def cb():
- stream._feed_data(self.DATA)
+ stream.feed_data(self.DATA)
self.loop.call_soon(cb)
data = self.loop.run_until_complete(read_task)
@@ -204,11 +173,9 @@ class StreamTests(test_utils.TestCase):
def test_read_line_breaks(self):
# Read bytes without line breaks.
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'line1')
- stream._feed_data(b'line2')
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'line1')
+ stream.feed_data(b'line2')
data = self.loop.run_until_complete(stream.read(5))
@@ -217,13 +184,11 @@ class StreamTests(test_utils.TestCase):
def test_read_eof(self):
# Read bytes, stop at eof.
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
read_task = self.loop.create_task(stream.read(1024))
def cb():
- stream._feed_eof()
+ stream.feed_eof()
self.loop.call_soon(cb)
data = self.loop.run_until_complete(read_task)
@@ -232,15 +197,13 @@ class StreamTests(test_utils.TestCase):
def test_read_until_eof(self):
# Read all bytes until eof.
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
read_task = self.loop.create_task(stream.read(-1))
def cb():
- stream._feed_data(b'chunk1\n')
- stream._feed_data(b'chunk2')
- stream._feed_eof()
+ stream.feed_data(b'chunk1\n')
+ stream.feed_data(b'chunk2')
+ stream.feed_eof()
self.loop.call_soon(cb)
data = self.loop.run_until_complete(read_task)
@@ -249,34 +212,26 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(b'', stream._buffer)
def test_read_exception(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'line\n')
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'line\n')
data = self.loop.run_until_complete(stream.read(2))
self.assertEqual(b'li', data)
- stream._set_exception(ValueError())
+ stream.set_exception(ValueError())
self.assertRaises(
ValueError, self.loop.run_until_complete, stream.read(2))
def test_invalid_limit(self):
with self.assertRaisesRegex(ValueError, 'imit'):
- asyncio.Stream(mode=asyncio.StreamMode.READ,
- limit=0, loop=self.loop,
- _asyncio_internal=True)
+ asyncio.StreamReader(limit=0, loop=self.loop)
with self.assertRaisesRegex(ValueError, 'imit'):
- asyncio.Stream(mode=asyncio.StreamMode.READ,
- limit=-1, loop=self.loop,
- _asyncio_internal=True)
+ asyncio.StreamReader(limit=-1, loop=self.loop)
def test_read_limit(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- limit=3, loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'chunk')
+ stream = asyncio.StreamReader(limit=3, loop=self.loop)
+ stream.feed_data(b'chunk')
data = self.loop.run_until_complete(stream.read(5))
self.assertEqual(b'chunk', data)
self.assertEqual(b'', stream._buffer)
@@ -284,16 +239,14 @@ class StreamTests(test_utils.TestCase):
def test_readline(self):
# Read one line. 'readline' will need to wait for the data
# to come from 'cb'
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'chunk1 ')
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'chunk1 ')
read_task = self.loop.create_task(stream.readline())
def cb():
- stream._feed_data(b'chunk2 ')
- stream._feed_data(b'chunk3 ')
- stream._feed_data(b'\n chunk4')
+ stream.feed_data(b'chunk2 ')
+ stream.feed_data(b'chunk3 ')
+ stream.feed_data(b'\n chunk4')
self.loop.call_soon(cb)
line = self.loop.run_until_complete(read_task)
@@ -301,26 +254,22 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(b' chunk4', stream._buffer)
def test_readline_limit_with_existing_data(self):
- # Read one line. The data is in Stream's buffer
+ # Read one line. The data is in StreamReader's buffer
# before the event loop is run.
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- limit=3, loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'li')
- stream._feed_data(b'ne1\nline2\n')
+ stream = asyncio.StreamReader(limit=3, loop=self.loop)
+ stream.feed_data(b'li')
+ stream.feed_data(b'ne1\nline2\n')
self.assertRaises(
ValueError, self.loop.run_until_complete, stream.readline())
# The buffer should contain the remaining data after exception
self.assertEqual(b'line2\n', stream._buffer)
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- limit=3, loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'li')
- stream._feed_data(b'ne1')
- stream._feed_data(b'li')
+ stream = asyncio.StreamReader(limit=3, loop=self.loop)
+ stream.feed_data(b'li')
+ stream.feed_data(b'ne1')
+ stream.feed_data(b'li')
self.assertRaises(
ValueError, self.loop.run_until_complete, stream.readline())
@@ -332,34 +281,30 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(b'', stream._buffer)
def test_at_eof(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
self.assertFalse(stream.at_eof())
- stream._feed_data(b'some data\n')
+ stream.feed_data(b'some data\n')
self.assertFalse(stream.at_eof())
self.loop.run_until_complete(stream.readline())
self.assertFalse(stream.at_eof())
- stream._feed_data(b'some data\n')
- stream._feed_eof()
+ stream.feed_data(b'some data\n')
+ stream.feed_eof()
self.loop.run_until_complete(stream.readline())
self.assertTrue(stream.at_eof())
def test_readline_limit(self):
- # Read one line. Streams are fed with data after
+ # Read one line. StreamReaders are fed with data after
# their 'readline' methods are called.
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- limit=7, loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(limit=7, loop=self.loop)
def cb():
- stream._feed_data(b'chunk1')
- stream._feed_data(b'chunk2')
- stream._feed_data(b'chunk3\n')
- stream._feed_eof()
+ stream.feed_data(b'chunk1')
+ stream.feed_data(b'chunk2')
+ stream.feed_data(b'chunk3\n')
+ stream.feed_eof()
self.loop.call_soon(cb)
self.assertRaises(
@@ -368,14 +313,12 @@ class StreamTests(test_utils.TestCase):
# a ValueError it should be empty.
self.assertEqual(b'', stream._buffer)
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- limit=7, loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(limit=7, loop=self.loop)
def cb():
- stream._feed_data(b'chunk1')
- stream._feed_data(b'chunk2\n')
- stream._feed_data(b'chunk3\n')
- stream._feed_eof()
+ stream.feed_data(b'chunk1')
+ stream.feed_data(b'chunk2\n')
+ stream.feed_data(b'chunk3\n')
+ stream.feed_eof()
self.loop.call_soon(cb)
self.assertRaises(
@@ -383,20 +326,18 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(b'chunk3\n', stream._buffer)
# check strictness of the limit
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- limit=7, loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'1234567\n')
+ stream = asyncio.StreamReader(limit=7, loop=self.loop)
+ stream.feed_data(b'1234567\n')
line = self.loop.run_until_complete(stream.readline())
self.assertEqual(b'1234567\n', line)
self.assertEqual(b'', stream._buffer)
- stream._feed_data(b'12345678\n')
+ stream.feed_data(b'12345678\n')
with self.assertRaises(ValueError) as cm:
self.loop.run_until_complete(stream.readline())
self.assertEqual(b'', stream._buffer)
- stream._feed_data(b'12345678')
+ stream.feed_data(b'12345678')
with self.assertRaises(ValueError) as cm:
self.loop.run_until_complete(stream.readline())
self.assertEqual(b'', stream._buffer)
@@ -404,11 +345,9 @@ class StreamTests(test_utils.TestCase):
def test_readline_nolimit_nowait(self):
# All needed data for the first 'readline' call will be
# in the buffer.
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(self.DATA[:6])
- stream._feed_data(self.DATA[6:])
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(self.DATA[:6])
+ stream.feed_data(self.DATA[6:])
line = self.loop.run_until_complete(stream.readline())
@@ -416,29 +355,23 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(b'line2\nline3\n', stream._buffer)
def test_readline_eof(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'some data')
- stream._feed_eof()
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'some data')
+ stream.feed_eof()
line = self.loop.run_until_complete(stream.readline())
self.assertEqual(b'some data', line)
def test_readline_empty_eof(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_eof()
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_eof()
line = self.loop.run_until_complete(stream.readline())
self.assertEqual(b'', line)
def test_readline_read_byte_count(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(self.DATA)
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(self.DATA)
self.loop.run_until_complete(stream.readline())
@@ -448,89 +381,79 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(b'ine3\n', stream._buffer)
def test_readline_exception(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'line\n')
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'line\n')
data = self.loop.run_until_complete(stream.readline())
self.assertEqual(b'line\n', data)
- stream._set_exception(ValueError())
+ stream.set_exception(ValueError())
self.assertRaises(
ValueError, self.loop.run_until_complete, stream.readline())
self.assertEqual(b'', stream._buffer)
def test_readuntil_separator(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
with self.assertRaisesRegex(ValueError, 'Separator should be'):
self.loop.run_until_complete(stream.readuntil(separator=b''))
def test_readuntil_multi_chunks(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
- stream._feed_data(b'lineAAA')
+ stream.feed_data(b'lineAAA')
data = self.loop.run_until_complete(stream.readuntil(separator=b'AAA'))
self.assertEqual(b'lineAAA', data)
self.assertEqual(b'', stream._buffer)
- stream._feed_data(b'lineAAA')
+ stream.feed_data(b'lineAAA')
data = self.loop.run_until_complete(stream.readuntil(b'AAA'))
self.assertEqual(b'lineAAA', data)
self.assertEqual(b'', stream._buffer)
- stream._feed_data(b'lineAAAxxx')
+ stream.feed_data(b'lineAAAxxx')
data = self.loop.run_until_complete(stream.readuntil(b'AAA'))
self.assertEqual(b'lineAAA', data)
self.assertEqual(b'xxx', stream._buffer)
def test_readuntil_multi_chunks_1(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
- stream._feed_data(b'QWEaa')
- stream._feed_data(b'XYaa')
- stream._feed_data(b'a')
+ stream.feed_data(b'QWEaa')
+ stream.feed_data(b'XYaa')
+ stream.feed_data(b'a')
data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
self.assertEqual(b'QWEaaXYaaa', data)
self.assertEqual(b'', stream._buffer)
- stream._feed_data(b'QWEaa')
- stream._feed_data(b'XYa')
- stream._feed_data(b'aa')
+ stream.feed_data(b'QWEaa')
+ stream.feed_data(b'XYa')
+ stream.feed_data(b'aa')
data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
self.assertEqual(b'QWEaaXYaaa', data)
self.assertEqual(b'', stream._buffer)
- stream._feed_data(b'aaa')
+ stream.feed_data(b'aaa')
data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
self.assertEqual(b'aaa', data)
self.assertEqual(b'', stream._buffer)
- stream._feed_data(b'Xaaa')
+ stream.feed_data(b'Xaaa')
data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
self.assertEqual(b'Xaaa', data)
self.assertEqual(b'', stream._buffer)
- stream._feed_data(b'XXX')
- stream._feed_data(b'a')
- stream._feed_data(b'a')
- stream._feed_data(b'a')
+ stream.feed_data(b'XXX')
+ stream.feed_data(b'a')
+ stream.feed_data(b'a')
+ stream.feed_data(b'a')
data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
self.assertEqual(b'XXXaaa', data)
self.assertEqual(b'', stream._buffer)
def test_readuntil_eof(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'some dataAA')
- stream._feed_eof()
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'some dataAA')
+ stream.feed_eof()
with self.assertRaises(asyncio.IncompleteReadError) as cm:
self.loop.run_until_complete(stream.readuntil(b'AAA'))
@@ -539,18 +462,15 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(b'', stream._buffer)
def test_readuntil_limit_found_sep(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop, limit=3,
- _asyncio_internal=True)
- stream._feed_data(b'some dataAA')
-
+ stream = asyncio.StreamReader(loop=self.loop, limit=3)
+ stream.feed_data(b'some dataAA')
with self.assertRaisesRegex(asyncio.LimitOverrunError,
'not found') as cm:
self.loop.run_until_complete(stream.readuntil(b'AAA'))
self.assertEqual(b'some dataAA', stream._buffer)
- stream._feed_data(b'A')
+ stream.feed_data(b'A')
with self.assertRaisesRegex(asyncio.LimitOverrunError,
'is found') as cm:
self.loop.run_until_complete(stream.readuntil(b'AAA'))
@@ -559,10 +479,8 @@ class StreamTests(test_utils.TestCase):
def test_readexactly_zero_or_less(self):
# Read exact number of bytes (zero or less).
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(self.DATA)
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(self.DATA)
data = self.loop.run_until_complete(stream.readexactly(0))
self.assertEqual(b'', data)
@@ -574,17 +492,15 @@ class StreamTests(test_utils.TestCase):
def test_readexactly(self):
# Read exact number of bytes.
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
n = 2 * len(self.DATA)
read_task = self.loop.create_task(stream.readexactly(n))
def cb():
- stream._feed_data(self.DATA)
- stream._feed_data(self.DATA)
- stream._feed_data(self.DATA)
+ stream.feed_data(self.DATA)
+ stream.feed_data(self.DATA)
+ stream.feed_data(self.DATA)
self.loop.call_soon(cb)
data = self.loop.run_until_complete(read_task)
@@ -592,25 +508,21 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(self.DATA, stream._buffer)
def test_readexactly_limit(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- limit=3, loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'chunk')
+ stream = asyncio.StreamReader(limit=3, loop=self.loop)
+ stream.feed_data(b'chunk')
data = self.loop.run_until_complete(stream.readexactly(5))
self.assertEqual(b'chunk', data)
self.assertEqual(b'', stream._buffer)
def test_readexactly_eof(self):
# Read exact number of bytes (eof).
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
n = 2 * len(self.DATA)
read_task = self.loop.create_task(stream.readexactly(n))
def cb():
- stream._feed_data(self.DATA)
- stream._feed_eof()
+ stream.feed_data(self.DATA)
+ stream.feed_eof()
self.loop.call_soon(cb)
with self.assertRaises(asyncio.IncompleteReadError) as cm:
@@ -622,35 +534,29 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(b'', stream._buffer)
def test_readexactly_exception(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'line\n')
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'line\n')
data = self.loop.run_until_complete(stream.readexactly(2))
self.assertEqual(b'li', data)
- stream._set_exception(ValueError())
+ stream.set_exception(ValueError())
self.assertRaises(
ValueError, self.loop.run_until_complete, stream.readexactly(2))
def test_exception(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
self.assertIsNone(stream.exception())
exc = ValueError()
- stream._set_exception(exc)
+ stream.set_exception(exc)
self.assertIs(stream.exception(), exc)
def test_exception_waiter(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
async def set_err():
- stream._set_exception(ValueError())
+ stream.set_exception(ValueError())
t1 = self.loop.create_task(stream.readline())
t2 = self.loop.create_task(set_err())
@@ -660,16 +566,14 @@ class StreamTests(test_utils.TestCase):
self.assertRaises(ValueError, t1.result)
def test_exception_cancel(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
t = self.loop.create_task(stream.readline())
test_utils.run_briefly(self.loop)
t.cancel()
test_utils.run_briefly(self.loop)
# The following line fails if set_exception() isn't careful.
- stream._set_exception(RuntimeError('message'))
+ stream.set_exception(RuntimeError('message'))
test_utils.run_briefly(self.loop)
self.assertIs(stream._waiter, None)
@@ -829,7 +733,7 @@ class StreamTests(test_utils.TestCase):
def test_read_all_from_pipe_reader(self):
# See asyncio issue 168. This test is derived from the example
# subprocess_attach_read_pipe.py, but we configure the
- # Stream's limit so that twice it is less than the size
+ # StreamReader's limit so that twice it is less than the size
# of the data writter. Also we must explicitly attach a child
# watcher to the event loop.
@@ -843,11 +747,8 @@ os.close(fd)
args = [sys.executable, '-c', code, str(wfd)]
pipe = open(rfd, 'rb', 0)
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop, limit=1,
- _asyncio_internal=True)
- protocol = _StreamProtocol(stream, loop=self.loop,
- _asyncio_internal=True)
+ reader = asyncio.StreamReader(loop=self.loop, limit=1)
+ protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
transport, _ = self.loop.run_until_complete(
self.loop.connect_read_pipe(lambda: protocol, pipe))
@@ -865,30 +766,29 @@ os.close(fd)
asyncio.set_child_watcher(None)
os.close(wfd)
- data = self.loop.run_until_complete(stream.read(-1))
+ data = self.loop.run_until_complete(reader.read(-1))
self.assertEqual(data, b'data')
def test_streamreader_constructor(self):
self.addCleanup(asyncio.set_event_loop, None)
asyncio.set_event_loop(self.loop)
- # asyncio issue #184: Ensure that _StreamProtocol constructor
+ # asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
- reader = asyncio.Stream(mode=asyncio.StreamMode.READ,
- _asyncio_internal=True)
+ reader = asyncio.StreamReader()
self.assertIs(reader._loop, self.loop)
def test_streamreaderprotocol_constructor(self):
self.addCleanup(asyncio.set_event_loop, None)
asyncio.set_event_loop(self.loop)
- # asyncio issue #184: Ensure that _StreamProtocol constructor
+ # asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
- stream = mock.Mock()
- protocol = _StreamProtocol(stream, _asyncio_internal=True)
+ reader = mock.Mock()
+ protocol = asyncio.StreamReaderProtocol(reader)
self.assertIs(protocol._loop, self.loop)
- def test_drain_raises_deprecated(self):
+ def test_drain_raises(self):
# See http://bugs.python.org/issue25441
# This test should not use asyncio for the mock server; the
@@ -902,7 +802,7 @@ os.close(fd)
def server():
# Runs in a separate thread.
- with socket.create_server(('127.0.0.1', 0)) as sock:
+ with socket.create_server(('localhost', 0)) as sock:
addr = sock.getsockname()
q.put(addr)
clt, _ = sock.accept()
@@ -933,106 +833,48 @@ os.close(fd)
thread.join()
self.assertEqual([], messages)
- def test_drain_raises(self):
- # See http://bugs.python.org/issue25441
-
- # This test should not use asyncio for the mock server; the
- # whole point of the test is to test for a bug in drain()
- # where it never gives up the event loop but the socket is
- # closed on the server side.
-
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- q = queue.Queue()
-
- def server():
- # Runs in a separate thread.
- with socket.create_server(('localhost', 0)) as sock:
- addr = sock.getsockname()
- q.put(addr)
- clt, _ = sock.accept()
- clt.close()
-
- async def client(host, port):
- stream = await asyncio.connect(host, port)
-
- while True:
- stream.write(b"foo\n")
- await stream.drain()
-
- # Start the server thread and wait for it to be listening.
- thread = threading.Thread(target=server)
- thread.setDaemon(True)
- thread.start()
- addr = q.get()
-
- # Should not be stuck in an infinite loop.
- with self.assertRaises((ConnectionResetError, ConnectionAbortedError,
- BrokenPipeError)):
- self.loop.run_until_complete(client(*addr))
-
- # Clean up the thread. (Only on success; on failure, it may
- # be stuck in accept().)
- thread.join()
- self.assertEqual([], messages)
-
def test___repr__(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- self.assertEqual("<Stream mode=StreamMode.READ>", repr(stream))
+ stream = asyncio.StreamReader(loop=self.loop)
+ self.assertEqual("<StreamReader>", repr(stream))
def test___repr__nondefault_limit(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop, limit=123,
- _asyncio_internal=True)
- self.assertEqual("<Stream mode=StreamMode.READ limit=123>", repr(stream))
+ stream = asyncio.StreamReader(loop=self.loop, limit=123)
+ self.assertEqual("<StreamReader limit=123>", repr(stream))
def test___repr__eof(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_eof()
- self.assertEqual("<Stream mode=StreamMode.READ eof>", repr(stream))
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_eof()
+ self.assertEqual("<StreamReader eof>", repr(stream))
def test___repr__data(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._feed_data(b'data')
- self.assertEqual("<Stream mode=StreamMode.READ 4 bytes>", repr(stream))
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'data')
+ self.assertEqual("<StreamReader 4 bytes>", repr(stream))
def test___repr__exception(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
exc = RuntimeError()
- stream._set_exception(exc)
- self.assertEqual("<Stream mode=StreamMode.READ exception=RuntimeError()>",
+ stream.set_exception(exc)
+ self.assertEqual("<StreamReader exception=RuntimeError()>",
repr(stream))
def test___repr__waiter(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- stream._waiter = self.loop.create_future()
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream._waiter = asyncio.Future(loop=self.loop)
self.assertRegex(
repr(stream),
- r"<Stream .+ waiter=<Future pending[\S ]*>>")
+ r"<StreamReader waiter=<Future pending[\S ]*>>")
stream._waiter.set_result(None)
self.loop.run_until_complete(stream._waiter)
stream._waiter = None
- self.assertEqual("<Stream mode=StreamMode.READ>", repr(stream))
+ self.assertEqual("<StreamReader>", repr(stream))
def test___repr__transport(self):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
+ stream = asyncio.StreamReader(loop=self.loop)
stream._transport = mock.Mock()
stream._transport.__repr__ = mock.Mock()
stream._transport.__repr__.return_value = "<Transport>"
- self.assertEqual("<Stream mode=StreamMode.READ transport=<Transport>>",
- repr(stream))
+ self.assertEqual("<StreamReader transport=<Transport>>", repr(stream))
def test_IncompleteReadError_pickleable(self):
e = asyncio.IncompleteReadError(b'abc', 10)
@@ -1051,7 +893,7 @@ os.close(fd)
self.assertEqual(str(e), str(e2))
self.assertEqual(e.consumed, e2.consumed)
- def test_wait_closed_on_close_deprecated(self):
+ def test_wait_closed_on_close(self):
with test_utils.run_test_server() as httpd:
with self.assertWarns(DeprecationWarning):
rd, wr = self.loop.run_until_complete(
@@ -1069,24 +911,7 @@ os.close(fd)
self.assertTrue(wr.is_closing())
self.loop.run_until_complete(wr.wait_closed())
- def test_wait_closed_on_close(self):
- with test_utils.run_test_server() as httpd:
- stream = self.loop.run_until_complete(
- asyncio.connect(*httpd.address))
-
- stream.write(b'GET / HTTP/1.0\r\n\r\n')
- f = stream.readline()
- data = self.loop.run_until_complete(f)
- self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
- f = stream.read()
- data = self.loop.run_until_complete(f)
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
- self.assertFalse(stream.is_closing())
- stream.close()
- self.assertTrue(stream.is_closing())
- self.loop.run_until_complete(stream.wait_closed())
-
- def test_wait_closed_on_close_with_unread_data_deprecated(self):
+ def test_wait_closed_on_close_with_unread_data(self):
with test_utils.run_test_server() as httpd:
with self.assertWarns(DeprecationWarning):
rd, wr = self.loop.run_until_complete(
@@ -1099,44 +924,33 @@ os.close(fd)
wr.close()
self.loop.run_until_complete(wr.wait_closed())
- def test_wait_closed_on_close_with_unread_data(self):
- with test_utils.run_test_server() as httpd:
- stream = self.loop.run_until_complete(
- asyncio.connect(*httpd.address))
-
- stream.write(b'GET / HTTP/1.0\r\n\r\n')
- f = stream.readline()
- data = self.loop.run_until_complete(f)
- self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
- stream.close()
- self.loop.run_until_complete(stream.wait_closed())
-
def test_del_stream_before_sock_closing(self):
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- async def test():
-
- with test_utils.run_test_server() as httpd:
- stream = await asyncio.connect(*httpd.address)
- sock = stream.get_extra_info('socket')
- self.assertNotEqual(sock.fileno(), -1)
+ with test_utils.run_test_server() as httpd:
+ with self.assertWarns(DeprecationWarning):
+ rd, wr = self.loop.run_until_complete(
+ asyncio.open_connection(*httpd.address, loop=self.loop))
+ sock = wr.get_extra_info('socket')
+ self.assertNotEqual(sock.fileno(), -1)
- await stream.write(b'GET / HTTP/1.0\r\n\r\n')
- data = await stream.readline()
- self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
+ wr.write(b'GET / HTTP/1.0\r\n\r\n')
+ f = rd.readline()
+ data = self.loop.run_until_complete(f)
+ self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
- # drop refs to reader/writer
- del stream
- gc.collect()
- # make a chance to close the socket
- await asyncio.sleep(0)
+ # drop refs to reader/writer
+ del rd
+ del wr
+ gc.collect()
+ # make a chance to close the socket
+ test_utils.run_briefly(self.loop)
- self.assertEqual(1, len(messages), messages)
- self.assertEqual(sock.fileno(), -1)
+ self.assertEqual(1, len(messages))
+ self.assertEqual(sock.fileno(), -1)
- self.loop.run_until_complete(test())
- self.assertEqual(1, len(messages), messages)
+ self.assertEqual(1, len(messages))
self.assertEqual('An open stream object is being garbage '
'collected; call "stream.close()" explicitly.',
messages[0]['message'])
@@ -1146,12 +960,9 @@ os.close(fd)
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
with test_utils.run_test_server() as httpd:
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop,
- _asyncio_internal=True)
- pr = _StreamProtocol(stream, loop=self.loop,
- _asyncio_internal=True)
- del stream
+ rd = asyncio.StreamReader(loop=self.loop)
+ pr = asyncio.StreamReaderProtocol(rd, loop=self.loop)
+ del rd
gc.collect()
tr, _ = self.loop.run_until_complete(
self.loop.create_connection(
@@ -1168,14 +979,15 @@ os.close(fd)
def test_async_writer_api(self):
async def inner(httpd):
- stream = await asyncio.connect(*httpd.address)
+ rd, wr = await asyncio.open_connection(*httpd.address)
- await stream.write(b'GET / HTTP/1.0\r\n\r\n')
- data = await stream.readline()
+ wr.write(b'GET / HTTP/1.0\r\n\r\n')
+ data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
- data = await stream.read()
+ data = await rd.read()
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
- await stream.close()
+ wr.close()
+ await wr.wait_closed()
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
@@ -1187,16 +999,17 @@ os.close(fd)
def test_async_writer_api_exception_after_close(self):
async def inner(httpd):
- stream = await asyncio.connect(*httpd.address)
+ rd, wr = await asyncio.open_connection(*httpd.address)
- await stream.write(b'GET / HTTP/1.0\r\n\r\n')
- data = await stream.readline()
+ wr.write(b'GET / HTTP/1.0\r\n\r\n')
+ data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
- data = await stream.read()
+ data = await rd.read()
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
- stream.close()
+ wr.close()
with self.assertRaises(ConnectionResetError):
- await stream.write(b'data')
+ wr.write(b'data')
+ await wr.drain()
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
@@ -1227,587 +1040,6 @@ os.close(fd)
self.assertEqual(messages, [])
- def test_stream_reader_create_warning(self):
- with contextlib.suppress(AttributeError):
- del asyncio.StreamReader
- with self.assertWarns(DeprecationWarning):
- asyncio.StreamReader
-
- def test_stream_writer_create_warning(self):
- with contextlib.suppress(AttributeError):
- del asyncio.StreamWriter
- with self.assertWarns(DeprecationWarning):
- asyncio.StreamWriter
-
- def test_stream_reader_forbidden_ops(self):
- async def inner():
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- _asyncio_internal=True)
- with self.assertRaisesRegex(RuntimeError, "The stream is read-only"):
- await stream.write(b'data')
- with self.assertRaisesRegex(RuntimeError, "The stream is read-only"):
- await stream.writelines([b'data', b'other'])
- with self.assertRaisesRegex(RuntimeError, "The stream is read-only"):
- stream.write_eof()
- with self.assertRaisesRegex(RuntimeError, "The stream is read-only"):
- await stream.drain()
-
- self.loop.run_until_complete(inner())
-
- def test_stream_writer_forbidden_ops(self):
- async def inner():
- stream = asyncio.Stream(mode=asyncio.StreamMode.WRITE,
- _asyncio_internal=True)
- with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
- stream._feed_data(b'data')
- with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
- await stream.readline()
- with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
- await stream.readuntil()
- with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
- await stream.read()
- with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
- await stream.readexactly(10)
- with self.assertRaisesRegex(RuntimeError, "The stream is write-only"):
- async for chunk in stream:
- pass
-
- self.loop.run_until_complete(inner())
-
- def _basetest_connect(self, stream):
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-
- stream.write(b'GET / HTTP/1.0\r\n\r\n')
- f = stream.readline()
- data = self.loop.run_until_complete(f)
- self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
- f = stream.read()
- data = self.loop.run_until_complete(f)
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
- stream.close()
- self.loop.run_until_complete(stream.wait_closed())
-
- self.assertEqual([], messages)
-
- def test_connect(self):
- with test_utils.run_test_server() as httpd:
- stream = self.loop.run_until_complete(
- asyncio.connect(*httpd.address))
- self.assertFalse(stream.is_server_side())
- self._basetest_connect(stream)
-
- @support.skip_unless_bind_unix_socket
- def test_connect_unix(self):
- with test_utils.run_test_unix_server() as httpd:
- stream = self.loop.run_until_complete(
- asyncio.connect_unix(httpd.address))
- self._basetest_connect(stream)
-
- def test_stream_async_context_manager(self):
- async def test(httpd):
- stream = await asyncio.connect(*httpd.address)
- async with stream:
- await stream.write(b'GET / HTTP/1.0\r\n\r\n')
- data = await stream.readline()
- self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
- data = await stream.read()
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
- self.assertTrue(stream.is_closing())
-
- with test_utils.run_test_server() as httpd:
- self.loop.run_until_complete(test(httpd))
-
- def test_connect_async_context_manager(self):
- async def test(httpd):
- async with asyncio.connect(*httpd.address) as stream:
- await stream.write(b'GET / HTTP/1.0\r\n\r\n')
- data = await stream.readline()
- self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
- data = await stream.read()
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
- self.assertTrue(stream.is_closing())
-
- with test_utils.run_test_server() as httpd:
- self.loop.run_until_complete(test(httpd))
-
- @support.skip_unless_bind_unix_socket
- def test_connect_unix_async_context_manager(self):
- async def test(httpd):
- async with asyncio.connect_unix(httpd.address) as stream:
- await stream.write(b'GET / HTTP/1.0\r\n\r\n')
- data = await stream.readline()
- self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
- data = await stream.read()
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
- self.assertTrue(stream.is_closing())
-
- with test_utils.run_test_unix_server() as httpd:
- self.loop.run_until_complete(test(httpd))
-
- def test_stream_server(self):
-
- async def handle_client(stream):
- self.assertTrue(stream.is_server_side())
- data = await stream.readline()
- await stream.write(data)
- await stream.close()
-
- async def client(srv):
- addr = srv.sockets[0].getsockname()
- stream = await asyncio.connect(*addr)
- # send a line
- await stream.write(b"hello world!\n")
- # read it back
- msgback = await stream.readline()
- await stream.close()
- self.assertEqual(msgback, b"hello world!\n")
- await srv.close()
-
- async def test():
- async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server:
- await server.start_serving()
- task = asyncio.create_task(client(server))
- with contextlib.suppress(asyncio.CancelledError):
- await server.serve_forever()
- await task
-
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- self.loop.run_until_complete(test())
- self.assertEqual(messages, [])
-
- @support.skip_unless_bind_unix_socket
- def test_unix_stream_server(self):
-
- async def handle_client(stream):
- data = await stream.readline()
- await stream.write(data)
- await stream.close()
-
- async def client(srv):
- addr = srv.sockets[0].getsockname()
- stream = await asyncio.connect_unix(addr)
- # send a line
- await stream.write(b"hello world!\n")
- # read it back
- msgback = await stream.readline()
- await stream.close()
- self.assertEqual(msgback, b"hello world!\n")
- await srv.close()
-
- async def test():
- with test_utils.unix_socket_path() as path:
- async with asyncio.UnixStreamServer(handle_client, path) as server:
- await server.start_serving()
- task = asyncio.create_task(client(server))
- with contextlib.suppress(asyncio.CancelledError):
- await server.serve_forever()
- await task
-
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- self.loop.run_until_complete(test())
- self.assertEqual(messages, [])
-
- def test_stream_server_inheritance_forbidden(self):
- with self.assertRaises(TypeError):
- class MyServer(asyncio.StreamServer):
- pass
-
- @support.skip_unless_bind_unix_socket
- def test_unix_stream_server_inheritance_forbidden(self):
- with self.assertRaises(TypeError):
- class MyServer(asyncio.UnixStreamServer):
- pass
-
- def test_stream_server_bind(self):
- async def handle_client(stream):
- await stream.close()
-
- async def test():
- srv = asyncio.StreamServer(handle_client, '127.0.0.1', 0)
- self.assertFalse(srv.is_bound())
- self.assertEqual(0, len(srv.sockets))
- await srv.bind()
- self.assertTrue(srv.is_bound())
- self.assertEqual(1, len(srv.sockets))
- await srv.close()
- self.assertFalse(srv.is_bound())
- self.assertEqual(0, len(srv.sockets))
-
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- self.loop.run_until_complete(test())
- self.assertEqual(messages, [])
-
- def test_stream_server_bind_async_with(self):
- async def handle_client(stream):
- await stream.close()
-
- async def test():
- async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as srv:
- self.assertTrue(srv.is_bound())
- self.assertEqual(1, len(srv.sockets))
-
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- self.loop.run_until_complete(test())
- self.assertEqual(messages, [])
-
- def test_stream_server_start_serving(self):
- async def handle_client(stream):
- await stream.close()
-
- async def test():
- async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as srv:
- self.assertFalse(srv.is_serving())
- await srv.start_serving()
- self.assertTrue(srv.is_serving())
- await srv.close()
- self.assertFalse(srv.is_serving())
-
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- self.loop.run_until_complete(test())
- self.assertEqual(messages, [])
-
- def test_stream_server_close(self):
- server_stream_aborted = False
- fut1 = self.loop.create_future()
- fut2 = self.loop.create_future()
-
- async def handle_client(stream):
- data = await stream.readexactly(4)
- self.assertEqual(b'data', data)
- fut1.set_result(None)
- await fut2
- self.assertEqual(b'', await stream.readline())
- nonlocal server_stream_aborted
- server_stream_aborted = True
-
- async def client(srv):
- addr = srv.sockets[0].getsockname()
- stream = await asyncio.connect(*addr)
- await stream.write(b'data')
- await fut2
- self.assertEqual(b'', await stream.readline())
- await stream.close()
-
- async def test():
- async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server:
- await server.start_serving()
- task = asyncio.create_task(client(server))
- await fut1
- fut2.set_result(None)
- await server.close()
- await task
-
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- self.loop.run_until_complete(asyncio.wait_for(test(), 60.0))
- self.assertEqual(messages, [])
- self.assertTrue(fut1.done())
- self.assertTrue(fut2.done())
- self.assertTrue(server_stream_aborted)
-
- def test_stream_server_abort(self):
- server_stream_aborted = False
- fut1 = self.loop.create_future()
- fut2 = self.loop.create_future()
-
- async def handle_client(stream):
- data = await stream.readexactly(4)
- self.assertEqual(b'data', data)
- fut1.set_result(None)
- await fut2
- self.assertEqual(b'', await stream.readline())
- nonlocal server_stream_aborted
- server_stream_aborted = True
-
- async def client(srv):
- addr = srv.sockets[0].getsockname()
- stream = await asyncio.connect(*addr)
- await stream.write(b'data')
- await fut2
- self.assertEqual(b'', await stream.readline())
- await stream.close()
-
- async def test():
- async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server:
- await server.start_serving()
- task = asyncio.create_task(client(server))
- await fut1
- fut2.set_result(None)
- await server.abort()
- await task
-
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- self.loop.run_until_complete(asyncio.wait_for(test(), 60.0))
- self.assertEqual(messages, [])
- self.assertTrue(fut1.done())
- self.assertTrue(fut2.done())
- self.assertTrue(server_stream_aborted)
-
- def test_stream_shutdown_hung_task(self):
- fut1 = self.loop.create_future()
- fut2 = self.loop.create_future()
- cancelled = self.loop.create_future()
-
- async def handle_client(stream):
- data = await stream.readexactly(4)
- self.assertEqual(b'data', data)
- fut1.set_result(None)
- await fut2
- try:
- while True:
- await asyncio.sleep(0.01)
- except asyncio.CancelledError:
- cancelled.set_result(None)
- raise
-
- async def client(srv):
- addr = srv.sockets[0].getsockname()
- stream = await asyncio.connect(*addr)
- await stream.write(b'data')
- await fut2
- self.assertEqual(b'', await stream.readline())
- await stream.close()
-
- async def test():
- async with asyncio.StreamServer(handle_client,
- '127.0.0.1',
- 0,
- shutdown_timeout=0.3) as server:
- await server.start_serving()
- task = asyncio.create_task(client(server))
- await fut1
- fut2.set_result(None)
- await server.close()
- await task
- await cancelled
-
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- self.loop.run_until_complete(asyncio.wait_for(test(), 60.0))
- self.assertEqual(messages, [])
- self.assertTrue(fut1.done())
- self.assertTrue(fut2.done())
- self.assertTrue(cancelled.done())
-
- def test_stream_shutdown_hung_task_prevents_cancellation(self):
- fut1 = self.loop.create_future()
- fut2 = self.loop.create_future()
- cancelled = self.loop.create_future()
- do_handle_client = True
-
- async def handle_client(stream):
- data = await stream.readexactly(4)
- self.assertEqual(b'data', data)
- fut1.set_result(None)
- await fut2
- while do_handle_client:
- with contextlib.suppress(asyncio.CancelledError):
- await asyncio.sleep(0.01)
- cancelled.set_result(None)
-
- async def client(srv):
- addr = srv.sockets[0].getsockname()
- stream = await asyncio.connect(*addr)
- await stream.write(b'data')
- await fut2
- self.assertEqual(b'', await stream.readline())
- await stream.close()
-
- async def test():
- async with asyncio.StreamServer(handle_client,
- '127.0.0.1',
- 0,
- shutdown_timeout=0.3) as server:
- await server.start_serving()
- task = asyncio.create_task(client(server))
- await fut1
- fut2.set_result(None)
- await server.close()
- nonlocal do_handle_client
- do_handle_client = False
- await task
- await cancelled
-
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
- self.loop.run_until_complete(asyncio.wait_for(test(), 60.0))
- self.assertEqual(1, len(messages))
- self.assertRegex(messages[0]['message'],
- "<Task pending .+ ignored cancellation request")
- self.assertTrue(fut1.done())
- self.assertTrue(fut2.done())
- self.assertTrue(cancelled.done())
-
- def test_sendfile(self):
- messages = []
- self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
-
- with open(support.TESTFN, 'wb') as fp:
- fp.write(b'data\n')
- self.addCleanup(support.unlink, support.TESTFN)
-
- async def serve_callback(stream):
- data = await stream.readline()
- await stream.write(b'ack-' + data)
- data = await stream.readline()
- await stream.write(b'ack-' + data)
- data = await stream.readline()
- await stream.write(b'ack-' + data)
- await stream.close()
-
- async def do_connect(host, port):
- stream = await asyncio.connect(host, port)
- await stream.write(b'begin\n')
- data = await stream.readline()
- self.assertEqual(b'ack-begin\n', data)
- with open(support.TESTFN, 'rb') as fp:
- await stream.sendfile(fp)
- data = await stream.readline()
- self.assertEqual(b'ack-data\n', data)
- await stream.write(b'end\n')
- data = await stream.readline()
- self.assertEqual(data, b'ack-end\n')
- await stream.close()
-
- async def test():
- async with asyncio.StreamServer(serve_callback, '127.0.0.1', 0) as srv:
- await srv.start_serving()
- await do_connect(*srv.sockets[0].getsockname())
-
- self.loop.run_until_complete(test())
-
- self.assertEqual([], messages)
-
-
- @unittest.skipIf(ssl is None, 'No ssl module')
- def test_connect_start_tls(self):
- with test_utils.run_test_server(use_ssl=True) as httpd:
- # connect without SSL but upgrade to TLS just after
- # connection is established
- stream = self.loop.run_until_complete(
- asyncio.connect(*httpd.address))
-
- self.loop.run_until_complete(
- stream.start_tls(
- sslcontext=test_utils.dummy_ssl_context()))
- self._basetest_connect(stream)
-
- def test_repr_unbound(self):
- async def serve(stream):
- pass
-
- async def test():
- srv = asyncio.StreamServer(serve)
- self.assertEqual('<StreamServer>', repr(srv))
- await srv.close()
-
- self.loop.run_until_complete(test())
-
- def test_repr_bound(self):
- async def serve(stream):
- pass
-
- async def test():
- srv = asyncio.StreamServer(serve, '127.0.0.1', 0)
- await srv.bind()
- self.assertRegex(repr(srv), r'<StreamServer sockets=\(.+\)>')
- await srv.close()
-
- self.loop.run_until_complete(test())
-
- def test_repr_serving(self):
- async def serve(stream):
- pass
-
- async def test():
- srv = asyncio.StreamServer(serve, '127.0.0.1', 0)
- await srv.start_serving()
- self.assertRegex(repr(srv), r'<StreamServer serving sockets=\(.+\)>')
- await srv.close()
-
- self.loop.run_until_complete(test())
-
-
- @unittest.skipUnless(sys.platform != 'win32',
- "Don't support pipes for Windows")
- def test_read_pipe(self):
- async def test():
- rpipe, wpipe = os.pipe()
- pipeobj = io.open(rpipe, 'rb', 1024)
-
- async with asyncio.connect_read_pipe(pipeobj) as stream:
- self.assertEqual(stream.mode, asyncio.StreamMode.READ)
-
- os.write(wpipe, b'1')
- data = await stream.readexactly(1)
- self.assertEqual(data, b'1')
-
- os.write(wpipe, b'2345')
- data = await stream.readexactly(4)
- self.assertEqual(data, b'2345')
- os.close(wpipe)
-
- self.loop.run_until_complete(test())
-
- @unittest.skipUnless(sys.platform != 'win32',
- "Don't support pipes for Windows")
- def test_write_pipe(self):
- async def test():
- rpipe, wpipe = os.pipe()
- pipeobj = io.open(wpipe, 'wb', 1024)
-
- async with asyncio.connect_write_pipe(pipeobj) as stream:
- self.assertEqual(stream.mode, asyncio.StreamMode.WRITE)
-
- await stream.write(b'1')
- data = os.read(rpipe, 1024)
- self.assertEqual(data, b'1')
-
- await stream.write(b'2345')
- data = os.read(rpipe, 1024)
- self.assertEqual(data, b'2345')
-
- os.close(rpipe)
-
- self.loop.run_until_complete(test())
-
- def test_stream_ctor_forbidden(self):
- with self.assertRaisesRegex(RuntimeError,
- "should be instantiated "
- "by asyncio internals only"):
- asyncio.Stream(asyncio.StreamMode.READWRITE)
-
- def test_deprecated_methods(self):
- async def f():
- return asyncio.Stream(mode=asyncio.StreamMode.READWRITE,
- _asyncio_internal=True)
-
- stream = self.loop.run_until_complete(f())
-
- tr = mock.Mock()
-
- with self.assertWarns(DeprecationWarning):
- stream.set_transport(tr)
-
- with self.assertWarns(DeprecationWarning):
- stream.transport is tr
-
- with self.assertWarns(DeprecationWarning):
- stream.feed_data(b'data')
-
- with self.assertWarns(DeprecationWarning):
- stream.feed_eof()
-
- with self.assertWarns(DeprecationWarning):
- stream.set_exception(ConnectionResetError("test"))
-
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index 3ad18e5c51..fe8cfa61b1 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -582,18 +582,6 @@ class SubprocessMixin:
self.loop.run_until_complete(execute())
- def test_subprocess_protocol_create_warning(self):
- with self.assertWarns(DeprecationWarning):
- subprocess.SubprocessStreamProtocol(limit=10, loop=self.loop)
-
- def test_process_create_warning(self):
- proto = subprocess.SubprocessStreamProtocol(limit=10, loop=self.loop,
- _asyncio_internal=True)
- transp = mock.Mock()
-
- with self.assertWarns(DeprecationWarning):
- subprocess.Process(transp, proto, loop=self.loop)
-
def test_create_subprocess_exec_text_mode_fails(self):
async def execute():
with self.assertRaises(ValueError):
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py
index d0ba19391f..9ed10fc20f 100644
--- a/Lib/test/test_asyncio/test_windows_events.py
+++ b/Lib/test/test_asyncio/test_windows_events.py
@@ -15,7 +15,6 @@ import _winapi
import asyncio
from asyncio import windows_events
-from asyncio.streams import _StreamProtocol
from test.test_asyncio import utils as test_utils
@@ -118,16 +117,14 @@ class ProactorTests(test_utils.TestCase):
clients = []
for i in range(5):
- stream = asyncio.Stream(mode=asyncio.StreamMode.READ,
- loop=self.loop, _asyncio_internal=True)
- protocol = _StreamProtocol(stream,
- loop=self.loop,
- _asyncio_internal=True)
+ stream_reader = asyncio.StreamReader(loop=self.loop)
+ protocol = asyncio.StreamReaderProtocol(stream_reader,
+ loop=self.loop)
trans, proto = await self.loop.create_pipe_connection(
lambda: protocol, ADDRESS)
self.assertIsInstance(trans, asyncio.Transport)
self.assertEqual(protocol, proto)
- clients.append((stream, trans))
+ clients.append((stream_reader, trans))
for i, (r, w) in enumerate(clients):
w.write('lower-{}\n'.format(i).encode())
@@ -136,7 +133,6 @@ class ProactorTests(test_utils.TestCase):
response = await r.readline()
self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
w.close()
- await r.close()
server.close()
diff --git a/Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst b/Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst
new file mode 100644
index 0000000000..be9da891b8
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst
@@ -0,0 +1 @@
+Revert the new asyncio Streams API