summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-01-27 18:56:39 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-01-27 18:56:39 +0100
commit3802b29ca9a76d9cf05ce19d648a26edf9b416b4 (patch)
tree80e98da79a9dd658120fb9e3e4c296c85b39b170
parentd6b5084304bcf5f2356e46c7e149b1bacd43942f (diff)
downloadtrollius-3802b29ca9a76d9cf05ce19d648a26edf9b416b4.tar.gz
Pipe protocols now notify the subprocess protocol using futures instead of
callbacks
-rw-r--r--asyncio/base_subprocess.py55
-rw-r--r--asyncio/subprocess_stream.py42
2 files changed, 54 insertions, 43 deletions
diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py
index 48f7967..6cd6984 100644
--- a/asyncio/base_subprocess.py
+++ b/asyncio/base_subprocess.py
@@ -3,6 +3,7 @@ __all__ = ['SubprocessProtocol']
import collections
import subprocess
+from . import futures
from . import protocols
from . import tasks
from . import transports
@@ -16,11 +17,11 @@ STDERR = 2
class SubprocessProtocol(protocols.BaseProtocol):
"""Interface for protocol for subprocess calls."""
- def create_read_pipe_protocol(self, transport, fd):
- return ReadSubprocessPipeProto(transport, fd)
+ def create_read_pipe_protocol(self, transport, fd, waiter=None):
+ return ReadSubprocessPipeProto(transport, fd, waiter)
- def create_write_pipe_protocol(self, transport, fd):
- return WriteSubprocessPipeProto(transport, fd)
+ def create_write_pipe_protocol(self, transport, fd, waiter=None):
+ return WriteSubprocessPipeProto(transport, fd, waiter)
def pipe_data_received(self, fd, data):
"""Called when the subprocess writes data into stdout/stderr pipe.
@@ -104,19 +105,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
proc = self._proc
loop = self._loop
if proc.stdin is not None:
- yield from loop.connect_write_pipe(
- lambda: self._protocol.create_write_pipe_protocol(self, STDIN),
+ f = futures.Future()
+ _, pipe = yield from loop.connect_write_pipe(
+ lambda: self._protocol.create_write_pipe_protocol(self, STDIN, f),
proc.stdin)
+ yield from f
+ self._pipes[STDIN] = pipe
if proc.stdout is not None:
- yield from loop.connect_read_pipe(
- lambda: self._protocol.create_read_pipe_protocol(self, STDOUT),
+ f = futures.Future()
+ _, pipe = yield from loop.connect_read_pipe(
+ lambda: self._protocol.create_read_pipe_protocol(self, STDOUT, f),
proc.stdout)
+ yield from f
+ self._pipes[STDOUT] = pipe
if proc.stderr is not None:
- yield from loop.connect_read_pipe(
- lambda: self._protocol.create_read_pipe_protocol(self, STDERR),
+ f = futures.Future()
+ _, pipe = yield from loop.connect_read_pipe(
+ lambda: self._protocol.create_read_pipe_protocol(self, STDERR, f),
proc.stderr)
- if not self._pipes:
- self._try_connected()
+ yield from f
+ self._pipes[STDERR] = pipe
+
+ assert self._pending_calls is not None
+ self._loop.call_soon(self._protocol.connection_made, self)
+ for callback, data in self._pending_calls:
+ self._loop.call_soon(callback, *data)
+ self._pending_calls = None
def _call(self, cb, *data):
if self._pending_calls is not None:
@@ -124,14 +138,6 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
else:
self._loop.call_soon(cb, *data)
- def _try_connected(self):
- assert self._pending_calls is not None
- if all(p is not None and p.connected for p in self._pipes.values()):
- self._loop.call_soon(self._protocol.connection_made, self)
- for callback, data in self._pending_calls:
- self._loop.call_soon(callback, *data)
- self._pending_calls = None
-
def _pipe_connection_lost(self, fd, exc):
self._call(self._protocol.pipe_connection_lost, fd, exc)
self._try_finish()
@@ -167,17 +173,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
class WriteSubprocessPipeProto(protocols.BaseProtocol):
pipe = None
- def __init__(self, proc, fd):
+ def __init__(self, proc, fd, waiter=None):
self.proc = proc
self.fd = fd
- self.connected = False
self.disconnected = False
- proc._pipes[fd] = self
+ self._waiter = waiter
def connection_made(self, transport):
- self.connected = True
self.pipe = transport
- self.proc._try_connected()
+ if self._waiter:
+ self._waiter.set_result(None)
def connection_lost(self, exc):
self.disconnected = True
diff --git a/asyncio/subprocess_stream.py b/asyncio/subprocess_stream.py
index 8adee8c..fbf92ed 100644
--- a/asyncio/subprocess_stream.py
+++ b/asyncio/subprocess_stream.py
@@ -1,5 +1,7 @@
__all__ = ['SubprocessStreamProtocol']
+import functools
+
from . import base_subprocess
from . import events
from . import protocols
@@ -7,15 +9,16 @@ from . import streams
from . import tasks
class WriteSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto):
- def __init__(self, process_transport, fd):
- base_subprocess.WriteSubprocessPipeProto.__init__(self, process_transport, fd)
+ def __init__(self, process_transport, fd, waiter):
+ base_subprocess.WriteSubprocessPipeProto.__init__(self, process_transport, fd, waiter)
self._drain_waiter = None
self._paused = False
- self.writer = None
+ self.writer = streams.StreamWriter(None, self, None, None)
def connection_made(self, transport):
super().connection_made(transport)
- self.writer = streams.StreamWriter(transport, self, None, transport._loop)
+ self.writer._transport = transport
+ self.writer._loop = transport._loop
def connection_lost(self, exc):
super().connection_lost(exc)
@@ -46,8 +49,8 @@ class WriteSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto):
class ReadSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto,
protocols.Protocol):
- def __init__(self, proc, fd, limit=streams._DEFAULT_LIMIT):
- super().__init__(proc, fd)
+ def __init__(self, proc, fd, waiter=None, limit=streams._DEFAULT_LIMIT):
+ super().__init__(proc, fd, waiter)
self._stream_reader = streams.StreamReader(limit=limit)
def connection_made(self, transport):
@@ -78,24 +81,27 @@ class SubprocessStreamProtocol(base_subprocess.SubprocessProtocol):
self._waiters = []
self._transport = None
- def create_read_pipe_protocol(self, transport, fd):
- pipe = ReadSubprocessPipeStreamProto(transport, fd, self.limit)
- self._pipes[fd] = pipe
+ def _pipe_connection_made(self, pipe, fut):
+ fd = pipe.fd
+ if fd == 0:
+ self.stdin = pipe.writer
+ if fd == 1:
+ self.stdout = pipe._stream_reader
+ if fd == 2:
+ self.stderr = pipe._stream_reader
+
+ def create_read_pipe_protocol(self, transport, fd, waiter):
+ pipe = ReadSubprocessPipeStreamProto(transport, fd, waiter, self.limit)
+ waiter.add_done_callback(functools.partial(self._pipe_connection_made, pipe))
return pipe
- def create_write_pipe_protocol(self, transport, fd):
- pipe = WriteSubprocessPipeStreamProto(transport, fd)
- self._pipes[fd] = pipe
+ def create_write_pipe_protocol(self, transport, fd, waiter=None):
+ pipe = WriteSubprocessPipeStreamProto(transport, fd, waiter)
+ waiter.add_done_callback(functools.partial(self._pipe_connection_made, pipe))
return pipe
def connection_made(self, transport):
self._transport = transport
- if 0 in self._pipes:
- self.stdin = self._pipes[0].writer
- if 1 in self._pipes:
- self.stdout = self._pipes[1]._stream_reader
- if 2 in self._pipes:
- self.stderr = self._pipes[1]._stream_reader
@tasks.coroutine
def wait(self):