diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-01-27 00:14:21 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-01-27 00:14:21 +0100 |
commit | a503d1536efd91f6e31260e56972ef2190fe8a04 (patch) | |
tree | b73771e2e402000f75c9729d49d8d74a81784b3d | |
parent | 80b908deb40ebcb4e54ff5e9dd8902b7a148f3ca (diff) | |
parent | cf89b14fc3e56223f6c06cd1448d12e1051fac28 (diff) | |
download | trollius-a503d1536efd91f6e31260e56972ef2190fe8a04.tar.gz |
Merge with default
-rw-r--r-- | asyncio/__init__.py | 2 | ||||
-rw-r--r-- | asyncio/base_subprocess.py | 74 | ||||
-rw-r--r-- | asyncio/protocols.py | 7 | ||||
-rw-r--r-- | asyncio/subprocess_stream.py | 188 | ||||
-rw-r--r-- | asyncio/transports.py | 52 | ||||
-rw-r--r-- | asyncio/unix_events.py | 13 | ||||
-rw-r--r-- | examples/subprocess_stream.py | 33 |
7 files changed, 311 insertions, 58 deletions
diff --git a/asyncio/__init__.py b/asyncio/__init__.py index eb22c38..e24766b 100644 --- a/asyncio/__init__.py +++ b/asyncio/__init__.py @@ -24,6 +24,7 @@ from .locks import * from .protocols import * from .queues import * from .streams import * +from .subprocess_stream import * from .tasks import * from .transports import * @@ -39,5 +40,6 @@ __all__ = (events.__all__ + protocols.__all__ + queues.__all__ + streams.__all__ + + subprocess_stream.__all__ + tasks.__all__ + transports.__all__) diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py index c5efda7..6c65678 100644 --- a/asyncio/base_subprocess.py +++ b/asyncio/base_subprocess.py @@ -1,3 +1,5 @@ +__all__ = ['SubprocessTransport'] + import collections import subprocess @@ -11,7 +13,65 @@ STDOUT = 1 STDERR = 2 -class BaseSubprocessTransport(transports.SubprocessTransport): +class SubprocessTransport(transports.BaseTransport): + + def create_read_pipe_protocol(self, transport, fd): + return ReadSubprocessPipeProto(transport, fd) + + def create_write_pipe_protocol(self, transport, fd): + return WriteSubprocessPipeProto(transport, fd) + + def get_pid(self): + """Get subprocess id.""" + raise NotImplementedError + + def get_returncode(self): + """Get subprocess returncode. + + See also + http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode + """ + raise NotImplementedError + + def get_pipe_transport(self, fd): + """Get transport for pipe with number fd.""" + raise NotImplementedError + + def send_signal(self, signal): + """Send signal to subprocess. + + See also: + docs.python.org/3/library/subprocess#subprocess.Popen.send_signal + """ + raise NotImplementedError + + def terminate(self): + """Stop the subprocess. + + Alias for close() method. + + On Posix OSs the method sends SIGTERM to the subprocess. + On Windows the Win32 API function TerminateProcess() + is called to stop the subprocess. + + See also: + http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate + """ + raise NotImplementedError + + def kill(self): + """Kill the subprocess. + + On Posix OSs the function sends SIGKILL to the subprocess. + On Windows kill() is an alias for terminate(). + + See also: + http://docs.python.org/3/library/subprocess#subprocess.Popen.kill + """ + raise NotImplementedError + + +class BaseSubprocessTransport(SubprocessTransport): def __init__(self, loop, protocol, args, shell, stdin, stdout, stderr, bufsize, @@ -75,16 +135,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport): proc = self._proc loop = self._loop if proc.stdin is not None: - transp, proto = yield from loop.connect_write_pipe( - lambda: WriteSubprocessPipeProto(self, STDIN), + yield from loop.connect_write_pipe( + lambda: self._protocol.create_write_pipe_protocol(self, STDIN), proc.stdin) if proc.stdout is not None: - transp, proto = yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(self, STDOUT), + yield from loop.connect_read_pipe( + lambda: self._protocol.create_read_pipe_protocol(self, STDOUT), proc.stdout) if proc.stderr is not None: - transp, proto = yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(self, STDERR), + yield from loop.connect_read_pipe( + lambda: self._protocol.create_read_pipe_protocol(self, STDERR), proc.stderr) if not self._pipes: self._try_connected() diff --git a/asyncio/protocols.py b/asyncio/protocols.py index 3c4f3f4..99f0c82 100644 --- a/asyncio/protocols.py +++ b/asyncio/protocols.py @@ -111,6 +111,12 @@ class DatagramProtocol(BaseProtocol): class SubprocessProtocol(BaseProtocol): """Interface for protocol for subprocess calls.""" + read_pipe_protocol = None + write_pipe_protocol = None + + def pipe_connection_made(self, fd, pipe): + pass + def pipe_data_received(self, fd, data): """Called when the subprocess writes data into stdout/stderr pipe. @@ -127,3 +133,4 @@ class SubprocessProtocol(BaseProtocol): def process_exited(self): """Called when subprocess has exited.""" + diff --git a/asyncio/subprocess_stream.py b/asyncio/subprocess_stream.py new file mode 100644 index 0000000..ff22f8f --- /dev/null +++ b/asyncio/subprocess_stream.py @@ -0,0 +1,188 @@ +__all__ = ['SubprocessStreamProtocol'] + +from . import base_subprocess +from . import events +from . import protocols +from . import streams +from . import tasks + +class WriteSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto): + def __init__(self, process_transport, fd): + base_subprocess.WriteSubprocessPipeProto.__init__(self, process_transport, fd) + self._drain_waiter = None + self._paused = False + self.writer = WritePipeStream(None, self, None) + + def connection_made(self, transport): + super().connection_made(transport) + self.writer._transport = transport + self.writer._loop = transport._loop + + def connection_lost(self, exc): + # FIXME: call super().connection_lost(exc) + # Also wake up the writing side. + if self._paused: + waiter = self._drain_waiter + if waiter is not None: + self._drain_waiter = None + if not waiter.done(): + if exc is None: + waiter.set_result(None) + else: + waiter.set_exception(exc) + + def pause_writing(self): + assert not self._paused + self._paused = True + + def resume_writing(self): + assert self._paused + self._paused = False + waiter = self._drain_waiter + if waiter is not None: + self._drain_waiter = None + if not waiter.done(): + waiter.set_result(None) + + +class WritePipeStream: + """Wraps a Transport. + + This exposes write(), writelines(), [can_]write_eof(), + get_extra_info() and close(). It adds drain() which returns an + optional Future on which you can wait for flow control. It also + adds a transport property which references the Transport + directly. + """ + + def __init__(self, transport, protocol, loop): + self._transport = transport + self._protocol = protocol + self._loop = loop + + @property + def transport(self): + return self._transport + + def write(self, data): + self._transport.write(data) + + def writelines(self, data): + self._transport.writelines(data) + + def write_eof(self): + return self._transport.write_eof() + + def can_write_eof(self): + return self._transport.can_write_eof() + + def close(self): + return self._transport.close() + + def get_extra_info(self, name, default=None): + return self._transport.get_extra_info(name, default) + + def drain(self): + """This method has an unusual return value. + + The intended use is to write + + w.write(data) + yield from w.drain() + + When there's nothing to wait for, drain() returns (), and the + yield-from continues immediately. When the transport buffer + is full (the protocol is paused), drain() creates and returns + a Future and the yield-from will block until that Future is + completed, which will happen when the buffer is (partially) + drained and the protocol is resumed. + """ + if self._transport._conn_lost: # Uses private variable. + raise ConnectionResetError('Connection lost') + if not self._protocol._paused: + return () + waiter = self._protocol._drain_waiter + assert waiter is None or waiter.cancelled() + waiter = futures.Future(loop=self._loop) + self._protocol._drain_waiter = waiter + return waiter + +class ReadSubprocessPipeStreamProto(base_subprocess.ReadSubprocessPipeProto): + def __init__(self, proc, fd, limit=streams._DEFAULT_LIMIT): + super().__init__(proc, fd) + self._stream_reader = streams.StreamReader(limit=limit) + + def connection_made(self, transport): + super().connection_made(transport) + self._stream_reader.set_transport(transport) + + def connection_lost(self, exc): + # FIXME: call super().connection_lost(exc) + if exc is None: + self._stream_reader.feed_eof() + else: + self._stream_reader.set_exception(exc) + + def data_received(self, data): + self._stream_reader.feed_data(data) + + def eof_received(self): + self._stream_reader.feed_eof() + + +class SubprocessStreamProtocol(protocols.SubprocessProtocol): + def __init__(self, limit=streams._DEFAULT_LIMIT): + self._pipes = {} + self.limit = limit + self.stdin = None + self.stdout = None + self.stderr = None + self._waiters = [] + self._transport = None + + def create_read_pipe_protocol(self, transport, fd): + pipe = ReadSubprocessPipeStreamProto(transport, fd, self.limit) + if fd == 1: + self.stdout = pipe._stream_reader + elif fd == 2: + self.stderr = pipe._stream_reader + return pipe + + def create_write_pipe_protocol(self, transport, fd): + pipe = WriteSubprocessPipeStreamProto(transport, fd) + if fd == 0: + self.stdin = pipe.writer + return pipe + + def connection_made(self, transport): + self._transport = transport + + def pipe_data_received(self, fd, data): + pipe = self._pipes[fd] + pipe.data_received(data) + + def pipe_connection_lost(self, fd, exc): + pipe = self._pipes[fd] + pipe.connection_lost(exc) + + @tasks.coroutine + def wait(self): + """ + Wait until the process exit and return the process return code. + """ + returncode = self._transport.get_returncode() + if returncode is not None: + return returncode + + fut = tasks.Future() + self._waiters.append(fut) + yield from fut + return fut.result() + + def process_exited(self): + returncode = self._transport.get_returncode() + # FIXME: not thread safe + waiters = self._waiters.copy() + self._waiters.clear() + for waiter in waiters: + waiter.set_result(returncode) diff --git a/asyncio/transports.py b/asyncio/transports.py index 67ae7fd..95a14e4 100644 --- a/asyncio/transports.py +++ b/asyncio/transports.py @@ -5,7 +5,7 @@ import sys _PY34 = sys.version_info >= (3, 4) __all__ = ['BaseTransport', 'ReadTransport', 'WriteTransport', - 'Transport', 'DatagramTransport', 'SubprocessTransport', + 'Transport', 'DatagramTransport', ] @@ -169,53 +169,3 @@ class DatagramTransport(BaseTransport): raise NotImplementedError -class SubprocessTransport(BaseTransport): - - def get_pid(self): - """Get subprocess id.""" - raise NotImplementedError - - def get_returncode(self): - """Get subprocess returncode. - - See also - http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode - """ - raise NotImplementedError - - def get_pipe_transport(self, fd): - """Get transport for pipe with number fd.""" - raise NotImplementedError - - def send_signal(self, signal): - """Send signal to subprocess. - - See also: - docs.python.org/3/library/subprocess#subprocess.Popen.send_signal - """ - raise NotImplementedError - - def terminate(self): - """Stop the subprocess. - - Alias for close() method. - - On Posix OSs the method sends SIGTERM to the subprocess. - On Windows the Win32 API function TerminateProcess() - is called to stop the subprocess. - - See also: - http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate - """ - raise NotImplementedError - - def kill(self): - """Kill the subprocess. - - On Posix OSs the function sends SIGKILL to the subprocess. - On Windows kill() is an alias for terminate(). - - See also: - http://docs.python.org/3/library/subprocess#subprocess.Popen.kill - """ - raise NotImplementedError diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index 219c88a..16de540 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -281,6 +281,16 @@ class _UnixWritePipeTransport(transports.WriteTransport): # Pipe was closed by peer. self._close() + # FIXME: inline this method? + def _pause_protocol(self): + # FIXME: try/except as _SelectorTransport._maybe_pause_protocol? + self._protocol.pause_writing() + + # FIXME: inline this method? + def _resume_protocol(self): + # FIXME: try/except as _SelectorTransport._maybe_resume_protocol? + self._protocol.resume_writing() + def write(self, data): assert isinstance(data, bytes), repr(data) if not data: @@ -310,6 +320,7 @@ class _UnixWritePipeTransport(transports.WriteTransport): self._loop.add_writer(self._fileno, self._write_ready) self._buffer.append(data) + self._pause_protocol() def _write_ready(self): data = b''.join(self._buffer) @@ -329,6 +340,8 @@ class _UnixWritePipeTransport(transports.WriteTransport): else: if n == len(data): self._loop.remove_writer(self._fileno) + # FIXME: move resume after the closing block? + self._resume_protocol() if self._closing: self._loop.remove_reader(self._fileno) self._call_connection_lost(None) diff --git a/examples/subprocess_stream.py b/examples/subprocess_stream.py new file mode 100644 index 0000000..18c64f7 --- /dev/null +++ b/examples/subprocess_stream.py @@ -0,0 +1,33 @@ +import asyncio + +@asyncio.coroutine +def cat(loop): + transport, proc = yield from loop.subprocess_shell(asyncio.SubprocessStreamProtocol, "cat") + print("pid: %s" % transport.get_pid()) + + message = "Hello World!" + print("cat write: %r" % message) + proc.stdin.write(message.encode('ascii')) + yield from proc.stdin.drain() + + proc.stdin.close() + read = yield from proc.stdout.read() + print("cat read: %r" % read.decode('ascii')) + + returncode = yield from proc.wait() + print("exit code: %s" % returncode) + transport.close() + +@asyncio.coroutine +def ls(loop): + transport, proc = yield from loop.subprocess_exec(asyncio.SubprocessStreamProtocol, "ls", stdin=None) + while True: + line = yield from proc.stdout.readline() + if not line: + break + print("ls>>", line.decode('ascii').rstrip()) + transport.close() + +loop = asyncio.get_event_loop() +loop.run_until_complete(cat(loop)) +loop.run_until_complete(ls(loop)) |