summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-01-26 00:50:48 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-01-26 00:50:48 +0100
commit22dcff8da49591903bcdce68b0ce746073af7361 (patch)
tree1089533c79dce4b3edb12d6da3ef7188f7c4df2f
parent043c3cbe9c8190aa95b0e45b7ba7ae13b17bfe9c (diff)
downloadtrollius-22dcff8da49591903bcdce68b0ce746073af7361.tar.gz
Add SubprocessStreamProtocol
-rw-r--r--asyncio/base_subprocess.py14
-rw-r--r--asyncio/protocols.py20
-rw-r--r--asyncio/streams.py91
3 files changed, 114 insertions, 11 deletions
diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py
index d15fb15..cf3dc93 100644
--- a/asyncio/base_subprocess.py
+++ b/asyncio/base_subprocess.py
@@ -75,17 +75,11 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
proc = self._proc
loop = self._loop
if proc.stdin is not None:
- transp, proto = yield from loop.connect_write_pipe(
- lambda: WriteSubprocessPipeProto(self, STDIN),
- proc.stdin)
+ transp, proto = yield from self._protocol.connect_write_pipe(self._loop, self, STDIN, proc.stdin)
if proc.stdout is not None:
- transp, proto = yield from loop.connect_read_pipe(
- lambda: ReadSubprocessPipeProto(self, STDOUT),
- proc.stdout)
+ transp, proto = yield from self._protocol.connect_read_pipe(self._loop, self, STDOUT, proc.stdout)
if proc.stderr is not None:
- transp, proto = yield from loop.connect_read_pipe(
- lambda: ReadSubprocessPipeProto(self, STDERR),
- proc.stderr)
+ transp, proto = yield from self._protocol.connect_read_pipe(self._loop, self, STDERR, proc.stderr)
if not self._pipes:
self._try_connected()
@@ -115,7 +109,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
assert self._returncode is None, self._returncode
self._returncode = returncode
self._loop._subprocess_closed(self)
- self._call(self._protocol.process_exited)
+ self._call(self._protocol.process_exited, returncode)
self._try_finish()
def _try_finish(self):
diff --git a/asyncio/protocols.py b/asyncio/protocols.py
index 3c4f3f4..9fe975c 100644
--- a/asyncio/protocols.py
+++ b/asyncio/protocols.py
@@ -1,4 +1,6 @@
"""Abstract Protocol class."""
+# FIXME: don't create tasks in protocols
+from . import tasks
__all__ = ['BaseProtocol', 'Protocol', 'DatagramProtocol',
'SubprocessProtocol']
@@ -125,5 +127,21 @@ class SubprocessProtocol(BaseProtocol):
fd is the int file descriptor that was closed.
"""
- def process_exited(self):
+ def process_exited(self, returncode):
"""Called when subprocess has exited."""
+
+ # FIXME: remove loop
+ @tasks.coroutine
+ def connect_read_pipe(self, loop, transport, fd, pipe):
+ from . import base_subprocess
+ return (yield from loop.connect_read_pipe(
+ lambda: base_subprocess.ReadSubprocessPipeProto(transport, fd),
+ pipe))
+
+ @tasks.coroutine
+ def connect_write_pipe(self, loop, transport, fd, pipe):
+ from . import base_subprocess
+ return (yield from loop.connect_write_pipe(
+ lambda: base_subprocess.WriteSubprocessPipeProto(transport, fd),
+ pipe))
+
diff --git a/asyncio/streams.py b/asyncio/streams.py
index b38e94e..77e34fc 100644
--- a/asyncio/streams.py
+++ b/asyncio/streams.py
@@ -453,3 +453,94 @@ class StreamReader:
def close(self):
return self._transport.close()
+
+
+class SubprocessStreamProtocol(protocols.SubprocessProtocol):
+ def __init__(self, limit=_DEFAULT_LIMIT):
+ self._pipes = {} # file descriptor (int) => StreamReaderProtocol
+ self.limit = limit
+ self.stdin = None # TODO: _UnixWritePipeTransport, but should be StreamWriter
+ self.stdout = None # StreamReader
+ self.stderr = None # StreamReader
+ self._waiters = [] # list of Future waiting for the exit of the process,
+ # the result is the returncode of the process
+ self._returncode = None
+ self._loop = None
+
+ def connection_made(self, transport):
+ self._loop = transport._loop
+ proc = transport._proc
+ if proc.stdin is not None:
+ # FIXME: implement StreamWriter for stdin
+ # stdin_transport = transport.get_pipe_transport(0) # _UnixWritePipeTransport
+ # stdin_protocol = stdin_transport._protocol # WriteSubprocessPipeProto
+ # class FakeReader:
+ # pass
+ # stdin_reader = FakeReader() # ???
+ # stdin_reader._exception = None
+ # self.stdin = asyncio.StreamWriter(stdin_transport, stdin_protocol, stdin_reader, loop=self._loop)
+ self.stdin = transport.get_pipe_transport(0)
+ if proc.stdout is not None:
+ self.stdout = self._get_protocol(1)._stream_reader
+ if proc.stderr is not None:
+ self.stderr = self._get_protocol(2)._stream_reader
+
+ def get_pipe_reader(self, fd):
+ if fd in self._pipes:
+ return self._pipes[fd]._stream_reader
+ else:
+ return None
+
+ def _get_protocol(self, fd):
+ try:
+ return self._pipes[fd]
+ except KeyError:
+ reader = StreamReader(limit=self.limit)
+ protocol = StreamReaderProtocol(reader, loop=self._loop)
+ self._pipes[fd] = protocol
+ return protocol
+
+ def pipe_data_received(self, fd, data):
+ protocol = self._get_protocol(fd)
+ protocol.data_received(data)
+
+ def pipe_connection_lost(self, fd, exc):
+ protocol = self._get_protocol(fd)
+ protocol.connection_lost(exc)
+
+ @tasks.coroutine
+ def wait(self):
+ """
+ Wait until the process exit and return the process return code.
+ """
+ if self._returncode:
+ return self._returncode
+
+ fut = tasks.Future()
+ self._waiters.append(fut)
+ yield from fut
+ return fut.result()
+
+ def process_exited(self, returncode):
+ self._returncode = returncode
+ # FIXME: not thread safe
+ waiters = self._waiters.copy()
+ self._waiters.clear()
+ for waiter in waiters:
+ waiter.set_result(returncode)
+
+ # FIXME: remove loop
+ @tasks.coroutine
+ def connect_read_pipe(self, loop, transport, fd, pipe):
+ from . import base_subprocess
+ return (yield from loop.connect_read_pipe(
+ lambda: base_subprocess.ReadSubprocessPipeProto(transport, fd),
+ pipe))
+
+ @tasks.coroutine
+ def connect_write_pipe(self, loop, transport, fd, pipe):
+ from . import base_subprocess
+ return (yield from loop.connect_write_pipe(
+ lambda: base_subprocess.WriteSubprocessPipeProto(transport, fd),
+ pipe))
+