From ea6b4e215be5da305bde53aa84fd11148ec3d1b0 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Sat, 1 Feb 2014 22:46:32 +0100 Subject: Merge (manually) the subprocess_stream into default * Add a new asyncio.subprocess module * Add new create_subprocess_exec() and create_subprocess_shell() functions * The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers for stdout and stderr and a stream writer for stdin. * The new asyncio.subprocess.Process class offers an API close to the subprocess.Popen class: - pid, returncode, stdin, stdout and stderr attributes - communicate(), wait(), send_signal(), terminate() and kill() methods * Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess and unix_events, to not be confused with the symbols with the same name of subprocess and asyncio.subprocess modules * _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size of the pending write * _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if the write buffer size is greater than the high water mark (64 KB by default) * Add new subprocess examples: shell.py, subprocess_shell.py, * subprocess_attach_read_pipe.py and subprocess_attach_write_pipe.py --- examples/child_process.py | 4 +- examples/shell.py | 50 +++++++++++++++++++ examples/subprocess_attach_read_pipe.py | 33 +++++++++++++ examples/subprocess_attach_write_pipe.py | 33 +++++++++++++ examples/subprocess_shell.py | 85 ++++++++++++++++++++++++++++++++ 5 files changed, 204 insertions(+), 1 deletion(-) create mode 100644 examples/shell.py create mode 100644 examples/subprocess_attach_read_pipe.py create mode 100644 examples/subprocess_attach_write_pipe.py create mode 100644 examples/subprocess_shell.py (limited to 'examples') diff --git a/examples/child_process.py b/examples/child_process.py index 4410414..0c12cb9 100644 --- a/examples/child_process.py +++ b/examples/child_process.py @@ -1,7 +1,9 @@ """ Example of asynchronous interaction with a child python process. -Note that on Windows we must use the IOCP event loop. +This example shows how to attach an existing Popen object and use the low level +transport-protocol API. See shell.py and subprocess_shell.py for higher level +examples. """ import os diff --git a/examples/shell.py b/examples/shell.py new file mode 100644 index 0000000..e094b61 --- /dev/null +++ b/examples/shell.py @@ -0,0 +1,50 @@ +"""Examples using create_subprocess_exec() and create_subprocess_shell().""" +import logging; logging.basicConfig() + +import asyncio +import signal +from asyncio.subprocess import PIPE + +@asyncio.coroutine +def cat(loop): + proc = yield from asyncio.create_subprocess_shell("cat", + stdin=PIPE, + stdout=PIPE) + print("pid: %s" % proc.pid) + + message = "Hello World!" + print("cat write: %r" % message) + + stdout, stderr = yield from proc.communicate(message.encode('ascii')) + print("cat read: %r" % stdout.decode('ascii')) + + exitcode = yield from proc.wait() + print("(exit code %s)" % exitcode) + +@asyncio.coroutine +def ls(loop): + proc = yield from asyncio.create_subprocess_exec("ls", + stdout=PIPE) + while True: + line = yield from proc.stdout.readline() + if not line: + break + print("ls>>", line.decode('ascii').rstrip()) + try: + proc.send_signal(signal.SIGINT) + except ProcessLookupError: + pass + +@asyncio.coroutine +def test_call(*args, timeout=None): + try: + proc = yield from asyncio.create_subprocess_exec(*args) + exitcode = yield from asyncio.wait_for(proc.wait(), timeout) + print("%s: exit code %s" % (' '.join(args), exitcode)) + except asyncio.TimeoutError: + print("timeout! (%.1f sec)" % timeout) + +loop = asyncio.get_event_loop() +loop.run_until_complete(cat(loop)) +loop.run_until_complete(ls(loop)) +loop.run_until_complete(test_call("bash", "-c", "sleep 3", timeout=1.0)) diff --git a/examples/subprocess_attach_read_pipe.py b/examples/subprocess_attach_read_pipe.py new file mode 100644 index 0000000..8bec652 --- /dev/null +++ b/examples/subprocess_attach_read_pipe.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +"""Example showing how to attach a read pipe to a subprocess.""" +import asyncio +import os, sys +from asyncio import subprocess + +code = """ +import os, sys +fd = int(sys.argv[1]) +data = os.write(fd, b'data') +os.close(fd) +""" + +loop = asyncio.get_event_loop() + +@asyncio.coroutine +def task(): + rfd, wfd = os.pipe() + args = [sys.executable, '-c', code, str(wfd)] + + pipe = open(rfd, 'rb', 0) + reader = asyncio.StreamReader(loop=loop) + protocol = asyncio.StreamReaderProtocol(reader, loop=loop) + transport, _ = yield from loop.connect_read_pipe(lambda: protocol, pipe) + + proc = yield from asyncio.create_subprocess_exec(*args, pass_fds={wfd}) + yield from proc.wait() + + os.close(wfd) + data = yield from reader.read() + print("read = %r" % data.decode()) + +loop.run_until_complete(task()) diff --git a/examples/subprocess_attach_write_pipe.py b/examples/subprocess_attach_write_pipe.py new file mode 100644 index 0000000..017b827 --- /dev/null +++ b/examples/subprocess_attach_write_pipe.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +"""Example showing how to attach a write pipe to a subprocess.""" +import asyncio +import os, sys +from asyncio import subprocess + +code = """ +import os, sys +fd = int(sys.argv[1]) +data = os.read(fd, 1024) +sys.stdout.buffer.write(data) +""" + +loop = asyncio.get_event_loop() + +@asyncio.coroutine +def task(): + rfd, wfd = os.pipe() + args = [sys.executable, '-c', code, str(rfd)] + proc = yield from asyncio.create_subprocess_exec( + *args, + pass_fds={rfd}, + stdout=subprocess.PIPE) + + pipe = open(wfd, 'wb', 0) + transport, _ = yield from loop.connect_write_pipe(asyncio.Protocol, + pipe) + transport.write(b'data') + + stdout, stderr = yield from proc.communicate() + print("stdout = %r" % stdout.decode()) + +loop.run_until_complete(task()) diff --git a/examples/subprocess_shell.py b/examples/subprocess_shell.py new file mode 100644 index 0000000..d0e5d65 --- /dev/null +++ b/examples/subprocess_shell.py @@ -0,0 +1,85 @@ +"""Example writing to and reading from a subprocess at the same time using +tasks.""" + +import asyncio +import os +from asyncio.subprocess import PIPE + + +@asyncio.coroutine +def send_input(writer, input): + try: + for line in input: + print('sending', len(line), 'bytes') + writer.write(line) + d = writer.drain() + if d: + print('pause writing') + yield from d + print('resume writing') + writer.close() + except BrokenPipeError: + print('stdin: broken pipe error') + except ConnectionResetError: + print('stdin: connection reset error') + +@asyncio.coroutine +def log_errors(reader): + while True: + line = yield from reader.readline() + if not line: + break + print('ERROR', repr(line)) + +@asyncio.coroutine +def read_stdout(stdout): + while True: + line = yield from stdout.readline() + print('received', repr(line)) + if not line: + break + +@asyncio.coroutine +def start(cmd, input=None, **kwds): + kwds['stdout'] = PIPE + kwds['stderr'] = PIPE + if input is None and 'stdin' not in kwds: + kwds['stdin'] = None + else: + kwds['stdin'] = PIPE + proc = yield from asyncio.create_subprocess_shell(cmd, **kwds) + + tasks = [] + if input is not None: + tasks.append(send_input(proc.stdin, input)) + else: + print('No stdin') + if proc.stderr is not None: + tasks.append(log_errors(proc.stderr)) + else: + print('No stderr') + if proc.stdout is not None: + tasks.append(read_stdout(proc.stdout)) + else: + print('No stdout') + + if tasks: + # feed stdin while consuming stdout to avoid hang + # when stdin pipe is full + yield from asyncio.wait(tasks) + + exitcode = yield from proc.wait() + print("exit code: %s" % exitcode) + + +def main(): + if os.name == 'nt': + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + loop.run_until_complete(start('sleep 2; wc', input=[b'foo bar baz\n'*300 for i in range(100)])) + + +if __name__ == '__main__': + main() -- cgit v1.2.1