summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-01-27 00:14:21 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-01-27 00:14:21 +0100
commita503d1536efd91f6e31260e56972ef2190fe8a04 (patch)
treeb73771e2e402000f75c9729d49d8d74a81784b3d
parent80b908deb40ebcb4e54ff5e9dd8902b7a148f3ca (diff)
parentcf89b14fc3e56223f6c06cd1448d12e1051fac28 (diff)
downloadtrollius-a503d1536efd91f6e31260e56972ef2190fe8a04.tar.gz
Merge with default
-rw-r--r--asyncio/__init__.py2
-rw-r--r--asyncio/base_subprocess.py74
-rw-r--r--asyncio/protocols.py7
-rw-r--r--asyncio/subprocess_stream.py188
-rw-r--r--asyncio/transports.py52
-rw-r--r--asyncio/unix_events.py13
-rw-r--r--examples/subprocess_stream.py33
7 files changed, 311 insertions, 58 deletions
diff --git a/asyncio/__init__.py b/asyncio/__init__.py
index eb22c38..e24766b 100644
--- a/asyncio/__init__.py
+++ b/asyncio/__init__.py
@@ -24,6 +24,7 @@ from .locks import *
from .protocols import *
from .queues import *
from .streams import *
+from .subprocess_stream import *
from .tasks import *
from .transports import *
@@ -39,5 +40,6 @@ __all__ = (events.__all__ +
protocols.__all__ +
queues.__all__ +
streams.__all__ +
+ subprocess_stream.__all__ +
tasks.__all__ +
transports.__all__)
diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py
index c5efda7..6c65678 100644
--- a/asyncio/base_subprocess.py
+++ b/asyncio/base_subprocess.py
@@ -1,3 +1,5 @@
+__all__ = ['SubprocessTransport']
+
import collections
import subprocess
@@ -11,7 +13,65 @@ STDOUT = 1
STDERR = 2
-class BaseSubprocessTransport(transports.SubprocessTransport):
+class SubprocessTransport(transports.BaseTransport):
+
+ def create_read_pipe_protocol(self, transport, fd):
+ return ReadSubprocessPipeProto(transport, fd)
+
+ def create_write_pipe_protocol(self, transport, fd):
+ return WriteSubprocessPipeProto(transport, fd)
+
+ def get_pid(self):
+ """Get subprocess id."""
+ raise NotImplementedError
+
+ def get_returncode(self):
+ """Get subprocess returncode.
+
+ See also
+ http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode
+ """
+ raise NotImplementedError
+
+ def get_pipe_transport(self, fd):
+ """Get transport for pipe with number fd."""
+ raise NotImplementedError
+
+ def send_signal(self, signal):
+ """Send signal to subprocess.
+
+ See also:
+ docs.python.org/3/library/subprocess#subprocess.Popen.send_signal
+ """
+ raise NotImplementedError
+
+ def terminate(self):
+ """Stop the subprocess.
+
+ Alias for close() method.
+
+ On Posix OSs the method sends SIGTERM to the subprocess.
+ On Windows the Win32 API function TerminateProcess()
+ is called to stop the subprocess.
+
+ See also:
+ http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate
+ """
+ raise NotImplementedError
+
+ def kill(self):
+ """Kill the subprocess.
+
+ On Posix OSs the function sends SIGKILL to the subprocess.
+ On Windows kill() is an alias for terminate().
+
+ See also:
+ http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
+ """
+ raise NotImplementedError
+
+
+class BaseSubprocessTransport(SubprocessTransport):
def __init__(self, loop, protocol, args, shell,
stdin, stdout, stderr, bufsize,
@@ -75,16 +135,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
proc = self._proc
loop = self._loop
if proc.stdin is not None:
- transp, proto = yield from loop.connect_write_pipe(
- lambda: WriteSubprocessPipeProto(self, STDIN),
+ yield from loop.connect_write_pipe(
+ lambda: self._protocol.create_write_pipe_protocol(self, STDIN),
proc.stdin)
if proc.stdout is not None:
- transp, proto = yield from loop.connect_read_pipe(
- lambda: ReadSubprocessPipeProto(self, STDOUT),
+ yield from loop.connect_read_pipe(
+ lambda: self._protocol.create_read_pipe_protocol(self, STDOUT),
proc.stdout)
if proc.stderr is not None:
- transp, proto = yield from loop.connect_read_pipe(
- lambda: ReadSubprocessPipeProto(self, STDERR),
+ yield from loop.connect_read_pipe(
+ lambda: self._protocol.create_read_pipe_protocol(self, STDERR),
proc.stderr)
if not self._pipes:
self._try_connected()
diff --git a/asyncio/protocols.py b/asyncio/protocols.py
index 3c4f3f4..99f0c82 100644
--- a/asyncio/protocols.py
+++ b/asyncio/protocols.py
@@ -111,6 +111,12 @@ class DatagramProtocol(BaseProtocol):
class SubprocessProtocol(BaseProtocol):
"""Interface for protocol for subprocess calls."""
+ read_pipe_protocol = None
+ write_pipe_protocol = None
+
+ def pipe_connection_made(self, fd, pipe):
+ pass
+
def pipe_data_received(self, fd, data):
"""Called when the subprocess writes data into stdout/stderr pipe.
@@ -127,3 +133,4 @@ class SubprocessProtocol(BaseProtocol):
def process_exited(self):
"""Called when subprocess has exited."""
+
diff --git a/asyncio/subprocess_stream.py b/asyncio/subprocess_stream.py
new file mode 100644
index 0000000..ff22f8f
--- /dev/null
+++ b/asyncio/subprocess_stream.py
@@ -0,0 +1,188 @@
+__all__ = ['SubprocessStreamProtocol']
+
+from . import base_subprocess
+from . import events
+from . import protocols
+from . import streams
+from . import tasks
+
+class WriteSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto):
+ def __init__(self, process_transport, fd):
+ base_subprocess.WriteSubprocessPipeProto.__init__(self, process_transport, fd)
+ self._drain_waiter = None
+ self._paused = False
+ self.writer = WritePipeStream(None, self, None)
+
+ def connection_made(self, transport):
+ super().connection_made(transport)
+ self.writer._transport = transport
+ self.writer._loop = transport._loop
+
+ def connection_lost(self, exc):
+ # FIXME: call super().connection_lost(exc)
+ # Also wake up the writing side.
+ if self._paused:
+ waiter = self._drain_waiter
+ if waiter is not None:
+ self._drain_waiter = None
+ if not waiter.done():
+ if exc is None:
+ waiter.set_result(None)
+ else:
+ waiter.set_exception(exc)
+
+ def pause_writing(self):
+ assert not self._paused
+ self._paused = True
+
+ def resume_writing(self):
+ assert self._paused
+ self._paused = False
+ waiter = self._drain_waiter
+ if waiter is not None:
+ self._drain_waiter = None
+ if not waiter.done():
+ waiter.set_result(None)
+
+
+class WritePipeStream:
+ """Wraps a Transport.
+
+ This exposes write(), writelines(), [can_]write_eof(),
+ get_extra_info() and close(). It adds drain() which returns an
+ optional Future on which you can wait for flow control. It also
+ adds a transport property which references the Transport
+ directly.
+ """
+
+ def __init__(self, transport, protocol, loop):
+ self._transport = transport
+ self._protocol = protocol
+ self._loop = loop
+
+ @property
+ def transport(self):
+ return self._transport
+
+ def write(self, data):
+ self._transport.write(data)
+
+ def writelines(self, data):
+ self._transport.writelines(data)
+
+ def write_eof(self):
+ return self._transport.write_eof()
+
+ def can_write_eof(self):
+ return self._transport.can_write_eof()
+
+ def close(self):
+ return self._transport.close()
+
+ def get_extra_info(self, name, default=None):
+ return self._transport.get_extra_info(name, default)
+
+ def drain(self):
+ """This method has an unusual return value.
+
+ The intended use is to write
+
+ w.write(data)
+ yield from w.drain()
+
+ When there's nothing to wait for, drain() returns (), and the
+ yield-from continues immediately. When the transport buffer
+ is full (the protocol is paused), drain() creates and returns
+ a Future and the yield-from will block until that Future is
+ completed, which will happen when the buffer is (partially)
+ drained and the protocol is resumed.
+ """
+ if self._transport._conn_lost: # Uses private variable.
+ raise ConnectionResetError('Connection lost')
+ if not self._protocol._paused:
+ return ()
+ waiter = self._protocol._drain_waiter
+ assert waiter is None or waiter.cancelled()
+ waiter = futures.Future(loop=self._loop)
+ self._protocol._drain_waiter = waiter
+ return waiter
+
+class ReadSubprocessPipeStreamProto(base_subprocess.ReadSubprocessPipeProto):
+ def __init__(self, proc, fd, limit=streams._DEFAULT_LIMIT):
+ super().__init__(proc, fd)
+ self._stream_reader = streams.StreamReader(limit=limit)
+
+ def connection_made(self, transport):
+ super().connection_made(transport)
+ self._stream_reader.set_transport(transport)
+
+ def connection_lost(self, exc):
+ # FIXME: call super().connection_lost(exc)
+ if exc is None:
+ self._stream_reader.feed_eof()
+ else:
+ self._stream_reader.set_exception(exc)
+
+ def data_received(self, data):
+ self._stream_reader.feed_data(data)
+
+ def eof_received(self):
+ self._stream_reader.feed_eof()
+
+
+class SubprocessStreamProtocol(protocols.SubprocessProtocol):
+ def __init__(self, limit=streams._DEFAULT_LIMIT):
+ self._pipes = {}
+ self.limit = limit
+ self.stdin = None
+ self.stdout = None
+ self.stderr = None
+ self._waiters = []
+ self._transport = None
+
+ def create_read_pipe_protocol(self, transport, fd):
+ pipe = ReadSubprocessPipeStreamProto(transport, fd, self.limit)
+ if fd == 1:
+ self.stdout = pipe._stream_reader
+ elif fd == 2:
+ self.stderr = pipe._stream_reader
+ return pipe
+
+ def create_write_pipe_protocol(self, transport, fd):
+ pipe = WriteSubprocessPipeStreamProto(transport, fd)
+ if fd == 0:
+ self.stdin = pipe.writer
+ return pipe
+
+ def connection_made(self, transport):
+ self._transport = transport
+
+ def pipe_data_received(self, fd, data):
+ pipe = self._pipes[fd]
+ pipe.data_received(data)
+
+ def pipe_connection_lost(self, fd, exc):
+ pipe = self._pipes[fd]
+ pipe.connection_lost(exc)
+
+ @tasks.coroutine
+ def wait(self):
+ """
+ Wait until the process exit and return the process return code.
+ """
+ returncode = self._transport.get_returncode()
+ if returncode is not None:
+ return returncode
+
+ fut = tasks.Future()
+ self._waiters.append(fut)
+ yield from fut
+ return fut.result()
+
+ def process_exited(self):
+ returncode = self._transport.get_returncode()
+ # FIXME: not thread safe
+ waiters = self._waiters.copy()
+ self._waiters.clear()
+ for waiter in waiters:
+ waiter.set_result(returncode)
diff --git a/asyncio/transports.py b/asyncio/transports.py
index 67ae7fd..95a14e4 100644
--- a/asyncio/transports.py
+++ b/asyncio/transports.py
@@ -5,7 +5,7 @@ import sys
_PY34 = sys.version_info >= (3, 4)
__all__ = ['BaseTransport', 'ReadTransport', 'WriteTransport',
- 'Transport', 'DatagramTransport', 'SubprocessTransport',
+ 'Transport', 'DatagramTransport',
]
@@ -169,53 +169,3 @@ class DatagramTransport(BaseTransport):
raise NotImplementedError
-class SubprocessTransport(BaseTransport):
-
- def get_pid(self):
- """Get subprocess id."""
- raise NotImplementedError
-
- def get_returncode(self):
- """Get subprocess returncode.
-
- See also
- http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode
- """
- raise NotImplementedError
-
- def get_pipe_transport(self, fd):
- """Get transport for pipe with number fd."""
- raise NotImplementedError
-
- def send_signal(self, signal):
- """Send signal to subprocess.
-
- See also:
- docs.python.org/3/library/subprocess#subprocess.Popen.send_signal
- """
- raise NotImplementedError
-
- def terminate(self):
- """Stop the subprocess.
-
- Alias for close() method.
-
- On Posix OSs the method sends SIGTERM to the subprocess.
- On Windows the Win32 API function TerminateProcess()
- is called to stop the subprocess.
-
- See also:
- http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate
- """
- raise NotImplementedError
-
- def kill(self):
- """Kill the subprocess.
-
- On Posix OSs the function sends SIGKILL to the subprocess.
- On Windows kill() is an alias for terminate().
-
- See also:
- http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
- """
- raise NotImplementedError
diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py
index 219c88a..16de540 100644
--- a/asyncio/unix_events.py
+++ b/asyncio/unix_events.py
@@ -281,6 +281,16 @@ class _UnixWritePipeTransport(transports.WriteTransport):
# Pipe was closed by peer.
self._close()
+ # FIXME: inline this method?
+ def _pause_protocol(self):
+ # FIXME: try/except as _SelectorTransport._maybe_pause_protocol?
+ self._protocol.pause_writing()
+
+ # FIXME: inline this method?
+ def _resume_protocol(self):
+ # FIXME: try/except as _SelectorTransport._maybe_resume_protocol?
+ self._protocol.resume_writing()
+
def write(self, data):
assert isinstance(data, bytes), repr(data)
if not data:
@@ -310,6 +320,7 @@ class _UnixWritePipeTransport(transports.WriteTransport):
self._loop.add_writer(self._fileno, self._write_ready)
self._buffer.append(data)
+ self._pause_protocol()
def _write_ready(self):
data = b''.join(self._buffer)
@@ -329,6 +340,8 @@ class _UnixWritePipeTransport(transports.WriteTransport):
else:
if n == len(data):
self._loop.remove_writer(self._fileno)
+ # FIXME: move resume after the closing block?
+ self._resume_protocol()
if self._closing:
self._loop.remove_reader(self._fileno)
self._call_connection_lost(None)
diff --git a/examples/subprocess_stream.py b/examples/subprocess_stream.py
new file mode 100644
index 0000000..18c64f7
--- /dev/null
+++ b/examples/subprocess_stream.py
@@ -0,0 +1,33 @@
+import asyncio
+
+@asyncio.coroutine
+def cat(loop):
+ transport, proc = yield from loop.subprocess_shell(asyncio.SubprocessStreamProtocol, "cat")
+ print("pid: %s" % transport.get_pid())
+
+ message = "Hello World!"
+ print("cat write: %r" % message)
+ proc.stdin.write(message.encode('ascii'))
+ yield from proc.stdin.drain()
+
+ proc.stdin.close()
+ read = yield from proc.stdout.read()
+ print("cat read: %r" % read.decode('ascii'))
+
+ returncode = yield from proc.wait()
+ print("exit code: %s" % returncode)
+ transport.close()
+
+@asyncio.coroutine
+def ls(loop):
+ transport, proc = yield from loop.subprocess_exec(asyncio.SubprocessStreamProtocol, "ls", stdin=None)
+ while True:
+ line = yield from proc.stdout.readline()
+ if not line:
+ break
+ print("ls>>", line.decode('ascii').rstrip())
+ transport.close()
+
+loop = asyncio.get_event_loop()
+loop.run_until_complete(cat(loop))
+loop.run_until_complete(ls(loop))