diff options
Diffstat (limited to 'Lib/asyncio/proactor_events.py')
| -rw-r--r-- | Lib/asyncio/proactor_events.py | 34 |
1 files changed, 21 insertions, 13 deletions
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index b6b3be2dc6..fb67155786 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -29,6 +29,7 @@ class _ProactorBasePipeTransport(transports.BaseTransport): self._buffer = None # None or bytearray. self._read_fut = None self._write_fut = None + self._pending_write = 0 self._conn_lost = 0 self._closing = False # Set when close() called. self._eof_written = False @@ -68,6 +69,7 @@ class _ProactorBasePipeTransport(transports.BaseTransport): if self._read_fut: self._read_fut.cancel() self._write_fut = self._read_fut = None + self._pending_write = 0 self._buffer = None self._loop.call_soon(self._call_connection_lost, exc) @@ -128,11 +130,10 @@ class _ProactorBasePipeTransport(transports.BaseTransport): self._low_water = low def get_write_buffer_size(self): - # NOTE: This doesn't take into account data already passed to - # send() even if send() hasn't finished yet. - if not self._buffer: - return 0 - return len(self._buffer) + size = self._pending_write + if self._buffer is not None: + size += len(self._buffer) + return size class _ProactorReadPipeTransport(_ProactorBasePipeTransport, @@ -206,7 +207,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, - transports.WriteTransport): + transports.WriteTransport): """Transport for write pipes.""" def write(self, data): @@ -252,6 +253,7 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, try: assert f is self._write_fut self._write_fut = None + self._pending_write = 0 if f: f.result() if data is None: @@ -262,15 +264,21 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, self._loop.call_soon(self._call_connection_lost, None) if self._eof_written: self._sock.shutdown(socket.SHUT_WR) + # Now that we've reduced the buffer size, tell the + # protocol to resume writing if it was paused. Note that + # we do this last since the callback is called immediately + # and it may add more data to the buffer (even causing the + # protocol to be paused again). + self._maybe_resume_protocol() else: self._write_fut = self._loop._proactor.send(self._sock, data) - self._write_fut.add_done_callback(self._loop_writing) - # Now that we've reduced the buffer size, tell the - # protocol to resume writing if it was paused. Note that - # we do this last since the callback is called immediately - # and it may add more data to the buffer (even causing the - # protocol to be paused again). - self._maybe_resume_protocol() + if not self._write_fut.done(): + assert self._pending_write == 0 + self._pending_write = len(data) + self._write_fut.add_done_callback(self._loop_writing) + self._maybe_pause_protocol() + else: + self._write_fut.add_done_callback(self._loop_writing) except ConnectionResetError as exc: self._force_close(exc) except OSError as exc: |
