summaryrefslogtreecommitdiff
path: root/examples/child_process.py
blob: 3fac175e08b5d809e7ee351f8ec2814988c4c006 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Example of asynchronous interaction with a child Python process.

This example shows how to attach an existing Popen object and use the
low-level transport-protocol API. See shell.py and subprocess_shell.py for
higher-level examples.
"""

import os
import sys

try:
    import asyncio
except ImportError:
    # asyncio is not installed
    sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
    import asyncio

if sys.platform == 'win32':
    from asyncio.windows_utils import Popen, PIPE
    from asyncio.windows_events import ProactorEventLoop
else:
    from subprocess import Popen, PIPE

#
# Return a write-only transport wrapping a writable pipe
#

@asyncio.coroutine
def connect_write_pipe(file):
    """Register the writable pipe *file* with the current event loop.

    A plain asyncio.Protocol is used as the protocol factory; only the
    transport half of the (transport, protocol) pair is returned.
    """
    event_loop = asyncio.get_event_loop()
    write_transport, _protocol = yield from event_loop.connect_write_pipe(
        asyncio.Protocol, file)
    return write_transport

#
# Wrap a readable pipe in a stream
#

@asyncio.coroutine
def connect_read_pipe(file):
    """Register the readable pipe *file* with the current event loop.

    The pipe is fed into a new asyncio.StreamReader through a
    StreamReaderProtocol.  Returns the pair (stream_reader, transport).
    """
    event_loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(loop=event_loop)
    # the protocol factory ties the transport's incoming data to `reader`
    read_transport, _protocol = yield from event_loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader), file)
    return reader, read_transport


#
# Example
#

@asyncio.coroutine
def main(loop):
    """Drive a child Python interpreter through its pipes and print replies.

    A child process running a small read-eval-print script is started with
    Popen.  Its stdin is wrapped in a write transport and its stdout/stderr
    in stream readers.  Commands are written one at a time; after each one,
    every line currently available on stdout/stderr is printed with an
    'OUT' or 'ERR' prefix.  The loop ends when both streams reach EOF.

    The pipe transports are always closed, outstanding readline tasks are
    cancelled and the child is always reaped, even when the interaction
    fails part-way through.

    loop -- accepted from the caller but not used here; the pipe helpers
            obtain the loop through asyncio.get_event_loop().
    """
    # program which prints evaluation of each expression from stdin
    code = r'''if 1:
                   import os
                   def writeall(fd, buf):
                       while buf:
                           n = os.write(fd, buf)
                           buf = buf[n:]
                   while True:
                       s = os.read(0, 1024)
                       if not s:
                           break
                       s = s.decode('ascii')
                       s = repr(eval(s)) + '\n'
                       s = s.encode('ascii')
                       writeall(1, s)
                   '''

    # commands to send to input
    commands = iter([b"1+1\n",
                     b"2**16\n",
                     b"1/3\n",
                     b"'x'*50",
                     b"1/0\n"])

    # start subprocess
    p = Popen([sys.executable, '-c', code],
              stdin=PIPE, stdout=PIPE, stderr=PIPE)

    # placeholders so the cleanup code below works however far we got
    stdin = stdout_transport = stderr_transport = None
    registered = {}
    try:
        # wrap stdin, stdout, stderr
        stdin = yield from connect_write_pipe(p.stdin)
        stdout, stdout_transport = yield from connect_read_pipe(p.stdout)
        stderr, stderr_transport = yield from connect_read_pipe(p.stderr)

        # interact with subprocess
        name = {stdout: 'OUT', stderr: 'ERR'}
        # maps each outstanding readline() task to the stream it reads from
        registered = {asyncio.Task(stderr.readline()): stderr,
                      asyncio.Task(stdout.readline()): stdout}
        while registered:
            # write command
            cmd = next(commands, None)
            if cmd is None:
                # nothing left to send: signal EOF to the child
                stdin.close()
            else:
                print('>>>', cmd.decode('ascii').rstrip())
                stdin.write(cmd)

            # get and print lines from stdout, stderr
            # block for the first line, then only drain what is already there
            timeout = None
            while registered:
                done, _ = yield from asyncio.wait(
                    registered, timeout=timeout,
                    return_when=asyncio.FIRST_COMPLETED)
                if not done:
                    break
                for f in done:
                    stream = registered.pop(f)
                    res = f.result()
                    print(name[stream], res.decode('ascii').rstrip())
                    # b'' means EOF: stop reading from that stream
                    if res != b'':
                        registered[asyncio.Task(stream.readline())] = stream
                timeout = 0.0
    except BaseException:
        # Abnormal exit (including cancellation): the child may still be
        # blocked reading stdin, so kill it to keep p.wait() below from
        # hanging.
        if p.poll() is None:
            p.kill()
        raise
    finally:
        # empty on the normal path; on failure, drop the pending reads
        for task in registered:
            task.cancel()
        # stdin may not have been closed yet if the output streams hit EOF
        # before the command list was exhausted; close() is safe to repeat
        for transport in (stdin, stdout_transport, stderr_transport):
            if transport is not None:
                transport.close()
        # Reap the child.  It has either closed both output pipes or been
        # killed above, so this returns promptly.
        p.wait()

if __name__ == '__main__':
    # Script entry point: pick an event loop, run main() to completion and
    # always close the loop afterwards.
    if sys.platform != 'win32':
        loop = asyncio.get_event_loop()
    else:
        # On Windows a ProactorEventLoop is created and installed as the
        # current loop, which is where the pipe helpers look it up.
        loop = ProactorEventLoop()
        asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()