diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-01-26 03:08:34 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-01-26 03:08:34 +0100 |
commit | db9648e17f135247751a7f3a87481192070d4e61 (patch) | |
tree | 5c523f4d9f3a34a1beb5e6f961b2bf38d9ad5ccd | |
parent | 0ccc4e2be0c6dfb0ffe5176577a1378667ea0bfa (diff) | |
download | trollius-db9648e17f135247751a7f3a87481192070d4e61.tar.gz |
Move new classes to subprocess_stream.py
-rw-r--r-- | asyncio/__init__.py | 2 | ||||
-rw-r--r-- | asyncio/base_subprocess.py | 201 | ||||
-rw-r--r-- | asyncio/protocols.py | 20 | ||||
-rw-r--r-- | asyncio/subprocess_stream.py | 193 | ||||
-rw-r--r-- | asyncio/unix_events.py | 4 | ||||
-rw-r--r-- | examples/subprocess_stream.py | 5 |
6 files changed, 227 insertions, 198 deletions
diff --git a/asyncio/__init__.py b/asyncio/__init__.py index eb22c38..e24766b 100644 --- a/asyncio/__init__.py +++ b/asyncio/__init__.py @@ -24,6 +24,7 @@ from .locks import * from .protocols import * from .queues import * from .streams import * +from .subprocess_stream import * from .tasks import * from .transports import * @@ -39,5 +40,6 @@ __all__ = (events.__all__ + protocols.__all__ + queues.__all__ + streams.__all__ + + subprocess_stream.__all__ + tasks.__all__ + transports.__all__) diff --git a/asyncio/base_subprocess.py b/asyncio/base_subprocess.py index 3fd60d3..88e3574 100644 --- a/asyncio/base_subprocess.py +++ b/asyncio/base_subprocess.py @@ -3,7 +3,6 @@ import subprocess from . import protocols from . import tasks -from . import streams from . import transports @@ -16,6 +15,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): def __init__(self, loop, protocol, args, shell, stdin, stdout, stderr, bufsize, + read_pipe_protocol_factory, write_pipe_protocol_factory, extra=None, **kwargs): super().__init__(extra) self._protocol = protocol @@ -31,6 +31,14 @@ class BaseSubprocessTransport(transports.SubprocessTransport): self._pending_calls = collections.deque() self._finished = False self._returncode = None + if read_pipe_protocol_factory is not None: + self._read_pipe_proto_factory = read_pipe_protocol_factory + else: + self._read_pipe_proto_factory = ReadSubprocessPipeProto + if write_pipe_protocol_factory is not None: + self._write_pipe_proto_factory = write_pipe_protocol_factory + else: + self._write_pipe_proto_factory = WriteSubprocessPipeProto self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=bufsize, **kwargs) self._extra['subprocess'] = self._proc @@ -76,11 +84,17 @@ class BaseSubprocessTransport(transports.SubprocessTransport): proc = self._proc loop = self._loop if proc.stdin is not None: - transp, proto = yield from self._protocol.connect_write_pipe(self._loop, self, STDIN, proc.stdin) + yield from loop.connect_write_pipe( + lambda: self._write_pipe_proto_factory(self, STDIN), + proc.stdin) if proc.stdout is not None: - transp, proto = yield from self._protocol.connect_read_pipe(self._loop, self, STDOUT, proc.stdout) + yield from loop.connect_read_pipe( + lambda: self._read_pipe_proto_factory(self, STDOUT), + proc.stdout) if proc.stderr is not None: - transp, proto = yield from self._protocol.connect_read_pipe(self._loop, self, STDERR, proc.stderr) + yield from loop.connect_read_pipe( + lambda: self._read_pipe_proto_factory(self, STDERR), + proc.stderr) if not self._pipes: self._try_connected() @@ -98,6 +112,10 @@ class BaseSubprocessTransport(transports.SubprocessTransport): self._loop.call_soon(callback, *data) self._pending_calls = None + def _pipe_connection_made(self, fd, pipe): + self._try_connected() + self._protocol.pipe_connection_made(fd, pipe) + def _pipe_connection_lost(self, fd, exc): self._call(self._protocol.pipe_connection_lost, fd, exc) self._try_finish() @@ -144,7 +162,7 @@ class WriteSubprocessPipeProto(protocols.BaseProtocol): def connection_made(self, transport): self.connected = True self.pipe = transport - self.proc._try_connected() + self.proc._pipe_connection_made(self.fd, self) def connection_lost(self, exc): self.disconnected = True @@ -161,176 +179,3 @@ class ReadSubprocessPipeProto(WriteSubprocessPipeProto, self.proc._pipe_data_received(self.fd, data) -class WriteSubprocessPipeStreamProto(WriteSubprocessPipeProto): - def __init__(self, process_transport, fd): - WriteSubprocessPipeProto.__init__(self, process_transport, fd) - self._drain_waiter = None - self._paused = False - - def connection_lost(self, exc): - # Also wake up the writing side. - if self._paused: - waiter = self._drain_waiter - if waiter is not None: - self._drain_waiter = None - if not waiter.done(): - if exc is None: - waiter.set_result(None) - else: - waiter.set_exception(exc) - - def pause_writing(self): - assert not self._paused - self._paused = True - - def resume_writing(self): - assert self._paused - self._paused = False - waiter = self._drain_waiter - if waiter is not None: - self._drain_waiter = None - if not waiter.done(): - waiter.set_result(None) - - -class WritePipeStream: - """Wraps a Transport. - - This exposes write(), writelines(), [can_]write_eof(), - get_extra_info() and close(). It adds drain() which returns an - optional Future on which you can wait for flow control. It also - adds a transport property which references the Transport - directly. - """ - - def __init__(self, transport, protocol, loop): - self._transport = transport - self._protocol = protocol - self._loop = loop - - @property - def transport(self): - return self._transport - - def write(self, data): - self._transport.write(data) - - def writelines(self, data): - self._transport.writelines(data) - - def write_eof(self): - return self._transport.write_eof() - - def can_write_eof(self): - return self._transport.can_write_eof() - - def close(self): - return self._transport.close() - - def get_extra_info(self, name, default=None): - return self._transport.get_extra_info(name, default) - - def drain(self): - """This method has an unusual return value. - - The intended use is to write - - w.write(data) - yield from w.drain() - - When there's nothing to wait for, drain() returns (), and the - yield-from continues immediately. When the transport buffer - is full (the protocol is paused), drain() creates and returns - a Future and the yield-from will block until that Future is - completed, which will happen when the buffer is (partially) - drained and the protocol is resumed. - """ - if self._transport._conn_lost: # Uses private variable. - raise ConnectionResetError('Connection lost') - if not self._protocol._paused: - return () - waiter = self._protocol._drain_waiter - assert waiter is None or waiter.cancelled() - waiter = futures.Future(loop=self._loop) - self._protocol._drain_waiter = waiter - return waiter - - -class SubprocessStreamProtocol(protocols.SubprocessProtocol): - def __init__(self, limit=streams._DEFAULT_LIMIT): - self._pipes = {} - self.limit = limit - self.stdin = None - self.stdout = None - self.stderr = None - self._waiters = [] - self._returncode = None - self._loop = None - - def connection_made(self, transport): - self._loop = transport._loop - proc = transport._proc - if proc.stdout is not None: - self.stdout = self._get_protocol(1)._stream_reader - if proc.stderr is not None: - self.stderr = self._get_protocol(2)._stream_reader - - def get_pipe_reader(self, fd): - if fd in self._pipes: - return self._pipes[fd]._stream_reader - else: - return None - - def _get_protocol(self, fd): - try: - return self._pipes[fd] - except KeyError: - reader = streams.StreamReader(limit=self.limit) - protocol = streams.StreamReaderProtocol(reader, loop=self._loop) - self._pipes[fd] = protocol - return protocol - - def pipe_data_received(self, fd, data): - protocol = self._get_protocol(fd) - protocol.data_received(data) - - def pipe_connection_lost(self, fd, exc): - protocol = self._get_protocol(fd) - protocol.connection_lost(exc) - - @tasks.coroutine - def wait(self): - """ - Wait until the process exit and return the process return code. - """ - if self._returncode: - return self._returncode - - fut = tasks.Future() - self._waiters.append(fut) - yield from fut - return fut.result() - - def process_exited(self, returncode): - self._returncode = returncode - # FIXME: not thread safe - waiters = self._waiters.copy() - self._waiters.clear() - for waiter in waiters: - waiter.set_result(returncode) - - # FIXME: remove loop - @tasks.coroutine - def connect_read_pipe(self, loop, transport, fd, pipe): - return (yield from loop.connect_read_pipe( - lambda: ReadSubprocessPipeProto(transport, fd), - pipe)) - - @tasks.coroutine - def connect_write_pipe(self, loop, process_transport, fd, pipe): - transport, protocol = yield from loop.connect_write_pipe(lambda: WriteSubprocessPipeStreamProto(process_transport, fd), pipe) - writer = WritePipeStream(transport, protocol, loop) - if fd == 0: - self.stdin = writer - return transport, writer - diff --git a/asyncio/protocols.py b/asyncio/protocols.py index 9fe975c..238789b 100644 --- a/asyncio/protocols.py +++ b/asyncio/protocols.py @@ -1,6 +1,4 @@ """Abstract Protocol class.""" -# FIXME: don't create tasks in protocols -from . import tasks __all__ = ['BaseProtocol', 'Protocol', 'DatagramProtocol', 'SubprocessProtocol'] @@ -113,6 +111,9 @@ class DatagramProtocol(BaseProtocol): class SubprocessProtocol(BaseProtocol): """Interface for protocol for subprocess calls.""" + def pipe_connection_made(self, fd, pipe): + pass + def pipe_data_received(self, fd, data): """Called when the subprocess writes data into stdout/stderr pipe. @@ -130,18 +131,3 @@ class SubprocessProtocol(BaseProtocol): def process_exited(self, returncode): """Called when subprocess has exited.""" - # FIXME: remove loop - @tasks.coroutine - def connect_read_pipe(self, loop, transport, fd, pipe): - from . import base_subprocess - return (yield from loop.connect_read_pipe( - lambda: base_subprocess.ReadSubprocessPipeProto(transport, fd), - pipe)) - - @tasks.coroutine - def connect_write_pipe(self, loop, transport, fd, pipe): - from . import base_subprocess - return (yield from loop.connect_write_pipe( - lambda: base_subprocess.WriteSubprocessPipeProto(transport, fd), - pipe)) - diff --git a/asyncio/subprocess_stream.py b/asyncio/subprocess_stream.py new file mode 100644 index 0000000..d065c16 --- /dev/null +++ b/asyncio/subprocess_stream.py @@ -0,0 +1,193 @@ +__all__ = ['subprocess_shell', 'subprocess_exec'] + +from . import base_subprocess +from . import events +from . import protocols +from . import streams +from . import tasks + +class WriteSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto): + def __init__(self, process_transport, fd): + base_subprocess.WriteSubprocessPipeProto.__init__(self, process_transport, fd) + self._drain_waiter = None + self._paused = False + self.writer = WritePipeStream(None, self, None) + + def connection_made(self, transport): + super().connection_made(transport) + self.writer._transport = transport + self.writer._loop = transport._loop + + def connection_lost(self, exc): + # Also wake up the writing side. + if self._paused: + waiter = self._drain_waiter + if waiter is not None: + self._drain_waiter = None + if not waiter.done(): + if exc is None: + waiter.set_result(None) + else: + waiter.set_exception(exc) + + def pause_writing(self): + assert not self._paused + self._paused = True + + def resume_writing(self): + assert self._paused + self._paused = False + waiter = self._drain_waiter + if waiter is not None: + self._drain_waiter = None + if not waiter.done(): + waiter.set_result(None) + + +class WritePipeStream: + """Wraps a Transport. + + This exposes write(), writelines(), [can_]write_eof(), + get_extra_info() and close(). It adds drain() which returns an + optional Future on which you can wait for flow control. It also + adds a transport property which references the Transport + directly. + """ + + def __init__(self, transport, protocol, loop): + self._transport = transport + self._protocol = protocol + self._loop = loop + + @property + def transport(self): + return self._transport + + def write(self, data): + self._transport.write(data) + + def writelines(self, data): + self._transport.writelines(data) + + def write_eof(self): + return self._transport.write_eof() + + def can_write_eof(self): + return self._transport.can_write_eof() + + def close(self): + return self._transport.close() + + def get_extra_info(self, name, default=None): + return self._transport.get_extra_info(name, default) + + def drain(self): + """This method has an unusual return value. + + The intended use is to write + + w.write(data) + yield from w.drain() + + When there's nothing to wait for, drain() returns (), and the + yield-from continues immediately. When the transport buffer + is full (the protocol is paused), drain() creates and returns + a Future and the yield-from will block until that Future is + completed, which will happen when the buffer is (partially) + drained and the protocol is resumed. + """ + if self._transport._conn_lost: # Uses private variable. + raise ConnectionResetError('Connection lost') + if not self._protocol._paused: + return () + waiter = self._protocol._drain_waiter + assert waiter is None or waiter.cancelled() + waiter = futures.Future(loop=self._loop) + self._protocol._drain_waiter = waiter + return waiter + + +class SubprocessStreamProtocol(protocols.SubprocessProtocol): + def __init__(self, limit=streams._DEFAULT_LIMIT): + self._pipes = {} + self.limit = limit + self.stdin = None + self.stdout = None + self.stderr = None + self._waiters = [] + self._returncode = None + self._loop = None + + def connection_made(self, transport): + self._loop = transport._loop + proc = transport._proc + if proc.stdout is not None: + self.stdout = self._get_protocol(1)._stream_reader + if proc.stderr is not None: + self.stderr = self._get_protocol(2)._stream_reader + + def get_pipe_reader(self, fd): + if fd in self._pipes: + return self._pipes[fd]._stream_reader + else: + return None + + def _get_protocol(self, fd): + try: + return self._pipes[fd] + except KeyError: + reader = streams.StreamReader(limit=self.limit) + protocol = streams.StreamReaderProtocol(reader, loop=self._loop) + self._pipes[fd] = protocol + return protocol + + def pipe_data_received(self, fd, data): + protocol = self._get_protocol(fd) + protocol.data_received(data) + + def pipe_connection_lost(self, fd, exc): + protocol = self._get_protocol(fd) + protocol.connection_lost(exc) + + @tasks.coroutine + def wait(self): + """ + Wait until the process exit and return the process return code. + """ + if self._returncode: + return self._returncode + + fut = tasks.Future() + self._waiters.append(fut) + yield from fut + return fut.result() + + def process_exited(self, returncode): + self._returncode = returncode + # FIXME: not thread safe + waiters = self._waiters.copy() + self._waiters.clear() + for waiter in waiters: + waiter.set_result(returncode) + + def pipe_connection_made(self, fd, pipe): + if fd == 0: + self.stdin = pipe.writer + +@tasks.coroutine +def subprocess_exec(*args, **kwargs): + loop = kwargs.pop('loop', None) + if loop is None: + loop = events.get_event_loop() + kwargs['write_pipe_proto_factory'] = WriteSubprocessPipeStreamProto + yield from loop.subprocess_exec(SubprocessStreamProtocol, *args, **kwargs) + + +@tasks.coroutine +def subprocess_shell(*args, **kwargs): + loop = kwargs.pop('loop', None) + if loop is None: + loop = events.get_event_loop() + kwargs['write_pipe_protocol_factory'] = WriteSubprocessPipeStreamProto + return (yield from loop.subprocess_shell(SubprocessStreamProtocol, *args, **kwargs)) + diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index e6a4d17..d7cda67 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -156,9 +156,13 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): + read_pipe_protocol_factory = kwargs.pop('read_pipe_protocol_factory', None) + write_pipe_protocol_factory = kwargs.pop('write_pipe_protocol_factory', None) with events.get_child_watcher() as watcher: transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, + read_pipe_protocol_factory, + write_pipe_protocol_factory, extra=None, **kwargs) yield from transp._post_init() watcher.add_child_handler(transp.get_pid(), diff --git a/examples/subprocess_stream.py b/examples/subprocess_stream.py index 1ece32d..c33cdc7 100644 --- a/examples/subprocess_stream.py +++ b/examples/subprocess_stream.py @@ -1,9 +1,8 @@ import asyncio -from asyncio.base_subprocess import SubprocessStreamProtocol @asyncio.coroutine def cat(loop): - transport, protocol = yield from loop.subprocess_shell(SubprocessStreamProtocol, "cat") + transport, protocol = yield from asyncio.subprocess_shell("cat") print("pid: %s" % transport.get_pid()) stdin = protocol.stdin stdout = protocol.stdout @@ -20,7 +19,7 @@ def cat(loop): @asyncio.coroutine def ls(loop): - transport, protocol = yield from loop.subprocess_shell(SubprocessStreamProtocol, "ls", stdin=None) + transport, protocol = yield from asyncio.subprocess_shell("ls", stdin=None) while True: line = yield from protocol.stdout.readline() if not line: |