summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-01-26 03:08:34 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-01-26 03:08:34 +0100
commitdb9648e17f135247751a7f3a87481192070d4e61 (patch)
tree5c523f4d9f3a34a1beb5e6f961b2bf38d9ad5ccd
parent0ccc4e2be0c6dfb0ffe5176577a1378667ea0bfa (diff)
downloadtrollius-db9648e17f135247751a7f3a87481192070d4e61.tar.gz
Move new classes to subprocess_stream.py
-rw-r--r--asyncio/__init__.py2
-rw-r--r--asyncio/base_subprocess.py201
-rw-r--r--asyncio/protocols.py20
-rw-r--r--asyncio/subprocess_stream.py193
-rw-r--r--asyncio/unix_events.py4
-rw-r--r--examples/subprocess_stream.py5
6 files changed, 227 insertions, 198 deletions
diff --git a/asyncio/__init__.py b/asyncio/__init__.py
index eb22c38..e24766b 100644
--- a/asyncio/__init__.py
+++ b/asyncio/__init__.py
@@ -24,6 +24,7 @@ from .locks import *
from .protocols import *
from .queues import *
from .streams import *
+from .subprocess_stream import *
from .tasks import *
from .transports import *
@@ -39,5 +40,6 @@ __all__ = (events.__all__ +
protocols.__all__ +
queues.__all__ +
streams.__all__ +
+ subprocess_stream.__all__ +
tasks.__all__ +
transports.__all__)
diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py
index 3fd60d3..88e3574 100644
--- a/asyncio/base_subprocess.py
+++ b/asyncio/base_subprocess.py
@@ -3,7 +3,6 @@ import subprocess
from . import protocols
from . import tasks
-from . import streams
from . import transports
@@ -16,6 +15,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def __init__(self, loop, protocol, args, shell,
stdin, stdout, stderr, bufsize,
+ read_pipe_protocol_factory, write_pipe_protocol_factory,
extra=None, **kwargs):
super().__init__(extra)
self._protocol = protocol
@@ -31,6 +31,14 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self._pending_calls = collections.deque()
self._finished = False
self._returncode = None
+ if read_pipe_protocol_factory is not None:
+ self._read_pipe_proto_factory = read_pipe_protocol_factory
+ else:
+ self._read_pipe_proto_factory = ReadSubprocessPipeProto
+ if write_pipe_protocol_factory is not None:
+ self._write_pipe_proto_factory = write_pipe_protocol_factory
+ else:
+ self._write_pipe_proto_factory = WriteSubprocessPipeProto
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, **kwargs)
self._extra['subprocess'] = self._proc
@@ -76,11 +84,17 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
proc = self._proc
loop = self._loop
if proc.stdin is not None:
- transp, proto = yield from self._protocol.connect_write_pipe(self._loop, self, STDIN, proc.stdin)
+ yield from loop.connect_write_pipe(
+ lambda: self._write_pipe_proto_factory(self, STDIN),
+ proc.stdin)
if proc.stdout is not None:
- transp, proto = yield from self._protocol.connect_read_pipe(self._loop, self, STDOUT, proc.stdout)
+ yield from loop.connect_read_pipe(
+ lambda: self._read_pipe_proto_factory(self, STDOUT),
+ proc.stdout)
if proc.stderr is not None:
- transp, proto = yield from self._protocol.connect_read_pipe(self._loop, self, STDERR, proc.stderr)
+ yield from loop.connect_read_pipe(
+ lambda: self._read_pipe_proto_factory(self, STDERR),
+ proc.stderr)
if not self._pipes:
self._try_connected()
@@ -98,6 +112,10 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self._loop.call_soon(callback, *data)
self._pending_calls = None
+ def _pipe_connection_made(self, fd, pipe):
+ self._try_connected()
+ self._protocol.pipe_connection_made(fd, pipe)
+
def _pipe_connection_lost(self, fd, exc):
self._call(self._protocol.pipe_connection_lost, fd, exc)
self._try_finish()
@@ -144,7 +162,7 @@ class WriteSubprocessPipeProto(protocols.BaseProtocol):
def connection_made(self, transport):
self.connected = True
self.pipe = transport
- self.proc._try_connected()
+ self.proc._pipe_connection_made(self.fd, self)
def connection_lost(self, exc):
self.disconnected = True
@@ -161,176 +179,3 @@ class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
self.proc._pipe_data_received(self.fd, data)
-class WriteSubprocessPipeStreamProto(WriteSubprocessPipeProto):
- def __init__(self, process_transport, fd):
- WriteSubprocessPipeProto.__init__(self, process_transport, fd)
- self._drain_waiter = None
- self._paused = False
-
- def connection_lost(self, exc):
- # Also wake up the writing side.
- if self._paused:
- waiter = self._drain_waiter
- if waiter is not None:
- self._drain_waiter = None
- if not waiter.done():
- if exc is None:
- waiter.set_result(None)
- else:
- waiter.set_exception(exc)
-
- def pause_writing(self):
- assert not self._paused
- self._paused = True
-
- def resume_writing(self):
- assert self._paused
- self._paused = False
- waiter = self._drain_waiter
- if waiter is not None:
- self._drain_waiter = None
- if not waiter.done():
- waiter.set_result(None)
-
-
-class WritePipeStream:
- """Wraps a Transport.
-
- This exposes write(), writelines(), [can_]write_eof(),
- get_extra_info() and close(). It adds drain() which returns an
- optional Future on which you can wait for flow control. It also
- adds a transport property which references the Transport
- directly.
- """
-
- def __init__(self, transport, protocol, loop):
- self._transport = transport
- self._protocol = protocol
- self._loop = loop
-
- @property
- def transport(self):
- return self._transport
-
- def write(self, data):
- self._transport.write(data)
-
- def writelines(self, data):
- self._transport.writelines(data)
-
- def write_eof(self):
- return self._transport.write_eof()
-
- def can_write_eof(self):
- return self._transport.can_write_eof()
-
- def close(self):
- return self._transport.close()
-
- def get_extra_info(self, name, default=None):
- return self._transport.get_extra_info(name, default)
-
- def drain(self):
- """This method has an unusual return value.
-
- The intended use is to write
-
- w.write(data)
- yield from w.drain()
-
- When there's nothing to wait for, drain() returns (), and the
- yield-from continues immediately. When the transport buffer
- is full (the protocol is paused), drain() creates and returns
- a Future and the yield-from will block until that Future is
- completed, which will happen when the buffer is (partially)
- drained and the protocol is resumed.
- """
- if self._transport._conn_lost: # Uses private variable.
- raise ConnectionResetError('Connection lost')
- if not self._protocol._paused:
- return ()
- waiter = self._protocol._drain_waiter
- assert waiter is None or waiter.cancelled()
- waiter = futures.Future(loop=self._loop)
- self._protocol._drain_waiter = waiter
- return waiter
-
-
-class SubprocessStreamProtocol(protocols.SubprocessProtocol):
- def __init__(self, limit=streams._DEFAULT_LIMIT):
- self._pipes = {}
- self.limit = limit
- self.stdin = None
- self.stdout = None
- self.stderr = None
- self._waiters = []
- self._returncode = None
- self._loop = None
-
- def connection_made(self, transport):
- self._loop = transport._loop
- proc = transport._proc
- if proc.stdout is not None:
- self.stdout = self._get_protocol(1)._stream_reader
- if proc.stderr is not None:
- self.stderr = self._get_protocol(2)._stream_reader
-
- def get_pipe_reader(self, fd):
- if fd in self._pipes:
- return self._pipes[fd]._stream_reader
- else:
- return None
-
- def _get_protocol(self, fd):
- try:
- return self._pipes[fd]
- except KeyError:
- reader = streams.StreamReader(limit=self.limit)
- protocol = streams.StreamReaderProtocol(reader, loop=self._loop)
- self._pipes[fd] = protocol
- return protocol
-
- def pipe_data_received(self, fd, data):
- protocol = self._get_protocol(fd)
- protocol.data_received(data)
-
- def pipe_connection_lost(self, fd, exc):
- protocol = self._get_protocol(fd)
- protocol.connection_lost(exc)
-
- @tasks.coroutine
- def wait(self):
- """
- Wait until the process exit and return the process return code.
- """
- if self._returncode:
- return self._returncode
-
- fut = tasks.Future()
- self._waiters.append(fut)
- yield from fut
- return fut.result()
-
- def process_exited(self, returncode):
- self._returncode = returncode
- # FIXME: not thread safe
- waiters = self._waiters.copy()
- self._waiters.clear()
- for waiter in waiters:
- waiter.set_result(returncode)
-
- # FIXME: remove loop
- @tasks.coroutine
- def connect_read_pipe(self, loop, transport, fd, pipe):
- return (yield from loop.connect_read_pipe(
- lambda: ReadSubprocessPipeProto(transport, fd),
- pipe))
-
- @tasks.coroutine
- def connect_write_pipe(self, loop, process_transport, fd, pipe):
- transport, protocol = yield from loop.connect_write_pipe(lambda: WriteSubprocessPipeStreamProto(process_transport, fd), pipe)
- writer = WritePipeStream(transport, protocol, loop)
- if fd == 0:
- self.stdin = writer
- return transport, writer
-
diff --git a/asyncio/protocols.py b/asyncio/protocols.py
index 9fe975c..238789b 100644
--- a/asyncio/protocols.py
+++ b/asyncio/protocols.py
@@ -1,6 +1,4 @@
"""Abstract Protocol class."""
-# FIXME: don't create tasks in protocols
-from . import tasks
__all__ = ['BaseProtocol', 'Protocol', 'DatagramProtocol',
'SubprocessProtocol']
@@ -113,6 +111,9 @@ class DatagramProtocol(BaseProtocol):
class SubprocessProtocol(BaseProtocol):
"""Interface for protocol for subprocess calls."""
+ def pipe_connection_made(self, fd, pipe):
+ pass
+
def pipe_data_received(self, fd, data):
"""Called when the subprocess writes data into stdout/stderr pipe.
@@ -130,18 +131,3 @@ class SubprocessProtocol(BaseProtocol):
def process_exited(self, returncode):
"""Called when subprocess has exited."""
- # FIXME: remove loop
- @tasks.coroutine
- def connect_read_pipe(self, loop, transport, fd, pipe):
- from . import base_subprocess
- return (yield from loop.connect_read_pipe(
- lambda: base_subprocess.ReadSubprocessPipeProto(transport, fd),
- pipe))
-
- @tasks.coroutine
- def connect_write_pipe(self, loop, transport, fd, pipe):
- from . import base_subprocess
- return (yield from loop.connect_write_pipe(
- lambda: base_subprocess.WriteSubprocessPipeProto(transport, fd),
- pipe))
-
diff --git a/asyncio/subprocess_stream.py b/asyncio/subprocess_stream.py
new file mode 100644
index 0000000..d065c16
--- /dev/null
+++ b/asyncio/subprocess_stream.py
@@ -0,0 +1,193 @@
+__all__ = ['subprocess_shell', 'subprocess_exec']
+
+from . import base_subprocess
+from . import events
+from . import protocols
+from . import streams
+from . import tasks
+
+class WriteSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto):
+ def __init__(self, process_transport, fd):
+ base_subprocess.WriteSubprocessPipeProto.__init__(self, process_transport, fd)
+ self._drain_waiter = None
+ self._paused = False
+ self.writer = WritePipeStream(None, self, None)
+
+ def connection_made(self, transport):
+ super().connection_made(transport)
+ self.writer._transport = transport
+ self.writer._loop = transport._loop
+
+ def connection_lost(self, exc):
+ # Also wake up the writing side.
+ if self._paused:
+ waiter = self._drain_waiter
+ if waiter is not None:
+ self._drain_waiter = None
+ if not waiter.done():
+ if exc is None:
+ waiter.set_result(None)
+ else:
+ waiter.set_exception(exc)
+
+ def pause_writing(self):
+ assert not self._paused
+ self._paused = True
+
+ def resume_writing(self):
+ assert self._paused
+ self._paused = False
+ waiter = self._drain_waiter
+ if waiter is not None:
+ self._drain_waiter = None
+ if not waiter.done():
+ waiter.set_result(None)
+
+
+class WritePipeStream:
+ """Wraps a Transport.
+
+ This exposes write(), writelines(), [can_]write_eof(),
+ get_extra_info() and close(). It adds drain() which returns an
+ optional Future on which you can wait for flow control. It also
+ adds a transport property which references the Transport
+ directly.
+ """
+
+ def __init__(self, transport, protocol, loop):
+ self._transport = transport
+ self._protocol = protocol
+ self._loop = loop
+
+ @property
+ def transport(self):
+ return self._transport
+
+ def write(self, data):
+ self._transport.write(data)
+
+ def writelines(self, data):
+ self._transport.writelines(data)
+
+ def write_eof(self):
+ return self._transport.write_eof()
+
+ def can_write_eof(self):
+ return self._transport.can_write_eof()
+
+ def close(self):
+ return self._transport.close()
+
+ def get_extra_info(self, name, default=None):
+ return self._transport.get_extra_info(name, default)
+
+ def drain(self):
+ """This method has an unusual return value.
+
+ The intended use is to write
+
+ w.write(data)
+ yield from w.drain()
+
+ When there's nothing to wait for, drain() returns (), and the
+ yield-from continues immediately. When the transport buffer
+ is full (the protocol is paused), drain() creates and returns
+ a Future and the yield-from will block until that Future is
+ completed, which will happen when the buffer is (partially)
+ drained and the protocol is resumed.
+ """
+ if self._transport._conn_lost: # Uses private variable.
+ raise ConnectionResetError('Connection lost')
+ if not self._protocol._paused:
+ return ()
+ waiter = self._protocol._drain_waiter
+ assert waiter is None or waiter.cancelled()
+ waiter = futures.Future(loop=self._loop)
+ self._protocol._drain_waiter = waiter
+ return waiter
+
+
+class SubprocessStreamProtocol(protocols.SubprocessProtocol):
+ def __init__(self, limit=streams._DEFAULT_LIMIT):
+ self._pipes = {}
+ self.limit = limit
+ self.stdin = None
+ self.stdout = None
+ self.stderr = None
+ self._waiters = []
+ self._returncode = None
+ self._loop = None
+
+ def connection_made(self, transport):
+ self._loop = transport._loop
+ proc = transport._proc
+ if proc.stdout is not None:
+ self.stdout = self._get_protocol(1)._stream_reader
+ if proc.stderr is not None:
+ self.stderr = self._get_protocol(2)._stream_reader
+
+ def get_pipe_reader(self, fd):
+ if fd in self._pipes:
+ return self._pipes[fd]._stream_reader
+ else:
+ return None
+
+ def _get_protocol(self, fd):
+ try:
+ return self._pipes[fd]
+ except KeyError:
+ reader = streams.StreamReader(limit=self.limit)
+ protocol = streams.StreamReaderProtocol(reader, loop=self._loop)
+ self._pipes[fd] = protocol
+ return protocol
+
+ def pipe_data_received(self, fd, data):
+ protocol = self._get_protocol(fd)
+ protocol.data_received(data)
+
+ def pipe_connection_lost(self, fd, exc):
+ protocol = self._get_protocol(fd)
+ protocol.connection_lost(exc)
+
+ @tasks.coroutine
+ def wait(self):
+ """
+ Wait until the process exit and return the process return code.
+ """
+ if self._returncode:
+ return self._returncode
+
+ fut = tasks.Future()
+ self._waiters.append(fut)
+ yield from fut
+ return fut.result()
+
+ def process_exited(self, returncode):
+ self._returncode = returncode
+ # FIXME: not thread safe
+ waiters = self._waiters.copy()
+ self._waiters.clear()
+ for waiter in waiters:
+ waiter.set_result(returncode)
+
+ def pipe_connection_made(self, fd, pipe):
+ if fd == 0:
+ self.stdin = pipe.writer
+
+@tasks.coroutine
+def subprocess_exec(*args, **kwargs):
+ loop = kwargs.pop('loop', None)
+ if loop is None:
+ loop = events.get_event_loop()
+ kwargs['write_pipe_proto_factory'] = WriteSubprocessPipeStreamProto
+ yield from loop.subprocess_exec(SubprocessStreamProtocol, *args, **kwargs)
+
+
+@tasks.coroutine
+def subprocess_shell(*args, **kwargs):
+ loop = kwargs.pop('loop', None)
+ if loop is None:
+ loop = events.get_event_loop()
+ kwargs['write_pipe_protocol_factory'] = WriteSubprocessPipeStreamProto
+ return (yield from loop.subprocess_shell(SubprocessStreamProtocol, *args, **kwargs))
+
diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py
index e6a4d17..d7cda67 100644
--- a/asyncio/unix_events.py
+++ b/asyncio/unix_events.py
@@ -156,9 +156,13 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
+ read_pipe_protocol_factory = kwargs.pop('read_pipe_protocol_factory', None)
+ write_pipe_protocol_factory = kwargs.pop('write_pipe_protocol_factory', None)
with events.get_child_watcher() as watcher:
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
+ read_pipe_protocol_factory,
+ write_pipe_protocol_factory,
extra=None, **kwargs)
yield from transp._post_init()
watcher.add_child_handler(transp.get_pid(),
diff --git a/examples/subprocess_stream.py b/examples/subprocess_stream.py
index 1ece32d..c33cdc7 100644
--- a/examples/subprocess_stream.py
+++ b/examples/subprocess_stream.py
@@ -1,9 +1,8 @@
import asyncio
-from asyncio.base_subprocess import SubprocessStreamProtocol
@asyncio.coroutine
def cat(loop):
- transport, protocol = yield from loop.subprocess_shell(SubprocessStreamProtocol, "cat")
+ transport, protocol = yield from asyncio.subprocess_shell("cat")
print("pid: %s" % transport.get_pid())
stdin = protocol.stdin
stdout = protocol.stdout
@@ -20,7 +19,7 @@ def cat(loop):
@asyncio.coroutine
def ls(loop):
- transport, protocol = yield from loop.subprocess_shell(SubprocessStreamProtocol, "ls", stdin=None)
+ transport, protocol = yield from asyncio.subprocess_shell("ls", stdin=None)
while True:
line = yield from protocol.stdout.readline()
if not line: