diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-01-27 18:56:39 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-01-27 18:56:39 +0100 |
commit | 3802b29ca9a76d9cf05ce19d648a26edf9b416b4 (patch) | |
tree | 80e98da79a9dd658120fb9e3e4c296c85b39b170 | |
parent | d6b5084304bcf5f2356e46c7e149b1bacd43942f (diff) | |
download | trollius-3802b29ca9a76d9cf05ce19d648a26edf9b416b4.tar.gz |
Pipe protocols now notify the subprocess protocol using futures instead of
callbacks
-rw-r--r-- | asyncio/base_subprocess.py | 55 | ||||
-rw-r--r-- | asyncio/subprocess_stream.py | 42 |
2 files changed, 54 insertions, 43 deletions
diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py index 48f7967..6cd6984 100644 --- a/asyncio/base_subprocess.py +++ b/asyncio/base_subprocess.py @@ -3,6 +3,7 @@ __all__ = ['SubprocessProtocol'] import collections import subprocess +from . import futures from . import protocols from . import tasks from . import transports @@ -16,11 +17,11 @@ STDERR = 2 class SubprocessProtocol(protocols.BaseProtocol): """Interface for protocol for subprocess calls.""" - def create_read_pipe_protocol(self, transport, fd): - return ReadSubprocessPipeProto(transport, fd) + def create_read_pipe_protocol(self, transport, fd, waiter=None): + return ReadSubprocessPipeProto(transport, fd, waiter) - def create_write_pipe_protocol(self, transport, fd): - return WriteSubprocessPipeProto(transport, fd) + def create_write_pipe_protocol(self, transport, fd, waiter=None): + return WriteSubprocessPipeProto(transport, fd, waiter) def pipe_data_received(self, fd, data): """Called when the subprocess writes data into stdout/stderr pipe. @@ -104,19 +105,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport): proc = self._proc loop = self._loop if proc.stdin is not None: - yield from loop.connect_write_pipe( - lambda: self._protocol.create_write_pipe_protocol(self, STDIN), + f = futures.Future() + _, pipe = yield from loop.connect_write_pipe( + lambda: self._protocol.create_write_pipe_protocol(self, STDIN, f), proc.stdin) + yield from f + self._pipes[STDIN] = pipe if proc.stdout is not None: - yield from loop.connect_read_pipe( - lambda: self._protocol.create_read_pipe_protocol(self, STDOUT), + f = futures.Future() + _, pipe = yield from loop.connect_read_pipe( + lambda: self._protocol.create_read_pipe_protocol(self, STDOUT, f), proc.stdout) + yield from f + self._pipes[STDOUT] = pipe if proc.stderr is not None: - yield from loop.connect_read_pipe( - lambda: self._protocol.create_read_pipe_protocol(self, STDERR), + f = futures.Future() + _, pipe = yield from loop.connect_read_pipe( + lambda: self._protocol.create_read_pipe_protocol(self, STDERR, f), proc.stderr) - if not self._pipes: - self._try_connected() + yield from f + self._pipes[STDERR] = pipe + + assert self._pending_calls is not None + self._loop.call_soon(self._protocol.connection_made, self) + for callback, data in self._pending_calls: + self._loop.call_soon(callback, *data) + self._pending_calls = None def _call(self, cb, *data): if self._pending_calls is not None: @@ -124,14 +138,6 @@ class BaseSubprocessTransport(transports.SubprocessTransport): else: self._loop.call_soon(cb, *data) - def _try_connected(self): - assert self._pending_calls is not None - if all(p is not None and p.connected for p in self._pipes.values()): - self._loop.call_soon(self._protocol.connection_made, self) - for callback, data in self._pending_calls: - self._loop.call_soon(callback, *data) - self._pending_calls = None - def _pipe_connection_lost(self, fd, exc): self._call(self._protocol.pipe_connection_lost, fd, exc) self._try_finish() @@ -167,17 +173,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport): class WriteSubprocessPipeProto(protocols.BaseProtocol): pipe = None - def __init__(self, proc, fd): + def __init__(self, proc, fd, waiter=None): self.proc = proc self.fd = fd - self.connected = False self.disconnected = False - proc._pipes[fd] = self + self._waiter = waiter def connection_made(self, transport): - self.connected = True self.pipe = transport - self.proc._try_connected() + if self._waiter: + self._waiter.set_result(None) def connection_lost(self, exc): self.disconnected = True diff --git a/asyncio/subprocess_stream.py b/asyncio/subprocess_stream.py index 8adee8c..fbf92ed 100644 --- a/asyncio/subprocess_stream.py +++ b/asyncio/subprocess_stream.py @@ -1,5 +1,7 @@ __all__ = ['SubprocessStreamProtocol'] +import functools + from . import base_subprocess from . import events from . import protocols @@ -7,15 +9,16 @@ from . import streams from . import tasks class WriteSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto): - def __init__(self, process_transport, fd): - base_subprocess.WriteSubprocessPipeProto.__init__(self, process_transport, fd) + def __init__(self, process_transport, fd, waiter): + base_subprocess.WriteSubprocessPipeProto.__init__(self, process_transport, fd, waiter) self._drain_waiter = None self._paused = False - self.writer = None + self.writer = streams.StreamWriter(None, self, None, None) def connection_made(self, transport): super().connection_made(transport) - self.writer = streams.StreamWriter(transport, self, None, transport._loop) + self.writer._transport = transport + self.writer._loop = transport._loop def connection_lost(self, exc): super().connection_lost(exc) @@ -46,8 +49,8 @@ class WriteSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto): class ReadSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto, protocols.Protocol): - def __init__(self, proc, fd, limit=streams._DEFAULT_LIMIT): - super().__init__(proc, fd) + def __init__(self, proc, fd, waiter=None, limit=streams._DEFAULT_LIMIT): + super().__init__(proc, fd, waiter) self._stream_reader = streams.StreamReader(limit=limit) def connection_made(self, transport): @@ -78,24 +81,27 @@ class SubprocessStreamProtocol(base_subprocess.SubprocessProtocol): self._waiters = [] self._transport = None - def create_read_pipe_protocol(self, transport, fd): - pipe = ReadSubprocessPipeStreamProto(transport, fd, self.limit) - self._pipes[fd] = pipe + def _pipe_connection_made(self, pipe, fut): + fd = pipe.fd + if fd == 0: + self.stdin = pipe.writer + if fd == 1: + self.stdout = pipe._stream_reader + if fd == 2: + self.stderr = pipe._stream_reader + + def create_read_pipe_protocol(self, transport, fd, waiter): + pipe = ReadSubprocessPipeStreamProto(transport, fd, waiter, self.limit) + waiter.add_done_callback(functools.partial(self._pipe_connection_made, pipe)) return pipe - def create_write_pipe_protocol(self, transport, fd): - pipe = WriteSubprocessPipeStreamProto(transport, fd) - self._pipes[fd] = pipe + def create_write_pipe_protocol(self, transport, fd, waiter=None): + pipe = WriteSubprocessPipeStreamProto(transport, fd, waiter) + waiter.add_done_callback(functools.partial(self._pipe_connection_made, pipe)) return pipe def connection_made(self, transport): self._transport = transport - if 0 in self._pipes: - self.stdin = self._pipes[0].writer - if 1 in self._pipes: - self.stdout = self._pipes[1]._stream_reader - if 2 in self._pipes: - self.stderr = self._pipes[1]._stream_reader @tasks.coroutine def wait(self): |