summaryrefslogtreecommitdiff
path: root/pexpect/popen_spawn.py
blob: 0a186fe7fdad35cb1b3216b46277a662e1026b60 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""Spawn interface using subprocess.Popen
"""
import os
import threading
import subprocess
import sys
import time
import signal
import shlex

try:
    from queue import Queue, Empty  # Python 3
except ImportError:
    from Queue import Queue, Empty  # Python 2

from .spawnbase import SpawnBase, SpawnBaseUnicode, PY3
from .exceptions import EOF

class PopenSpawn(SpawnBase):
    if PY3:
        crlf = '\n'.encode('ascii')
    else:
        crlf = '\n'

    def __init__(self, cmd, timeout=30, maxread=2000, searchwindowsize=None,
                 logfile=None, cwd=None,  env=None):
        super(PopenSpawn, self).__init__(timeout=timeout, maxread=maxread,
                searchwindowsize=searchwindowsize, logfile=logfile)
                
        kwargs = dict(bufsize=0, stdin=subprocess.PIPE,
                      stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
                      cwd=cwd, env=env)

        if sys.platform == 'win32':
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            kwargs['startupinfo'] = startupinfo
            kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP

        if not isinstance(cmd, (list, tuple)):
            cmd = shlex.split(cmd)

        self.proc = subprocess.Popen(cmd, **kwargs)
        self.closed = False
        self._buf = ''

        self._read_queue = Queue()
        self._read_thread = threading.Thread(target=self._read_incoming)
        self._read_thread.setDaemon(True)
        self._read_thread.start()

    def read_nonblocking(self, size, timeout):
        if self.closed:
            raise ValueError('I/O operation on closed file.')
        elif self.flag_eof:
            self.closed = True
            raise EOF('End Of File (EOF).')

        if timeout == -1:
            timeout = self.timeout
        elif timeout is None:
            timeout = 1e6

        t0 = time.time()
        buf = b''
        while (time.time() - t0) < timeout and size and len(buf) < size:
            try:
                incoming = self._read_queue.get_nowait()
            except Empty:
                break
            else:
                if incoming is None:
                    self.flag_eof = True
                    raise EOF('End of File')

                buf += incoming

        if len(buf) > size:
            self.buffer = buf[size:]
            buf = buf[:size]

        self._log(buf, 'read')
        return buf

    def _read_incoming(self):
        """Run in a thread to move output from a pipe to a queue."""
        fileno = self.proc.stdout.fileno()
        while 1:
            buf = ''
            try:
                buf = os.read(fileno, 1024)
            except OSError as e:
                self._log(e, 'read')

            if not buf:
                self._read_queue.put(None)
                return

            self._read_queue.put(buf)
            time.sleep(0.001)

    def write(self, s):
        '''This is similar to send() except that there is no return value.
        '''
        self.send(s)

    def writelines(self, sequence):
        '''This calls write() for each element in the sequence.
        
        The sequence can be any iterable object producing strings, typically a
        list of strings. This does not add line separators. There is no return
        value.
        '''
        for s in sequence:
            self.send(s)

    def _send(self, s):
        return self.proc.stdin.write(s)

    def send(self, s):
        s = self._coerce_send_string(s)
        self._log(s, 'send')

        return self._send(s)

    def sendline(self, s=''):
        '''Wraps send(), sending string ``s`` to child process, with os.linesep
        automatically appended. Returns number of bytes written. '''

        n = self.send(s)
        n = n + self.send(self.linesep)
        return n

    def wait(self):
        status = self.proc.wait()
        if status >= 0:
            self.exitstatus = status
            self.signalstatus = None
        else:
            self.exitstatus = None
            self.signalstatus = -status
        self.terminated = True
        return status

    def kill(self, sig):
        if sys.platform == 'win32':
            if sig in [signal.SIGINT, signal.CTRL_C_EVENT]:
                sig = signal.CTRL_C_EVENT
            elif sig in [signal.SIGBREAK, signal.CTRL_BREAK_EVENT]:
                sig = signal.CTRL_BREAK_EVENT
            else:
                sig = signal.SIGTERM

        os.kill(self.proc.pid, sig)


class PopenSpawnUnicode(SpawnBaseUnicode, PopenSpawn):
    def _send(self, s):
        super(PopenSpawnUnicode, self)._send(s.encode(self.encoding, self.errors))