diff options
| author | Victor Stinner <victor.stinner@gmail.com> | 2014-02-01 22:49:59 +0100 | 
|---|---|---|
| committer | Victor Stinner <victor.stinner@gmail.com> | 2014-02-01 22:49:59 +0100 | 
| commit | 915bcb01110c7db65f8be9139bf887c749fbde75 (patch) | |
| tree | fa24b947b19c1479ed581dc817c2e696386f3fb0 /Lib/asyncio/proactor_events.py | |
| parent | 153d97b24e7253f344860094eb2c98ed93657720 (diff) | |
| download | cpython-git-915bcb01110c7db65f8be9139bf887c749fbde75.tar.gz | |
Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module
* Add a new asyncio.subprocess module
* Add new create_subprocess_exec() and create_subprocess_shell() functions
* The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers
  for stdout and stderr and a stream writer for stdin.
* The new asyncio.subprocess.Process class offers an API close to the
  subprocess.Popen class:
  - pid, returncode, stdin, stdout and stderr attributes
  - communicate(), wait(), send_signal(), terminate() and kill() methods
* Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess
  and unix_events, to not be confused with the symbols with the same name of
  subprocess and asyncio.subprocess modules
* _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size
  of the pending write
* _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if
  the write buffer size is greater than the high water mark (64 KB by default)
Diffstat (limited to 'Lib/asyncio/proactor_events.py')
| -rw-r--r-- | Lib/asyncio/proactor_events.py | 34 | 
1 files changed, 21 insertions, 13 deletions
| diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index b6b3be2dc6..fb67155786 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -29,6 +29,7 @@ class _ProactorBasePipeTransport(transports.BaseTransport):          self._buffer = None  # None or bytearray.          self._read_fut = None          self._write_fut = None +        self._pending_write = 0          self._conn_lost = 0          self._closing = False  # Set when close() called.          self._eof_written = False @@ -68,6 +69,7 @@ class _ProactorBasePipeTransport(transports.BaseTransport):          if self._read_fut:              self._read_fut.cancel()          self._write_fut = self._read_fut = None +        self._pending_write = 0          self._buffer = None          self._loop.call_soon(self._call_connection_lost, exc) @@ -128,11 +130,10 @@ class _ProactorBasePipeTransport(transports.BaseTransport):          self._low_water = low      def get_write_buffer_size(self): -        # NOTE: This doesn't take into account data already passed to -        # send() even if send() hasn't finished yet. -        if not self._buffer: -            return 0 -        return len(self._buffer) +        size = self._pending_write +        if self._buffer is not None: +            size += len(self._buffer) +        return size  class _ProactorReadPipeTransport(_ProactorBasePipeTransport, @@ -206,7 +207,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,  class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, -                                  transports.WriteTransport): +                                      transports.WriteTransport):      """Transport for write pipes."""      def write(self, data): @@ -252,6 +253,7 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,          try:              assert f is self._write_fut              self._write_fut = None +            self._pending_write = 0              if f:                  f.result()              if data is None: @@ -262,15 +264,21 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,                      self._loop.call_soon(self._call_connection_lost, None)                  if self._eof_written:                      self._sock.shutdown(socket.SHUT_WR) +                # Now that we've reduced the buffer size, tell the +                # protocol to resume writing if it was paused.  Note that +                # we do this last since the callback is called immediately +                # and it may add more data to the buffer (even causing the +                # protocol to be paused again). +                self._maybe_resume_protocol()              else:                  self._write_fut = self._loop._proactor.send(self._sock, data) -                self._write_fut.add_done_callback(self._loop_writing) -            # Now that we've reduced the buffer size, tell the -            # protocol to resume writing if it was paused.  Note that -            # we do this last since the callback is called immediately -            # and it may add more data to the buffer (even causing the -            # protocol to be paused again). -            self._maybe_resume_protocol() +                if not self._write_fut.done(): +                    assert self._pending_write == 0 +                    self._pending_write = len(data) +                    self._write_fut.add_done_callback(self._loop_writing) +                    self._maybe_pause_protocol() +                else: +                    self._write_fut.add_done_callback(self._loop_writing)          except ConnectionResetError as exc:              self._force_close(exc)          except OSError as exc: | 
