From 22dcff8da49591903bcdce68b0ce746073af7361 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Sun, 26 Jan 2014 00:50:48 +0100 Subject: Add SubprocessStreamProtocol --- asyncio/base_subprocess.py | 14 ++----- asyncio/protocols.py | 20 +++++++++- asyncio/streams.py | 91 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 11 deletions(-) diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py index d15fb15..cf3dc93 100644 --- a/asyncio/base_subprocess.py +++ b/asyncio/base_subprocess.py @@ -75,17 +75,11 @@ class BaseSubprocessTransport(transports.SubprocessTransport): proc = self._proc loop = self._loop if proc.stdin is not None: - transp, proto = yield from loop.connect_write_pipe( - lambda: WriteSubprocessPipeProto(self, STDIN), - proc.stdin) + transp, proto = yield from self._protocol.connect_write_pipe(self._loop, self, STDIN, proc.stdin) if proc.stdout is not None: - transp, proto = yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(self, STDOUT), - proc.stdout) + transp, proto = yield from self._protocol.connect_read_pipe(self._loop, self, STDOUT, proc.stdout) if proc.stderr is not None: - transp, proto = yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(self, STDERR), - proc.stderr) + transp, proto = yield from self._protocol.connect_read_pipe(self._loop, self, STDERR, proc.stderr) if not self._pipes: self._try_connected() @@ -115,7 +109,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): assert self._returncode is None, self._returncode self._returncode = returncode self._loop._subprocess_closed(self) - self._call(self._protocol.process_exited) + self._call(self._protocol.process_exited, returncode) self._try_finish() def _try_finish(self): diff --git a/asyncio/protocols.py b/asyncio/protocols.py index 3c4f3f4..9fe975c 100644 --- a/asyncio/protocols.py +++ b/asyncio/protocols.py @@ -1,4 +1,6 @@ """Abstract Protocol class.""" +# FIXME: don't create tasks in protocols +from . import tasks __all__ = ['BaseProtocol', 'Protocol', 'DatagramProtocol', 'SubprocessProtocol'] @@ -125,5 +127,21 @@ class SubprocessProtocol(BaseProtocol): fd is the int file descriptor that was closed. """ - def process_exited(self): + def process_exited(self, returncode): """Called when subprocess has exited.""" + + # FIXME: remove loop + @tasks.coroutine + def connect_read_pipe(self, loop, transport, fd, pipe): + from . import base_subprocess + return (yield from loop.connect_read_pipe( + lambda: base_subprocess.ReadSubprocessPipeProto(transport, fd), + pipe)) + + @tasks.coroutine + def connect_write_pipe(self, loop, transport, fd, pipe): + from . import base_subprocess + return (yield from loop.connect_write_pipe( + lambda: base_subprocess.WriteSubprocessPipeProto(transport, fd), + pipe)) + diff --git a/asyncio/streams.py b/asyncio/streams.py index b38e94e..77e34fc 100644 --- a/asyncio/streams.py +++ b/asyncio/streams.py @@ -453,3 +453,94 @@ class StreamReader: def close(self): return self._transport.close() + + +class SubprocessStreamProtocol(protocols.SubprocessProtocol): + def __init__(self, limit=_DEFAULT_LIMIT): + self._pipes = {} # file descriptor (int) => StreamReaderProtocol + self.limit = limit + self.stdin = None # TODO: _UnixWritePipeTransport, but should be StreamWriter + self.stdout = None # StreamReader + self.stderr = None # StreamReader + self._waiters = [] # list of Future waiting for the exit of the process, + # the result is the returncode of the process + self._returncode = None + self._loop = None + + def connection_made(self, transport): + self._loop = transport._loop + proc = transport._proc + if proc.stdin is not None: + # FIXME: implement StreamWriter for stdin + # stdin_transport = transport.get_pipe_transport(0) # _UnixWritePipeTransport + # stdin_protocol = stdin_transport._protocol # WriteSubprocessPipeProto + # class FakeReader: + # pass + # stdin_reader = FakeReader() # ??? + # stdin_reader._exception = None + # self.stdin = asyncio.StreamWriter(stdin_transport, stdin_protocol, stdin_reader, loop=self._loop) + self.stdin = transport.get_pipe_transport(0) + if proc.stdout is not None: + self.stdout = self._get_protocol(1)._stream_reader + if proc.stderr is not None: + self.stderr = self._get_protocol(2)._stream_reader + + def get_pipe_reader(self, fd): + if fd in self._pipes: + return self._pipes[fd]._stream_reader + else: + return None + + def _get_protocol(self, fd): + try: + return self._pipes[fd] + except KeyError: + reader = StreamReader(limit=self.limit) + protocol = StreamReaderProtocol(reader, loop=self._loop) + self._pipes[fd] = protocol + return protocol + + def pipe_data_received(self, fd, data): + protocol = self._get_protocol(fd) + protocol.data_received(data) + + def pipe_connection_lost(self, fd, exc): + protocol = self._get_protocol(fd) + protocol.connection_lost(exc) + + @tasks.coroutine + def wait(self): + """ + Wait until the process exit and return the process return code. + """ + if self._returncode: + return self._returncode + + fut = tasks.Future() + self._waiters.append(fut) + yield from fut + return fut.result() + + def process_exited(self, returncode): + self._returncode = returncode + # FIXME: not thread safe + waiters = self._waiters.copy() + self._waiters.clear() + for waiter in waiters: + waiter.set_result(returncode) + + # FIXME: remove loop + @tasks.coroutine + def connect_read_pipe(self, loop, transport, fd, pipe): + from . import base_subprocess + return (yield from loop.connect_read_pipe( + lambda: base_subprocess.ReadSubprocessPipeProto(transport, fd), + pipe)) + + @tasks.coroutine + def connect_write_pipe(self, loop, transport, fd, pipe): + from . import base_subprocess + return (yield from loop.connect_write_pipe( + lambda: base_subprocess.WriteSubprocessPipeProto(transport, fd), + pipe)) + -- cgit v1.2.1