From eb2f818015157e74d300d271bcd0e0575c4df97e Mon Sep 17 00:00:00 2001 From: jquast Date: Sat, 7 Jun 2014 21:25:20 -0700 Subject: Improve EOF through test -> interact -> echo_prompt --- tests/echo_w_prompt.py | 8 +++-- tests/interact.py | 4 ++- tests/interact_unicode.py | 5 ++- tests/test_interact.py | 86 ++++++++++++++++++++++++++++------------------- 4 files changed, 65 insertions(+), 38 deletions(-) diff --git a/tests/echo_w_prompt.py b/tests/echo_w_prompt.py index 0706688..3c80553 100644 --- a/tests/echo_w_prompt.py +++ b/tests/echo_w_prompt.py @@ -7,5 +7,9 @@ except NameError: raw_input = input while True: - a = raw_input('') - print('', a, sep='') \ No newline at end of file + try: + a = raw_input('') + except EOFError: + print('') + break + print('', a, sep='') diff --git a/tests/interact.py b/tests/interact.py index 60e48c9..9f8e672 100755 --- a/tests/interact.py +++ b/tests/interact.py @@ -29,8 +29,10 @@ from utils import no_coverage_env import pexpect import sys + def main(): - p = pexpect.spawn(sys.executable + ' echo_w_prompt.py', env=no_coverage_env()) + p = pexpect.spawn(sys.executable + ' echo_w_prompt.py', + env=no_coverage_env()) p.interact() print("Escaped interact") diff --git a/tests/interact_unicode.py b/tests/interact_unicode.py index 93426dc..f4c1f55 100644 --- a/tests/interact_unicode.py +++ b/tests/interact_unicode.py @@ -13,9 +13,12 @@ from utils import no_coverage_env import pexpect import sys + def main(): - p = pexpect.spawnu(sys.executable + ' echo_w_prompt.py', env=no_coverage_env()) + p = pexpect.spawnu(sys.executable + ' echo_w_prompt.py', + env=no_coverage_env()) p.interact() + print("Escaped interact") if __name__ == '__main__': main() diff --git a/tests/test_interact.py b/tests/test_interact.py index 623201b..4854ff1 100755 --- a/tests/test_interact.py +++ b/tests/test_interact.py @@ -27,49 +27,67 @@ import pexpect import unittest from . import PexpectTestCase + class InteractTestCase (PexpectTestCase.PexpectTestCase): def setUp(self): super(InteractTestCase, self).setUp() - self.env = os.environ.copy() - # Ensure that Pexpect is importable by the subprocesses. - self.env['PYTHONPATH'] = self.project_dir + os.pathsep + os.environ.get('PYTHONPATH', '') + self.save_pythonpath = os.getenv('PYTHONPATH') + + # Ensure 'import pexpect' works in subprocess interact*.py + if not self.save_pythonpath: + os.putenv('PYTHONPATH', self.project_dir) + else: + os.putenv('PYTHONPATH', os.pathsep.join((self.project_dir, + self.save_pythonpath))) + + self.interact_py = ' '.join((self.PYTHONBIN, + 'interact.py',)) + self.interact_ucs_py = ' '.join((self.PYTHONBIN, + 'interact_unicode.py',)) - def test_interact (self): - p = pexpect.spawn(str('%s interact.py' % (self.PYTHONBIN,)), env=self.env) + def tearDown(self): + os.putenv('PYTHONPATH', self.save_pythonpath or '') + + def test_interact_escape(self): + " Ensure `escape_character' value exits interactive mode. " + p = pexpect.spawn(self.interact_py, timeout=5) p.expect('') - p.sendline (b'Hello') - p.sendline (b'there') - p.sendline (b'Mr. Python') - p.expect (b'Hello') - p.expect (b'there') - p.expect (b'Mr. Python') - p.sendcontrol(']') + p.sendcontrol(']') # chr(29), the default `escape_character' + # value of pexpect.interact(). p.expect_exact('Escaped interact') - assert p.isalive() - p.sendeof () - p.expect (pexpect.EOF) + p.expect(pexpect.EOF) assert not p.isalive() - assert p.exitstatus == 0, (p.exitstatus, p.before) + assert p.exitstatus == 0 - def test_interact_unicode (self): - p = pexpect.spawnu(str('%s interact_unicode.py' % (self.PYTHONBIN,)), env=self.env) - try: - p.expect('') - p.sendline ('Hello') - p.sendline ('theré') - p.sendline ('Mr. Pyþon') - p.expect ('Hello') - p.expect ('theré') - p.expect ('Mr. Pyþon') - assert p.isalive() - p.sendeof () - p.expect (pexpect.EOF) - assert not p.isalive() - assert p.exitstatus == 0, (p.exitstatus, p.before) - except: - print(p.before) - raise + def test_interact_spawn_eof(self): + " Ensure subprocess receives EOF and exit. " + p = pexpect.spawn(self.interact_py, timeout=5) + p.expect('') + p.sendline(b'alpha') + p.sendline(b'beta') + p.expect(b'alpha') + p.expect(b'beta') + p.sendeof() + p.expect_exact('') + p.expect_exact('Escaped interact') + p.expect(pexpect.EOF) + assert not p.isalive() + assert p.exitstatus == 0 + def test_interact_spawnu_eof(self): + " Ensure subprocess receives unicode, EOF, and exit. " + p = pexpect.spawnu(self.interact_ucs_py, timeout=5) + p.expect('') + p.sendline(u'ɑlpha') + p.sendline(u'Βeta') + p.expect(u'ɑlpha') + p.expect(u'Βeta') + p.sendeof() + p.expect_exact('') + p.expect_exact('Escaped interact') + p.expect(pexpect.EOF) + assert not p.isalive() + assert p.exitstatus == 0 if __name__ == '__main__': unittest.main() -- cgit v1.2.1 From 6e454ece1448b2dc6cf43a86fc01a5f3a1ae2803 Mon Sep 17 00:00:00 2001 From: jquast Date: Sat, 7 Jun 2014 21:35:16 -0700 Subject: Document and detect EOF condition in interact() --- doc/history.rst | 3 +++ pexpect/__init__.py | 41 +++++++++++++++-------------------------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/doc/history.rst b/doc/history.rst index 81ad48b..5b26521 100644 --- a/doc/history.rst +++ b/doc/history.rst @@ -14,6 +14,9 @@ Version 3.3 * Removed the ``pexpect.psh`` module. This was never documented, and we found no evidence that people use it. The new :mod:`pexpect.replwrap` module provides a more flexible alternative. +* Fixed issue where EOF was not correctly detected in ``interact()``, causing + a repeating loop of output on Linux, and blocking before EOF on BSD and + Solaris (:ghissue:`49`). Version 3.2 ``````````` diff --git a/pexpect/__init__.py b/pexpect/__init__.py index f27e3fe..2ce0b9d 100644 --- a/pexpect/__init__.py +++ b/pexpect/__init__.py @@ -911,12 +911,14 @@ class spawn(object): if self.child_fd in r: try: s = os.read(self.child_fd, size) - except OSError: - # Linux does this - self.flag_eof = True - raise EOF('End Of File (EOF). Exception style platform.') + except OSError, err: + if err.args[0] == errno.EIO: + # Linux-style EOF + self.flag_eof = True + raise EOF('End Of File (EOF). Exception style platform.') + raise if s == b'': - # BSD style + # BSD-style EOF self.flag_eof = True raise EOF('End Of File (EOF). Empty string style platform.') @@ -1072,23 +1074,6 @@ class spawn(object): called at the beginning of a line. This method does not send a newline. It is the responsibility of the caller to ensure the eof is sent at the beginning of a line. ''' - - ### Hmmm... how do I send an EOF? - ###C if ((m = write(pty, *buf, p - *buf)) < 0) - ###C return (errno == EWOULDBLOCK) ? n : -1; - #fd = sys.stdin.fileno() - #old = termios.tcgetattr(fd) # remember current state - #attr = termios.tcgetattr(fd) - #attr[3] = attr[3] | termios.ICANON # ICANON must be set to see EOF - #try: # use try/finally to ensure state gets restored - # termios.tcsetattr(fd, termios.TCSADRAIN, attr) - # if hasattr(termios, 'CEOF'): - # os.write(self.child_fd, '%c' % termios.CEOF) - # else: - # # Silly platform does not define CEOF so assume CTRL-D - # os.write(self.child_fd, '%c' % 4) - #finally: # restore state - # termios.tcsetattr(fd, termios.TCSADRAIN, old) if hasattr(termios, 'VEOF'): char = ord(termios.tcgetattr(self.child_fd)[6][termios.VEOF]) else: @@ -1641,10 +1626,14 @@ class spawn(object): if self.child_fd in r: try: data = self.__interact_read(self.child_fd) - except OSError as e: - # The subprocess may have closed before we get to reading it - if e.errno != errno.EIO: - raise + except OSError, err: + if err.args[0] == errno.EIO: + # Linux-style EOF + break + raise + if data == b'': + # BSD-style EOF + break if output_filter: data = output_filter(data) if self.logfile is not None: -- cgit v1.2.1 From a4d7da7c6a5c0325a07852283f008ade5a1e9c4f Mon Sep 17 00:00:00 2001 From: jquast Date: Sat, 7 Jun 2014 21:37:36 -0700 Subject: python3.2 accommodations --- tests/test_interact.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_interact.py b/tests/test_interact.py index 4854ff1..70a0e08 100755 --- a/tests/test_interact.py +++ b/tests/test_interact.py @@ -78,10 +78,10 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase): " Ensure subprocess receives unicode, EOF, and exit. " p = pexpect.spawnu(self.interact_ucs_py, timeout=5) p.expect('') - p.sendline(u'ɑlpha') - p.sendline(u'Βeta') - p.expect(u'ɑlpha') - p.expect(u'Βeta') + p.sendline('ɑlpha') + p.sendline('Βeta') + p.expect('ɑlpha') + p.expect('Βeta') p.sendeof() p.expect_exact('') p.expect_exact('Escaped interact') -- cgit v1.2.1 From 20ded785263e64c5beec1d594f666bdae5f2c2aa Mon Sep 17 00:00:00 2001 From: jquast Date: Sat, 7 Jun 2014 21:45:22 -0700 Subject: For python3, exceptions should use "as err", not ", err" --- pexpect/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pexpect/__init__.py b/pexpect/__init__.py index 2ce0b9d..f37aaaf 100644 --- a/pexpect/__init__.py +++ b/pexpect/__init__.py @@ -911,7 +911,7 @@ class spawn(object): if self.child_fd in r: try: s = os.read(self.child_fd, size) - except OSError, err: + except OSError as err: if err.args[0] == errno.EIO: # Linux-style EOF self.flag_eof = True @@ -1626,7 +1626,7 @@ class spawn(object): if self.child_fd in r: try: data = self.__interact_read(self.child_fd) - except OSError, err: + except OSError as err: if err.args[0] == errno.EIO: # Linux-style EOF break -- cgit v1.2.1 From abe09d65daa99aab33b08a00f2c183b67e7f9ad3 Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Sun, 8 Jun 2014 08:07:03 +0000 Subject: Solaris support, tested on SmartOS from cron(1). --- doc/history.rst | 10 ++- pexpect/__init__.py | 210 ++++++++++++++++++++++++++++++---------------- pexpect/replwrap.py | 9 +- tests/getch.py | 2 +- tests/test_ctrl_chars.py | 100 +++++++++++----------- tests/test_expect.py | 162 ++++++++++++++++++++--------------- tests/test_isalive.py | 89 +++++++++----------- tests/test_misc.py | 39 +++++---- tests/test_performance.py | 14 ++-- tests/test_replwrap.py | 4 +- tests/test_unicode.py | 38 +++++++-- 11 files changed, 400 insertions(+), 277 deletions(-) diff --git a/doc/history.rst b/doc/history.rst index 5b26521..576316a 100644 --- a/doc/history.rst +++ b/doc/history.rst @@ -17,6 +17,12 @@ Version 3.3 * Fixed issue where EOF was not correctly detected in ``interact()``, causing a repeating loop of output on Linux, and blocking before EOF on BSD and Solaris (:ghissue:`49`). +* Several Solaris (SmartOS) bugfixes, preventing IOError exceptions, especially + when used with cron(1) (:ghissue:`44`). +* Added new keyword argument ``echo=True`` for ``spawn()``. On SRV4-like + systems, the method ``isatty()`` will always return *False*: the child pty + does not appear as a terminal. Therefore, ``setecho()``, ``getwinsize()``, + ``setwinsize()``, and ``waitnoecho()`` are not supported on those platforms. Version 3.2 ``````````` @@ -113,11 +119,11 @@ Version 2.3 consistently on different platforms. Solaris is the most difficult to support. * You can now put ``TIMEOUT`` in a list of expected patterns. This is just like putting ``EOF`` in the pattern list. Expecting for a ``TIMEOUT`` may not be - used as often as ``EOF``, but this makes Pexpect more consitent. + used as often as ``EOF``, but this makes Pexpect more consistent. * Thanks to a suggestion and sample code from Chad J. Schroeder I added the ability for Pexpect to operate on a file descriptor that is already open. This means that Pexpect can be used to control streams such as those from serial port devices. Now, - you just pass the integer file descriptor as the "command" when contsructing a + you just pass the integer file descriptor as the "command" when constructing a spawn open. For example on a Linux box with a modem on ttyS1:: fd = os.open("/dev/ttyS1", os.O_RDWR|os.O_NONBLOCK|os.O_NOCTTY) diff --git a/pexpect/__init__.py b/pexpect/__init__.py index fa40669..935aa31 100644 --- a/pexpect/__init__.py +++ b/pexpect/__init__.py @@ -303,7 +303,7 @@ class spawn(object): def __init__(self, command, args=[], timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None, - ignore_sighup=True): + ignore_sighup=True, echo=True): '''This is the constructor. The command parameter may be a string that includes a command and any arguments to the command. For example:: @@ -416,7 +416,16 @@ class spawn(object): signalstatus will store the signal value and exitstatus will be None. If you need more detail you can also read the self.status member which stores the status returned by os.waitpid. You can interpret this using - os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG. ''' + os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG. + + The echo attribute may be set to False to disable echoing of input. + As a pseudo-terminal, all input echoed by the "keyboard" (send() + or sendline()) will be repeated to output. For many cases, it is + not desirable to have echo enabled, and it may be later disabled + using setecho(False) followed by waitnoecho(). However, for some + platforms such as Solaris, this is not possible, and should be + disabled immediately on spawn. + ''' self.STDIN_FILENO = pty.STDIN_FILENO self.STDOUT_FILENO = pty.STDOUT_FILENO @@ -438,7 +447,7 @@ class spawn(object): self.status = None self.flag_eof = False self.pid = None - # the chile filedescriptor is initially closed + # the child file descriptor is initially closed self.child_fd = -1 self.timeout = timeout self.delimiter = EOF @@ -467,16 +476,30 @@ class spawn(object): self.closed = True self.cwd = cwd self.env = env + self.echo = echo self.ignore_sighup = ignore_sighup + _platform = sys.platform.lower() # This flags if we are running on irix - self.__irix_hack = (sys.platform.lower().find('irix') >= 0) + self.__irix_hack = _platform.startswith('irix') # Solaris uses internal __fork_pty(). All others use pty.fork(). - if ((sys.platform.lower().find('solaris') >= 0) - or (sys.platform.lower().find('sunos5') >= 0)): - self.use_native_pty_fork = False - else: - self.use_native_pty_fork = True - + self.use_native_pty_fork = not ( + _platform.startswith('solaris') or + _platform.startswith('sunos')) + # inherit EOF and INTR definitions from controlling process. + try: + from termios import VEOF, VINTR + fd = sys.__stdin__.fileno() + self._INTR = ord(termios.tcgetattr(fd)[6][VINTR]) + self._EOF = ord(termios.tcgetattr(fd)[6][VEOF]) + except (ImportError, OSError, IOError, termios.error): + # unless the controlling process is also not a terminal, + # such as cron(1). Fall-back to using CEOF and CINTR. + try: + from termios import CEOF, CINTR + (self._INTR, self._EOF) = (CINTR, CEOF) + except ImportError: + # ^C, ^D + (self._INTR, self._EOF) = (3, 4) # Support subclasses that do not use command or args. if command is None: self.command = None @@ -607,26 +630,32 @@ class spawn(object): # Use internal __fork_pty self.pid, self.child_fd = self.__fork_pty() - if self.pid == 0: + # Some platforms must call setwinsize() and setecho() from the + # child process, and others from the master process. We do both, + # allowing IOError for either. + + if self.pid == pty.CHILD: # Child + self.child_fd = self.STDIN_FILENO + + # set default window size of 24 rows by 80 columns try: - # used by setwinsize() - self.child_fd = sys.stdout.fileno() self.setwinsize(24, 80) - # which exception, shouldnt' we catch explicitly .. ? - except: - # Some platforms do not like setwinsize (Cygwin). - # This will cause problem when running applications that - # are very picky about window size. - # This is a serious limitation, but not a show stopper. - pass + except IOError as err: + if err.args[0] not in (errno.EINVAL, errno.ENOTTY): + raise + + # disable echo if spawn argument echo was unset + if not self.echo: + try: + self.setecho(self.echo) + except (IOError, termios.error) as err: + if err.args[0] not in (errno.EINVAL, errno.ENOTTY): + raise + # Do not allow child to inherit open file descriptors from parent. max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - for i in range(3, max_fd): - try: - os.close(i) - except OSError: - pass + os.closerange(3, max_fd) if self.ignore_sighup: signal.signal(signal.SIGHUP, signal.SIG_IGN) @@ -639,6 +668,13 @@ class spawn(object): os.execvpe(self.command, self.args, self.env) # Parent + try: + self.setwinsize(24, 80) + except IOError as err: + if err.args[0] not in (errno.EINVAL, errno.ENOTTY): + raise + + self.terminated = False self.closed = False @@ -661,19 +697,15 @@ class spawn(object): raise ExceptionPexpect("Could not open with os.openpty().") pid = os.fork() - if pid < 0: # pragma: no cover - raise ExceptionPexpect("Failed os.fork().") - elif pid == 0: + if pid == pty.CHILD: # Child. os.close(parent_fd) self.__pty_make_controlling_tty(child_fd) - os.dup2(child_fd, 0) - os.dup2(child_fd, 1) - os.dup2(child_fd, 2) + os.dup2(child_fd, self.STDIN_FILENO) + os.dup2(child_fd, self.STDOUT_FILENO) + os.dup2(child_fd, self.STDERR_FILENO) - if child_fd > 2: - os.close(child_fd) else: # Parent. os.close(child_fd) @@ -685,15 +717,29 @@ class spawn(object): more portable than the pty.fork() function. Specifically, this should work on Solaris. ''' - child_name = os.ttyname(tty_fd) + # os.ttyname() fails for the child process under a rare timing + # condition (about 1 in 10 on SmartOs in a VMWare machine). May + # raise OSError as errno ENOTTY -- ptsname(3C) should be used, + # on this system, but python does not expose it, it would require + # ctypes + libc. which is also not available, filed patch as + # http://bugs.python.org/issue20664 + child_name, tries = None, 0 + _poll, _max_tries= 0.1, 10 + while child_name is None: + try: + child_name = os.ttyname(tty_fd) + break + except OSError, err: + tries += 1 + if tries > _max_tries: + raise + time.sleep(_poll) # Disconnect from controlling tty. Harmless if not already connected. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) - if fd >= 0: - os.close(fd) - # which exception, shouldnt' we catch explicitly .. ? - except: + os.close(fd) + except OSError: # Already disconnected. This happens if running inside cron. pass @@ -703,10 +749,8 @@ class spawn(object): # by attempting to open it again. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) - if fd >= 0: - os.close(fd) - # which exception, shouldnt' we catch explicitly .. ? - except: + os.close(fd) + except OSError: # Good! We are disconnected from a controlling tty. pass @@ -718,11 +762,15 @@ class spawn(object): os.close(fd) # Verify we now have a controlling tty. - fd = os.open("/dev/tty", os.O_WRONLY) - if fd < 0: - raise ExceptionPexpect("Could not open controlling tty, /dev/tty") - else: + try: + fd = os.open("/dev/tty", os.O_WRONLY) os.close(fd) + except OSError as err: + if err.args[0] == ENXIO: + # on Solaris, `/dev/tty' raises OSError upon opening, though + # it does exist: /dev/tty -> ../devices/pseudo/sy@0:tty + pass + def fileno(self): '''This returns the file descriptor of the pty for the child. @@ -756,7 +804,11 @@ class spawn(object): def isatty(self): '''This returns True if the file descriptor is open and connected to a - tty(-like) device, else False. ''' + tty(-like) device, else False. + + On SRV4-style platforms implementing streams, the child pty does not + appear as a terminal device. This means methods such as setecho(), + setwinsize(), getwinsize() may raise an IOError. ''' return os.isatty(self.child_fd) @@ -793,12 +845,20 @@ class spawn(object): def getecho(self): '''This returns the terminal echo mode. This returns True if echo is on or False if echo is off. Child applications that are expecting you - to enter a password often set ECHO False. See waitnoecho(). ''' + to enter a password often set ECHO False. See waitnoecho(). - attr = termios.tcgetattr(self.child_fd) - if attr[3] & termios.ECHO: - return True - return False + Not supported on platforms where ``isatty()`` returns False. ''' + + try: + attr = termios.tcgetattr(self.child_fd) + except termios.error as err: + errmsg = 'getecho() may not be called on this platform' + if err.args[0] == errno.EINVAL: + raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg)) + raise + + self.echo = bool(attr[3] & termios.ECHO) + return self.echo def setecho(self, state): '''This sets the terminal echo mode on or off. Note that anything the @@ -828,18 +888,35 @@ class spawn(object): p.expect(['1234']) p.expect(['abcd']) p.expect(['wxyz']) + + + Not supported on platforms where ``isatty()`` returns False. ''' - self.child_fd - attr = termios.tcgetattr(self.child_fd) + errmsg = 'setecho() may not be called on this platform' + + try: + attr = termios.tcgetattr(self.child_fd) + except termios.error as err: + if err.args[0] == errno.EINVAL: + raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg)) + raise + if state: attr[3] = attr[3] | termios.ECHO else: attr[3] = attr[3] & ~termios.ECHO - # I tried TCSADRAIN and TCSAFLUSH, but - # these were inconsistent and blocked on some platforms. - # TCSADRAIN would probably be ideal if it worked. - termios.tcsetattr(self.child_fd, termios.TCSANOW, attr) + + try: + # I tried TCSADRAIN and TCSAFLUSH, but these were inconsistent and + # blocked on some platforms. TCSADRAIN would probably be ideal. + termios.tcsetattr(self.child_fd, termios.TCSANOW, attr) + except IOError as err: + if err.args[0] == errno.EINVAL: + raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg)) + raise + + self.echo = state def _log(self, s, direction): if self.logfile is not None: @@ -1075,24 +1152,15 @@ class spawn(object): called at the beginning of a line. This method does not send a newline. It is the responsibility of the caller to ensure the eof is sent at the beginning of a line. ''' - if hasattr(termios, 'VEOF'): - char = ord(termios.tcgetattr(self.child_fd)[6][termios.VEOF]) - else: - # platform does not define VEOF so assume CTRL-D - char = 4 - self.send(self._chr(char)) + + self.send(self._chr(self._EOF)) def sendintr(self): '''This sends a SIGINT to the child. It does not require the SIGINT to be the first character on a line. ''' - if hasattr(termios, 'VINTR'): - char = ord(termios.tcgetattr(self.child_fd)[6][termios.VINTR]) - else: - # platform does not define VINTR so assume CTRL-C - char = 3 - self.send(self._chr(char)) + self.send(self._chr(self._INTR)) def eof(self): @@ -1185,7 +1253,7 @@ class spawn(object): if self.flag_eof: # This is for Linux, which requires the blocking form - # of waitpid to # get status of a defunct process. + # of waitpid to get the status of a defunct process. # This is super-lame. The flag_eof would have been set # in read_nonblocking(), so this should be safe. waitpid_options = 0 diff --git a/pexpect/replwrap.py b/pexpect/replwrap.py index af4f889..a36678c 100644 --- a/pexpect/replwrap.py +++ b/pexpect/replwrap.py @@ -34,11 +34,14 @@ class REPLWrapper(object): new_prompt=PEXPECT_PROMPT, continuation_prompt=PEXPECT_CONTINUATION_PROMPT): if isinstance(cmd_or_spawn, str): - self.child = pexpect.spawnu(cmd_or_spawn) + self.child = pexpect.spawnu(cmd_or_spawn, echo=False) else: self.child = cmd_or_spawn - self.child.setecho(False) # Don't repeat our input. - self.child.waitnoecho() + if self.child.echo: + # Existing spawn instance has echo enabled, disable it + # to prevent our input from being repeated to output. + self.child.setecho(False) + self.child.waitnoecho() if prompt_change is None: self.prompt = orig_prompt diff --git a/tests/getch.py b/tests/getch.py index cb20bef..41e3224 100755 --- a/tests/getch.py +++ b/tests/getch.py @@ -33,7 +33,7 @@ def main(): val = ord(stdin.read(1)) except KeyboardInterrupt: val = 3 - sys.stdout.write('%d\r\n' % (val,)) + sys.stdout.write('%d\r\n' % (val,)) if val == 0: # StopIteration equivalent is ctrl+' ' (\x00, NUL) break diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py index 7b87840..d84f01b 100755 --- a/tests/test_ctrl_chars.py +++ b/tests/test_ctrl_chars.py @@ -37,82 +37,82 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase): def test_control_chars(self): '''This tests that we can send all 256 8-bit characters to a child process.''' - child = pexpect.spawn('python getch.py') - child.expect('READY', timeout=5) - try: - for i in range(1,256): - child.send(byte(i)) - child.expect ('%d\r\n' % (i,)) - # This needs to be last, as getch.py exits on \x00 - child.send(byte(0)) - child.expect('0\r\n') - child.expect(pexpect.EOF) - except Exception: - err = sys.exc_info()[1] - msg = "Did not echo character value: " + str(i) + "\n" - msg = msg + str(err) - self.fail(msg) + child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child.expect('READY') + for i in range(1, 256): + child.send(byte(i)) + child.expect ('%d' % (i,)) + + # This needs to be last, as getch.py exits on \x00 + child.send(byte(0)) + child.expect('0') + child.expect(pexpect.EOF) + assert not child.isalive() + assert child.exitstatus == 0 def test_sendintr (self): - try: - child = pexpect.spawn('python getch.py') - child.expect('READY', timeout=5) - child.sendintr() - child.expect ('3\r\n') - except Exception: - err = sys.exc_info()[1] - self.fail("Did not echo character value: 3, %s\n%s\n%s" % ( - str(err), child.before, child.after,)) + child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child.expect('READY') + child.sendintr() + child.expect('3') + + child.send(byte(0)) + child.expect('0') + child.expect(pexpect.EOF) + assert not child.isalive() + assert child.exitstatus == 0 + + def test_sendeof(self): + child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child.expect('READY') + child.sendeof() + child.expect(str(child._EOF) + '') + + child.send(byte(0)) + child.expect('0') + child.expect(pexpect.EOF) + assert not child.isalive() + assert child.exitstatus == 0 def test_bad_sendcontrol_chars (self): '''This tests that sendcontrol will return 0 for an unknown char. ''' - child = pexpect.spawn('python getch.py') - retval = child.sendcontrol('1') - assert retval == 0, "sendcontrol() should have returned 0 because there is no such thing as ctrl-1." + child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child.expect('READY') + assert 0 == child.sendcontrol('1') def test_sendcontrol(self): '''This tests that we can send all special control codes by name. ''' - child = pexpect.spawn('python getch.py') - # On slow machines, like Travis, the process is not ready in time to - # catch the first character unless we wait for it. - child.expect('READY', timeout=5) - child.delaybeforesend = 0.05 + child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child.expect('READY') for ctrl in 'abcdefghijklmnopqrstuvwxyz': assert child.sendcontrol(ctrl) == 1 val = ord(ctrl) - ord('a') + 1 - try: - child.expect_exact(str(val)+'\r\n', timeout=2) - except: - print(ctrl) - raise + child.expect_exact(str(val)+'') # escape character assert child.sendcontrol('[') == 1 - child.expect ('27\r\n') + child.expect('27') assert child.sendcontrol('\\') == 1 - child.expect ('28\r\n') + child.expect('28') # telnet escape character assert child.sendcontrol(']') == 1 - child.expect ('29\r\n') + child.expect('29') assert child.sendcontrol('^') == 1 - child.expect ('30\r\n') + child.expect('30') # irc protocol uses this to underline ... assert child.sendcontrol('_') == 1 - child.expect ('31\r\n') + child.expect('31') # the real "backspace is delete" assert child.sendcontrol('?') == 1 - child.expect ('127\r\n') + child.expect('127') + # NUL, same as ctrl + ' ' assert child.sendcontrol('@') == 1 - child.expect ('0\r\n') - # 0 is sentinel value to getch.py, assert exit: - # causes child to exit, but, if immediately tested, - # isalive() still returns True unless an artifical timer - # is used. - time.sleep(0.5) - assert child.isalive() == False, child.isalive() + child.expect('0') + child.expect(pexpect.EOF) + assert not child.isalive() assert child.exitstatus == 0 if __name__ == '__main__': diff --git a/tests/test_expect.py b/tests/test_expect.py index 4e99aad..da5214c 100755 --- a/tests/test_expect.py +++ b/tests/test_expect.py @@ -18,22 +18,21 @@ PEXPECT LICENSE OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. ''' -import pexpect import unittest import subprocess import time +import signal +import sys + +import pexpect from . import PexpectTestCase from .utils import no_coverage_env -import signal # Many of these test cases blindly assume that sequential directory # listings of the /bin directory will yield the same results. # This may not be true, but seems adequate for testing now. # I should fix this at some point. -# query: For some reason an extra newline occures under OS X evey -# once in a while. Excessive uses of .replace resolve these - FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)]) def hex_dump(src, length=16): result=[] @@ -51,32 +50,10 @@ def hex_diff(left, right): return '\n' + '\n'.join(diff,) -class assert_raises_msg(object): - def __init__(self, errtype, msgpart): - self.errtype = errtype - self.msgpart = msgpart - - def __enter__(self): - pass - - def __exit__(self, etype, value, traceback): - if value is None: - raise AssertionError('Expected %s, but no exception was raised' \ - % self.errtype) - if not isinstance(value, self.errtype): - raise AssertionError('Expected %s, but %s was raised' \ - % (self.errtype, etype)) - - errstr = str(value) - if self.msgpart not in errstr: - raise AssertionError('%r was not in %r' % (self.msgpart, errstr)) - - return True - class ExpectTestCase (PexpectTestCase.PexpectTestCase): def test_expect_basic (self): - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.sendline (b'Hello') p.sendline (b'there') p.sendline (b'Mr. Python') @@ -87,7 +64,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): p.expect (pexpect.EOF) def test_expect_exact_basic (self): - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.sendline (b'Hello') p.sendline (b'there') p.sendline (b'Mr. Python') @@ -101,7 +78,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): '''This test that the ignorecase flag will match patterns even if case is different using the regex (?i) directive. ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.sendline (b'HELLO') p.sendline (b'there') p.expect (b'(?i)hello') @@ -113,7 +90,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): '''This test that the ignorecase flag will match patterns even if case is different using the ignorecase flag. ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.ignorecase = True p.sendline (b'HELLO') p.sendline (b'there') @@ -129,22 +106,17 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): (one of the) the leftmost matches in the input? -- grahn) ... agreed! -jquast, the buffer ptr isn't forwarded on match, see first two test cases ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) self._expect_order(p) def test_expect_order_exact (self): '''Like test_expect_order(), but using expect_exact(). ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.expect = p.expect_exact self._expect_order(p) def _expect_order (self, p): - # Disable echo so that the output we see is in an entirely predictable - # order - p.setecho(False) - p.waitnoecho() - p.sendline (b'1234') p.sendline (b'abcd') p.sendline (b'wxyz') @@ -187,7 +159,44 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): pexpect.EOF]) assert index == 3, (index, p.before, p.after) - def test_waitnoecho (self): + def test_expect_setecho_off(self): + '''This tests that echo may be toggled off. + ''' + p = pexpect.spawn('cat', echo=True, timeout=5) + try: + self._expect_echo_toggle(p) + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + + def test_expect_setecho_off_exact(self): + p = pexpect.spawn('cat', echo=True, timeout=5) + p.expect = p.expect_exact + try: + self._expect_echo_toggle(p) + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + raise + + def test_waitnoecho(self): + " Tests setecho(False) followed by waitnoecho() " + p = pexpect.spawn('cat', echo=False, timeout=5) + try: + p.setecho(False) + p.waitnoecho() + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + raise + + def test_waitnoecho_order(self): ''' This tests that we can wait on a child process to set echo mode. For example, this tests that we could wait for SSH to set ECHO False @@ -196,7 +205,16 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): p1 = pexpect.spawn('%s echo_wait.py' % self.PYTHONBIN) start = time.time() - p1.waitnoecho(timeout=10) + try: + p1.waitnoecho(timeout=10) + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + raise + + end_time = time.time() - start assert end_time < 10 and end_time > 2, "waitnoecho did not set ECHO off in the expected window of time." @@ -216,15 +234,15 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): assert end_time < 10, "waitnoecho did not set ECHO off in the expected window of time." def test_expect_echo (self): - '''This tests that echo can be turned on and off. + '''This tests that echo is on by default. ''' - p = pexpect.spawn('cat', timeout=10) + p = pexpect.spawn('cat', echo=True, timeout=5) self._expect_echo(p) def test_expect_echo_exact (self): '''Like test_expect_echo(), but using expect_exact(). ''' - p = pexpect.spawn('cat', timeout=10) + p = pexpect.spawn('cat', echo=True, timeout=5) p.expect = p.expect_exact self._expect_echo(p) @@ -243,7 +261,24 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): b'wxyz', pexpect.EOF]) assert index == 0, "index="+str(index) + + def _expect_echo_toggle(self, p): + p.sendline (b'1234') # Should see this twice (once from tty echo and again from cat). + index = p.expect ([ + b'1234', + b'abcd', + b'wxyz', + pexpect.EOF, + pexpect.TIMEOUT]) + assert index == 0, "index="+str(index)+"\n"+p.before + index = p.expect ([ + b'1234', + b'abcd', + b'wxyz', + pexpect.EOF]) + assert index == 0, "index="+str(index) p.setecho(0) # Turn off tty echo + p.waitnoecho() p.sendline (b'abcd') # Now, should only see this once. p.sendline (b'wxyz') # Should also be only once. index = p.expect ([ @@ -271,32 +306,32 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): '''This tests that mixed list of regex strings, TIMEOUT, and EOF all return the correct index when matched. ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) self._expect_index(p) def test_expect_index_exact (self): '''Like test_expect_index(), but using expect_exact(). ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.expect = p.expect_exact self._expect_index(p) def _expect_index (self, p): - p.setecho(0) p.sendline (b'1234') index = p.expect ([b'abcd',b'wxyz',b'1234',pexpect.EOF]) assert index == 2, "index="+str(index) p.sendline (b'abcd') index = p.expect ([pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF]) - assert index == 1, "index="+str(index) + assert index == 1, "index="+str(index)+str(p) p.sendline (b'wxyz') - index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], timeout=5) + index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF]) assert index == 3, "index="+str(index) # Expect 'wxyz' p.sendline (b'$*!@?') - index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], timeout=5) + index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], + timeout=1) assert index == 1, "index="+str(index) # Expect TIMEOUT p.sendeof () - index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], timeout=5) + index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF]) assert index == 5, "index="+str(index) # Expect EOF def test_expect (self): @@ -471,14 +506,13 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): def test_bad_arg(self): p = pexpect.spawn('cat') - with assert_raises_msg(TypeError, 'must be one of'): + with self.assertRaisesRegexp(TypeError, '.*must be one of'): p.expect(1) - with assert_raises_msg(TypeError, 'must be one of'): + with self.assertRaisesRegexp(TypeError, '.*must be one of'): p.expect([1, b'2']) - - with assert_raises_msg(TypeError, 'must be one of'): + with self.assertRaisesRegexp(TypeError, '.*must be one of'): p.expect_exact(1) - with assert_raises_msg(TypeError, 'must be one of'): + with self.assertRaisesRegexp(TypeError, '.*must be one of'): p.expect_exact([1, b'2']) def test_timeout_none(self): @@ -493,7 +527,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): SIGWINCH generated when a window is resized), but in this test, we are substituting an ALARM signal as this is much easier for testing and is treated the same as a SIGWINCH. - + To ensure that the alarm fires during the expect call, we are setting the signal to alarm after 1 second while the spawned process sleeps for 2 seconds prior to sending the expected output. @@ -501,21 +535,13 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): def noop(x, y): pass signal.signal(signal.SIGALRM, noop) - - p1 = pexpect.spawn('%s sleep_for.py 2' % self.PYTHONBIN) - p1.expect('READY', timeout=10) + + p1 = pexpect.spawn('%s sleep_for.py 2' % self.PYTHONBIN, timeout=5) + p1.expect('READY') signal.alarm(1) - p1.expect('END', timeout=10) + p1.expect('END') if __name__ == '__main__': unittest.main() suite = unittest.makeSuite(ExpectTestCase,'test') - -#fout = open('delete_me_1','wb') -#fout.write(the_old_way) -#fout.close -#fout = open('delete_me_2', 'wb') -#fout.write(the_new_way) -#fout.close - diff --git a/tests/test_isalive.py b/tests/test_isalive.py index cbb474d..5168a52 100755 --- a/tests/test_isalive.py +++ b/tests/test_isalive.py @@ -31,23 +31,16 @@ class IsAliveTestCase(PexpectTestCase.PexpectTestCase): '''This tests that calling wait on a finished process works as expected. ''' p = pexpect.spawn('sleep 3') - if not p.isalive(): - self.fail ('Child process is not alive. It should be.') - time.sleep(1) + assert p.isalive() p.wait() - if p.isalive(): - self.fail ('Child process is not dead. It should be.') + assert not p.isalive() + p = pexpect.spawn('sleep 3') - if not p.isalive(): - self.fail ('Child process is not alive. It should be.') + assert p.isalive() p.kill(9) time.sleep(1) - try: + with self.assertRaises(pexpect.ExceptionPexpect): p.wait() - except pexpect.ExceptionPexpect: - pass - else: - self.fail ('Should have raised ExceptionPython because you can\'t call wait on a dead process.') def test_signal_wait(self): '''Test calling wait with a process terminated by a signal.''' @@ -55,64 +48,64 @@ class IsAliveTestCase(PexpectTestCase.PexpectTestCase): return 'SKIP' p = pexpect.spawn(sys.executable, ['alarm_die.py']) p.wait() - assert p.exitstatus is None, p.exitstatus + assert p.exitstatus is None self.assertEqual(p.signalstatus, signal.SIGALRM) def test_expect_isalive_dead_after_normal_termination (self): - p = pexpect.spawn('ls') + p = pexpect.spawn('ls', timeout=15) p.expect(pexpect.EOF) - time.sleep(1) # allow kernel status time to catch up with state. - if p.isalive(): - self.fail ('Child process is not dead. It should be.') + assert not p.isalive() - def test_expect_isalive_dead_after_SIGINT (self): + def test_expect_isalive_dead_after_SIGHUP(self): + p = pexpect.spawn('cat', timeout=5, ignore_sighup=False) + assert p.isalive() + force = False + if sys.platform.lower().startswith('sunos'): + # On Solaris (SmartOs), and only when executed from cron(1), SIGKILL + # is required to end the sub-process. This is done using force=True + force = True + assert p.terminate(force) == True + p.expect(pexpect.EOF) + assert not p.isalive() + + def test_expect_isalive_dead_after_SIGINT(self): p = pexpect.spawn('cat', timeout=5) - if not p.isalive(): - self.fail ('Child process is not alive. It should be.') - p.terminate() - # Solaris is kind of slow. - # Without this delay then p.expect(...) will not see - # that the process is dead and it will timeout. - time.sleep(1) + assert p.isalive() + force = False + if sys.platform.lower().startswith('sunos'): + # On Solaris (SmartOs), and only when executed from cron(1), SIGKILL + # is required to end the sub-process. This is done using force=True + force = True + assert p.terminate(force) == True p.expect(pexpect.EOF) - if p.isalive(): - self.fail ('Child process is not dead. It should be.') + assert not p.isalive() - def test_expect_isalive_dead_after_SIGKILL (self): - p = pexpect.spawn('cat', timeout=3) - if not p.isalive(): - self.fail ('Child process is not alive. It should be.') + def test_expect_isalive_dead_after_SIGKILL(self): + p = pexpect.spawn('cat', timeout=5) + assert p.isalive() p.kill(9) - # Solaris is kind of slow. - # Without this delay then p.expect(...) will not see - # that the process is dead and it will timeout. - time.sleep(1) p.expect(pexpect.EOF) - if p.isalive(): - self.fail ('Child process is not dead. It should be.') + assert not p.isalive() def test_forced_terminate(self): p = pexpect.spawn(sys.executable, ['needs_kill.py']) p.expect('READY') - res = p.terminate(force=True) - assert res, res + assert p.terminate(force=True) == True + p.expect(pexpect.EOF) + assert not p.isalive() ### Some platforms allow this. Some reset status after call to waitpid. +### probably not necessary, isalive() returns early when terminate is False. def test_expect_isalive_consistent_multiple_calls (self): '''This tests that multiple calls to isalive() return same value. ''' - p = pexpect.spawn('cat') - if not p.isalive(): - self.fail ('Child process is not alive. It should be.') - if not p.isalive(): - self.fail ('Second call. Child process is not alive. It should be.') + assert p.isalive() + assert p.isalive() p.kill(9) p.expect(pexpect.EOF) - if p.isalive(): - self.fail ('Child process is not dead. It should be.') - if p.isalive(): - self.fail ('Second call. Child process is not dead. It should be.') + assert not p.isalive() + assert not p.isalive() if __name__ == '__main__': unittest.main() diff --git a/tests/test_misc.py b/tests/test_misc.py index 66d943b..c465e89 100755 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -25,6 +25,7 @@ import sys import re import signal import time +import os # the program cat(1) may display ^D\x08\x08 when \x04 (EOF, Ctrl-D) is sent _CAT_EOF = b'^D\x08\x08' @@ -33,7 +34,11 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): def test_isatty (self): child = pexpect.spawn('cat') - assert child.isatty(), "Not returning True. Should always be True." + if not child.isatty() and sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + assert child.isatty() def test_read (self): child = pexpect.spawn('cat') @@ -95,7 +100,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): assert (page == b'abc\r\nabc\r\n123\r\n123\r\n' or page == b'abc\r\n123\r\nabc\r\n123\r\n' or page == b'abc\r\n123abc\r\n\r\n123\r\n') , \ - "iterator did not work. page=%r"(page,) + "iterator did not work. page=%r" % (page,) def test_readlines(self): '''Note that on some slow or heavily loaded systems that the lines @@ -201,34 +206,28 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): else: self.fail ("child.isalive() should have raised a pexpect.ExceptionPexpect") child.terminated = 1 # Force back to valid state so __del__ won't complain + def test_bad_arguments (self): '''This tests that we get a graceful error when passing bad arguments.''' - try: - p = pexpect.spawn(1) - except pexpect.ExceptionPexpect: - pass - else: - self.fail ("pexpect.spawn(1) should have raised a pexpect.ExceptionPexpect.") - try: - p = pexpect.spawn('ls', '-la') # should really use pexpect.spawn('ls', ['-ls']) - except TypeError: - pass - else: - self.fail ("pexpect.spawn('ls', '-la') should have raised a TypeError.") - try: - p = pexpect.spawn('cat') + with self.assertRaises(pexpect.ExceptionPexpect): + pexpect.spawn(1) + + with self.assertRaises(TypeError): + # should use pexpect.spawn('ls', ['-ls']) + pexpect.spawn('ls', '-la') + + with self.assertRaises(ValueError): + p = pexpect.spawn('cat', timeout=5) p.close() p.read_nonblocking(size=1, timeout=3) - except ValueError: - pass - else: - self.fail ("read_nonblocking on closed spawn object should have raised a ValueError.") + def test_isalive(self): child = pexpect.spawn('cat') assert child.isalive(), child.isalive() child.sendeof() child.expect(pexpect.EOF) assert not child.isalive(), child.isalive() + def test_bad_type_in_expect(self): child = pexpect.spawn('cat') try: diff --git a/tests/test_performance.py b/tests/test_performance.py index 163e4f2..7be0cf6 100755 --- a/tests/test_performance.py +++ b/tests/test_performance.py @@ -31,7 +31,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): '''Testing the performance of expect, with emphasis on wading through long inputs. ''' - + if sys.version_info[0] >= 3: @staticmethod def _iter_n(n): @@ -41,10 +41,10 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): else: @staticmethod def _iter_n(n): - return 'for n in range(1, %d+1): print(n)' % n + return 'for n in range(1, %d+1): print(n)' % n def plain_range(self, n): - e = pexpect.spawn('python') + e = pexpect.spawn('python', timeout=100) self.assertEqual(e.expect(b'>>>'), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect(br'\.{3}'), 0) @@ -52,7 +52,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect([b'inquisition', '%d' % n]), 1) def window_range(self, n): - e = pexpect.spawn('python') + e = pexpect.spawn('python', timeout=100) self.assertEqual(e.expect(b'>>>'), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect(r'\.{3}'), 0) @@ -60,7 +60,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect([b'inquisition', '%d' % n], searchwindowsize=20), 1) def exact_range(self, n): - e = pexpect.spawn('python') + e = pexpect.spawn('python', timeout=100) self.assertEqual(e.expect_exact([b'>>>']), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect_exact([b'...']), 0) @@ -68,7 +68,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect_exact([b'inquisition', '%d' % n],timeout=520), 1) def ewin_range(self, n): - e = pexpect.spawn('python') + e = pexpect.spawn('python', timeout=100) self.assertEqual(e.expect_exact([b'>>>']), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect_exact([b'...']), 0) @@ -76,7 +76,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect_exact([b'inquisition', '%d' % n], searchwindowsize=20), 1) def faster_range(self, n): - e = pexpect.spawn('python') + e = pexpect.spawn('python', timeout=100) self.assertEqual(e.expect(b'>>>'), 0) e.sendline(('list(range(1, %d+1))' % n).encode('ascii')) self.assertEqual(e.expect([b'inquisition', '%d' % n]), 1) diff --git a/tests/test_replwrap.py b/tests/test_replwrap.py index 3ee9f0c..93ba6fc 100644 --- a/tests/test_replwrap.py +++ b/tests/test_replwrap.py @@ -45,7 +45,7 @@ class REPLWrapTestCase(unittest.TestCase): self.assertEqual(res.strip().splitlines(), ['1 2', '3 4']) def test_existing_spawn(self): - child = pexpect.spawnu("bash", timeout=5) + child = pexpect.spawnu("bash", timeout=5, echo=False) repl = replwrap.REPLWrapper(child, re.compile('[$#]'), "PS1='{0}' PS2='{1}' " "PROMPT_COMMAND=''") @@ -68,7 +68,7 @@ class REPLWrapTestCase(unittest.TestCase): if platform.python_implementation() == 'PyPy': raise unittest.SkipTest("This test fails on PyPy because of REPL differences") - child = pexpect.spawnu('python', timeout=5) + child = pexpect.spawnu('python', echo=False, timeout=5) # prompt_change=None should mean no prompt change py = replwrap.REPLWrapper(child, replwrap.u(">>> "), prompt_change=None, continuation_prompt=replwrap.u("... ")) diff --git a/tests/test_unicode.py b/tests/test_unicode.py index 38e758d..1a60ff1 100644 --- a/tests/test_unicode.py +++ b/tests/test_unicode.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import platform import tempfile +import sys import pexpect import unittest @@ -34,33 +35,60 @@ class UnicodeTests(PexpectTestCase.PexpectTestCase): p.sendeof() p.expect_exact (pexpect.EOF) - def test_expect_echo (self): - '''This tests that echo can be turned on and off. + def test_expect_setecho_toggle(self): + '''This tests that echo may be toggled off. ''' - p = pexpect.spawnu('cat', timeout=10) - self._expect_echo(p) + p = pexpect.spawnu('cat', timeout=5) + try: + self._expect_echo_toggle_off(p) + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + raise + self._expect_echo_toggle_on(p) def test_expect_echo_exact (self): '''Like test_expect_echo(), but using expect_exact(). ''' - p = pexpect.spawnu('cat', timeout=10) + p = pexpect.spawnu('cat', timeout=5) p.expect = p.expect_exact self._expect_echo(p) + def test_expect_setecho_toggle_exact(self): + p = pexpect.spawnu('cat', timeout=5) + p.expect = p.expect_exact + try: + self._expect_echo_toggle_off(p) + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + raise + self._expect_echo_toggle_on(p) + def _expect_echo (self, p): p.sendline('1234') # Should see this twice (once from tty echo and again from cat). index = p.expect (['1234', 'abcdé', 'wxyz', pexpect.EOF, pexpect.TIMEOUT]) assert index == 0, (index, p.before) index = p.expect (['1234', 'abcdé', 'wxyz', pexpect.EOF]) assert index == 0, index + + def _expect_echo_toggle_off(self, p): p.setecho(0) # Turn off tty echo + p.waitnoecho() p.sendline('abcdé') # Now, should only see this once. p.sendline('wxyz') # Should also be only once. index = p.expect ([pexpect.EOF,pexpect.TIMEOUT, 'abcdé', 'wxyz', '1234']) assert index == 2, index index = p.expect ([pexpect.EOF, 'abcdé', 'wxyz', '7890']) assert index == 2, index + + def _expect_echo_toggle_on(self, p): p.setecho(1) # Turn on tty echo + time.sleep(0.2) # there is no waitecho() ! p.sendline('7890') # Should see this twice. index = p.expect ([pexpect.EOF, 'abcdé', 'wxyz', '7890']) assert index == 3, index -- cgit v1.2.1 From 8f48ac8d97ffd520074f02af4ef5ef7b8aadf862 Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Sun, 8 Jun 2014 08:07:03 +0000 Subject: Solaris support, tested on SmartOS from cron(1). --- doc/history.rst | 10 ++- pexpect/__init__.py | 210 ++++++++++++++++++++++++++++++---------------- pexpect/replwrap.py | 9 +- tests/getch.py | 2 +- tests/test_ctrl_chars.py | 100 +++++++++++----------- tests/test_expect.py | 162 ++++++++++++++++++++--------------- tests/test_isalive.py | 89 +++++++++----------- tests/test_misc.py | 39 +++++---- tests/test_performance.py | 14 ++-- tests/test_replwrap.py | 4 +- tests/test_unicode.py | 38 +++++++-- 11 files changed, 400 insertions(+), 277 deletions(-) diff --git a/doc/history.rst b/doc/history.rst index 2b924a9..313b9ff 100644 --- a/doc/history.rst +++ b/doc/history.rst @@ -19,6 +19,12 @@ Version 3.3 * Fixed issue where EOF was not correctly detected in ``interact()``, causing a repeating loop of output on Linux, and blocking before EOF on BSD and Solaris (:ghissue:`49`). +* Several Solaris (SmartOS) bugfixes, preventing IOError exceptions, especially + when used with cron(1) (:ghissue:`44`). +* Added new keyword argument ``echo=True`` for ``spawn()``. On SRV4-like + systems, the method ``isatty()`` will always return *False*: the child pty + does not appear as a terminal. Therefore, ``setecho()``, ``getwinsize()``, + ``setwinsize()``, and ``waitnoecho()`` are not supported on those platforms. Version 3.2 ``````````` @@ -115,11 +121,11 @@ Version 2.3 consistently on different platforms. Solaris is the most difficult to support. * You can now put ``TIMEOUT`` in a list of expected patterns. This is just like putting ``EOF`` in the pattern list. Expecting for a ``TIMEOUT`` may not be - used as often as ``EOF``, but this makes Pexpect more consitent. + used as often as ``EOF``, but this makes Pexpect more consistent. * Thanks to a suggestion and sample code from Chad J. Schroeder I added the ability for Pexpect to operate on a file descriptor that is already open. This means that Pexpect can be used to control streams such as those from serial port devices. Now, - you just pass the integer file descriptor as the "command" when contsructing a + you just pass the integer file descriptor as the "command" when constructing a spawn open. For example on a Linux box with a modem on ttyS1:: fd = os.open("/dev/ttyS1", os.O_RDWR|os.O_NONBLOCK|os.O_NOCTTY) diff --git a/pexpect/__init__.py b/pexpect/__init__.py index 23eef39..c7411cb 100644 --- a/pexpect/__init__.py +++ b/pexpect/__init__.py @@ -305,7 +305,7 @@ class spawn(object): def __init__(self, command, args=[], timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None, - ignore_sighup=True): + ignore_sighup=True, echo=True): '''This is the constructor. The command parameter may be a string that includes a command and any arguments to the command. For example:: @@ -418,7 +418,16 @@ class spawn(object): signalstatus will store the signal value and exitstatus will be None. If you need more detail you can also read the self.status member which stores the status returned by os.waitpid. You can interpret this using - os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG. ''' + os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG. + + The echo attribute may be set to False to disable echoing of input. + As a pseudo-terminal, all input echoed by the "keyboard" (send() + or sendline()) will be repeated to output. For many cases, it is + not desirable to have echo enabled, and it may be later disabled + using setecho(False) followed by waitnoecho(). However, for some + platforms such as Solaris, this is not possible, and should be + disabled immediately on spawn. + ''' self.STDIN_FILENO = pty.STDIN_FILENO self.STDOUT_FILENO = pty.STDOUT_FILENO @@ -440,7 +449,7 @@ class spawn(object): self.status = None self.flag_eof = False self.pid = None - # the chile filedescriptor is initially closed + # the child file descriptor is initially closed self.child_fd = -1 self.timeout = timeout self.delimiter = EOF @@ -469,16 +478,30 @@ class spawn(object): self.closed = True self.cwd = cwd self.env = env + self.echo = echo self.ignore_sighup = ignore_sighup + _platform = sys.platform.lower() # This flags if we are running on irix - self.__irix_hack = (sys.platform.lower().find('irix') >= 0) + self.__irix_hack = _platform.startswith('irix') # Solaris uses internal __fork_pty(). All others use pty.fork(). - if ((sys.platform.lower().find('solaris') >= 0) - or (sys.platform.lower().find('sunos5') >= 0)): - self.use_native_pty_fork = False - else: - self.use_native_pty_fork = True - + self.use_native_pty_fork = not ( + _platform.startswith('solaris') or + _platform.startswith('sunos')) + # inherit EOF and INTR definitions from controlling process. + try: + from termios import VEOF, VINTR + fd = sys.__stdin__.fileno() + self._INTR = ord(termios.tcgetattr(fd)[6][VINTR]) + self._EOF = ord(termios.tcgetattr(fd)[6][VEOF]) + except (ImportError, OSError, IOError, termios.error): + # unless the controlling process is also not a terminal, + # such as cron(1). Fall-back to using CEOF and CINTR. + try: + from termios import CEOF, CINTR + (self._INTR, self._EOF) = (CINTR, CEOF) + except ImportError: + # ^C, ^D + (self._INTR, self._EOF) = (3, 4) # Support subclasses that do not use command or args. if command is None: self.command = None @@ -609,26 +632,32 @@ class spawn(object): # Use internal __fork_pty self.pid, self.child_fd = self.__fork_pty() - if self.pid == 0: + # Some platforms must call setwinsize() and setecho() from the + # child process, and others from the master process. We do both, + # allowing IOError for either. + + if self.pid == pty.CHILD: # Child + self.child_fd = self.STDIN_FILENO + + # set default window size of 24 rows by 80 columns try: - # used by setwinsize() - self.child_fd = sys.stdout.fileno() self.setwinsize(24, 80) - # which exception, shouldnt' we catch explicitly .. ? - except: - # Some platforms do not like setwinsize (Cygwin). - # This will cause problem when running applications that - # are very picky about window size. - # This is a serious limitation, but not a show stopper. - pass + except IOError as err: + if err.args[0] not in (errno.EINVAL, errno.ENOTTY): + raise + + # disable echo if spawn argument echo was unset + if not self.echo: + try: + self.setecho(self.echo) + except (IOError, termios.error) as err: + if err.args[0] not in (errno.EINVAL, errno.ENOTTY): + raise + # Do not allow child to inherit open file descriptors from parent. max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - for i in range(3, max_fd): - try: - os.close(i) - except OSError: - pass + os.closerange(3, max_fd) if self.ignore_sighup: signal.signal(signal.SIGHUP, signal.SIG_IGN) @@ -641,6 +670,13 @@ class spawn(object): os.execvpe(self.command, self.args, self.env) # Parent + try: + self.setwinsize(24, 80) + except IOError as err: + if err.args[0] not in (errno.EINVAL, errno.ENOTTY): + raise + + self.terminated = False self.closed = False @@ -663,19 +699,15 @@ class spawn(object): raise ExceptionPexpect("Could not open with os.openpty().") pid = os.fork() - if pid < 0: # pragma: no cover - raise ExceptionPexpect("Failed os.fork().") - elif pid == 0: + if pid == pty.CHILD: # Child. os.close(parent_fd) self.__pty_make_controlling_tty(child_fd) - os.dup2(child_fd, 0) - os.dup2(child_fd, 1) - os.dup2(child_fd, 2) + os.dup2(child_fd, self.STDIN_FILENO) + os.dup2(child_fd, self.STDOUT_FILENO) + os.dup2(child_fd, self.STDERR_FILENO) - if child_fd > 2: - os.close(child_fd) else: # Parent. os.close(child_fd) @@ -687,15 +719,29 @@ class spawn(object): more portable than the pty.fork() function. Specifically, this should work on Solaris. ''' - child_name = os.ttyname(tty_fd) + # os.ttyname() fails for the child process under a rare timing + # condition (about 1 in 10 on SmartOs in a VMWare machine). May + # raise OSError as errno ENOTTY -- ptsname(3C) should be used, + # on this system, but python does not expose it, it would require + # ctypes + libc. which is also not available, filed patch as + # http://bugs.python.org/issue20664 + child_name, tries = None, 0 + _poll, _max_tries= 0.1, 10 + while child_name is None: + try: + child_name = os.ttyname(tty_fd) + break + except OSError, err: + tries += 1 + if tries > _max_tries: + raise + time.sleep(_poll) # Disconnect from controlling tty. Harmless if not already connected. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) - if fd >= 0: - os.close(fd) - # which exception, shouldnt' we catch explicitly .. ? - except: + os.close(fd) + except OSError: # Already disconnected. This happens if running inside cron. pass @@ -705,10 +751,8 @@ class spawn(object): # by attempting to open it again. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) - if fd >= 0: - os.close(fd) - # which exception, shouldnt' we catch explicitly .. ? - except: + os.close(fd) + except OSError: # Good! We are disconnected from a controlling tty. pass @@ -720,11 +764,15 @@ class spawn(object): os.close(fd) # Verify we now have a controlling tty. - fd = os.open("/dev/tty", os.O_WRONLY) - if fd < 0: - raise ExceptionPexpect("Could not open controlling tty, /dev/tty") - else: + try: + fd = os.open("/dev/tty", os.O_WRONLY) os.close(fd) + except OSError as err: + if err.args[0] == ENXIO: + # on Solaris, `/dev/tty' raises OSError upon opening, though + # it does exist: /dev/tty -> ../devices/pseudo/sy@0:tty + pass + def fileno(self): '''This returns the file descriptor of the pty for the child. @@ -758,7 +806,11 @@ class spawn(object): def isatty(self): '''This returns True if the file descriptor is open and connected to a - tty(-like) device, else False. ''' + tty(-like) device, else False. + + On SRV4-style platforms implementing streams, the child pty does not + appear as a terminal device. This means methods such as setecho(), + setwinsize(), getwinsize() may raise an IOError. ''' return os.isatty(self.child_fd) @@ -795,12 +847,20 @@ class spawn(object): def getecho(self): '''This returns the terminal echo mode. This returns True if echo is on or False if echo is off. Child applications that are expecting you - to enter a password often set ECHO False. See waitnoecho(). ''' + to enter a password often set ECHO False. See waitnoecho(). - attr = termios.tcgetattr(self.child_fd) - if attr[3] & termios.ECHO: - return True - return False + Not supported on platforms where ``isatty()`` returns False. ''' + + try: + attr = termios.tcgetattr(self.child_fd) + except termios.error as err: + errmsg = 'getecho() may not be called on this platform' + if err.args[0] == errno.EINVAL: + raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg)) + raise + + self.echo = bool(attr[3] & termios.ECHO) + return self.echo def setecho(self, state): '''This sets the terminal echo mode on or off. Note that anything the @@ -830,18 +890,35 @@ class spawn(object): p.expect(['1234']) p.expect(['abcd']) p.expect(['wxyz']) + + + Not supported on platforms where ``isatty()`` returns False. ''' - self.child_fd - attr = termios.tcgetattr(self.child_fd) + errmsg = 'setecho() may not be called on this platform' + + try: + attr = termios.tcgetattr(self.child_fd) + except termios.error as err: + if err.args[0] == errno.EINVAL: + raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg)) + raise + if state: attr[3] = attr[3] | termios.ECHO else: attr[3] = attr[3] & ~termios.ECHO - # I tried TCSADRAIN and TCSAFLUSH, but - # these were inconsistent and blocked on some platforms. - # TCSADRAIN would probably be ideal if it worked. - termios.tcsetattr(self.child_fd, termios.TCSANOW, attr) + + try: + # I tried TCSADRAIN and TCSAFLUSH, but these were inconsistent and + # blocked on some platforms. TCSADRAIN would probably be ideal. + termios.tcsetattr(self.child_fd, termios.TCSANOW, attr) + except IOError as err: + if err.args[0] == errno.EINVAL: + raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg)) + raise + + self.echo = state def _log(self, s, direction): if self.logfile is not None: @@ -1077,24 +1154,15 @@ class spawn(object): called at the beginning of a line. This method does not send a newline. It is the responsibility of the caller to ensure the eof is sent at the beginning of a line. ''' - if hasattr(termios, 'VEOF'): - char = ord(termios.tcgetattr(self.child_fd)[6][termios.VEOF]) - else: - # platform does not define VEOF so assume CTRL-D - char = 4 - self.send(self._chr(char)) + + self.send(self._chr(self._EOF)) def sendintr(self): '''This sends a SIGINT to the child. It does not require the SIGINT to be the first character on a line. ''' - if hasattr(termios, 'VINTR'): - char = ord(termios.tcgetattr(self.child_fd)[6][termios.VINTR]) - else: - # platform does not define VINTR so assume CTRL-C - char = 3 - self.send(self._chr(char)) + self.send(self._chr(self._INTR)) def eof(self): @@ -1187,7 +1255,7 @@ class spawn(object): if self.flag_eof: # This is for Linux, which requires the blocking form - # of waitpid to # get status of a defunct process. + # of waitpid to get the status of a defunct process. # This is super-lame. The flag_eof would have been set # in read_nonblocking(), so this should be safe. waitpid_options = 0 diff --git a/pexpect/replwrap.py b/pexpect/replwrap.py index af4f889..a36678c 100644 --- a/pexpect/replwrap.py +++ b/pexpect/replwrap.py @@ -34,11 +34,14 @@ class REPLWrapper(object): new_prompt=PEXPECT_PROMPT, continuation_prompt=PEXPECT_CONTINUATION_PROMPT): if isinstance(cmd_or_spawn, str): - self.child = pexpect.spawnu(cmd_or_spawn) + self.child = pexpect.spawnu(cmd_or_spawn, echo=False) else: self.child = cmd_or_spawn - self.child.setecho(False) # Don't repeat our input. - self.child.waitnoecho() + if self.child.echo: + # Existing spawn instance has echo enabled, disable it + # to prevent our input from being repeated to output. + self.child.setecho(False) + self.child.waitnoecho() if prompt_change is None: self.prompt = orig_prompt diff --git a/tests/getch.py b/tests/getch.py index cb20bef..41e3224 100755 --- a/tests/getch.py +++ b/tests/getch.py @@ -33,7 +33,7 @@ def main(): val = ord(stdin.read(1)) except KeyboardInterrupt: val = 3 - sys.stdout.write('%d\r\n' % (val,)) + sys.stdout.write('%d\r\n' % (val,)) if val == 0: # StopIteration equivalent is ctrl+' ' (\x00, NUL) break diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py index 7b87840..d84f01b 100755 --- a/tests/test_ctrl_chars.py +++ b/tests/test_ctrl_chars.py @@ -37,82 +37,82 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase): def test_control_chars(self): '''This tests that we can send all 256 8-bit characters to a child process.''' - child = pexpect.spawn('python getch.py') - child.expect('READY', timeout=5) - try: - for i in range(1,256): - child.send(byte(i)) - child.expect ('%d\r\n' % (i,)) - # This needs to be last, as getch.py exits on \x00 - child.send(byte(0)) - child.expect('0\r\n') - child.expect(pexpect.EOF) - except Exception: - err = sys.exc_info()[1] - msg = "Did not echo character value: " + str(i) + "\n" - msg = msg + str(err) - self.fail(msg) + child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child.expect('READY') + for i in range(1, 256): + child.send(byte(i)) + child.expect ('%d' % (i,)) + + # This needs to be last, as getch.py exits on \x00 + child.send(byte(0)) + child.expect('0') + child.expect(pexpect.EOF) + assert not child.isalive() + assert child.exitstatus == 0 def test_sendintr (self): - try: - child = pexpect.spawn('python getch.py') - child.expect('READY', timeout=5) - child.sendintr() - child.expect ('3\r\n') - except Exception: - err = sys.exc_info()[1] - self.fail("Did not echo character value: 3, %s\n%s\n%s" % ( - str(err), child.before, child.after,)) + child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child.expect('READY') + child.sendintr() + child.expect('3') + + child.send(byte(0)) + child.expect('0') + child.expect(pexpect.EOF) + assert not child.isalive() + assert child.exitstatus == 0 + + def test_sendeof(self): + child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child.expect('READY') + child.sendeof() + child.expect(str(child._EOF) + '') + + child.send(byte(0)) + child.expect('0') + child.expect(pexpect.EOF) + assert not child.isalive() + assert child.exitstatus == 0 def test_bad_sendcontrol_chars (self): '''This tests that sendcontrol will return 0 for an unknown char. ''' - child = pexpect.spawn('python getch.py') - retval = child.sendcontrol('1') - assert retval == 0, "sendcontrol() should have returned 0 because there is no such thing as ctrl-1." + child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child.expect('READY') + assert 0 == child.sendcontrol('1') def test_sendcontrol(self): '''This tests that we can send all special control codes by name. ''' - child = pexpect.spawn('python getch.py') - # On slow machines, like Travis, the process is not ready in time to - # catch the first character unless we wait for it. - child.expect('READY', timeout=5) - child.delaybeforesend = 0.05 + child = pexpect.spawn('python getch.py', echo=False, timeout=5) + child.expect('READY') for ctrl in 'abcdefghijklmnopqrstuvwxyz': assert child.sendcontrol(ctrl) == 1 val = ord(ctrl) - ord('a') + 1 - try: - child.expect_exact(str(val)+'\r\n', timeout=2) - except: - print(ctrl) - raise + child.expect_exact(str(val)+'') # escape character assert child.sendcontrol('[') == 1 - child.expect ('27\r\n') + child.expect('27') assert child.sendcontrol('\\') == 1 - child.expect ('28\r\n') + child.expect('28') # telnet escape character assert child.sendcontrol(']') == 1 - child.expect ('29\r\n') + child.expect('29') assert child.sendcontrol('^') == 1 - child.expect ('30\r\n') + child.expect('30') # irc protocol uses this to underline ... assert child.sendcontrol('_') == 1 - child.expect ('31\r\n') + child.expect('31') # the real "backspace is delete" assert child.sendcontrol('?') == 1 - child.expect ('127\r\n') + child.expect('127') + # NUL, same as ctrl + ' ' assert child.sendcontrol('@') == 1 - child.expect ('0\r\n') - # 0 is sentinel value to getch.py, assert exit: - # causes child to exit, but, if immediately tested, - # isalive() still returns True unless an artifical timer - # is used. - time.sleep(0.5) - assert child.isalive() == False, child.isalive() + child.expect('0') + child.expect(pexpect.EOF) + assert not child.isalive() assert child.exitstatus == 0 if __name__ == '__main__': diff --git a/tests/test_expect.py b/tests/test_expect.py index 4e99aad..da5214c 100755 --- a/tests/test_expect.py +++ b/tests/test_expect.py @@ -18,22 +18,21 @@ PEXPECT LICENSE OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. ''' -import pexpect import unittest import subprocess import time +import signal +import sys + +import pexpect from . import PexpectTestCase from .utils import no_coverage_env -import signal # Many of these test cases blindly assume that sequential directory # listings of the /bin directory will yield the same results. # This may not be true, but seems adequate for testing now. # I should fix this at some point. -# query: For some reason an extra newline occures under OS X evey -# once in a while. Excessive uses of .replace resolve these - FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)]) def hex_dump(src, length=16): result=[] @@ -51,32 +50,10 @@ def hex_diff(left, right): return '\n' + '\n'.join(diff,) -class assert_raises_msg(object): - def __init__(self, errtype, msgpart): - self.errtype = errtype - self.msgpart = msgpart - - def __enter__(self): - pass - - def __exit__(self, etype, value, traceback): - if value is None: - raise AssertionError('Expected %s, but no exception was raised' \ - % self.errtype) - if not isinstance(value, self.errtype): - raise AssertionError('Expected %s, but %s was raised' \ - % (self.errtype, etype)) - - errstr = str(value) - if self.msgpart not in errstr: - raise AssertionError('%r was not in %r' % (self.msgpart, errstr)) - - return True - class ExpectTestCase (PexpectTestCase.PexpectTestCase): def test_expect_basic (self): - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.sendline (b'Hello') p.sendline (b'there') p.sendline (b'Mr. Python') @@ -87,7 +64,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): p.expect (pexpect.EOF) def test_expect_exact_basic (self): - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.sendline (b'Hello') p.sendline (b'there') p.sendline (b'Mr. Python') @@ -101,7 +78,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): '''This test that the ignorecase flag will match patterns even if case is different using the regex (?i) directive. ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.sendline (b'HELLO') p.sendline (b'there') p.expect (b'(?i)hello') @@ -113,7 +90,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): '''This test that the ignorecase flag will match patterns even if case is different using the ignorecase flag. ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.ignorecase = True p.sendline (b'HELLO') p.sendline (b'there') @@ -129,22 +106,17 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): (one of the) the leftmost matches in the input? -- grahn) ... agreed! -jquast, the buffer ptr isn't forwarded on match, see first two test cases ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) self._expect_order(p) def test_expect_order_exact (self): '''Like test_expect_order(), but using expect_exact(). ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.expect = p.expect_exact self._expect_order(p) def _expect_order (self, p): - # Disable echo so that the output we see is in an entirely predictable - # order - p.setecho(False) - p.waitnoecho() - p.sendline (b'1234') p.sendline (b'abcd') p.sendline (b'wxyz') @@ -187,7 +159,44 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): pexpect.EOF]) assert index == 3, (index, p.before, p.after) - def test_waitnoecho (self): + def test_expect_setecho_off(self): + '''This tests that echo may be toggled off. + ''' + p = pexpect.spawn('cat', echo=True, timeout=5) + try: + self._expect_echo_toggle(p) + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + + def test_expect_setecho_off_exact(self): + p = pexpect.spawn('cat', echo=True, timeout=5) + p.expect = p.expect_exact + try: + self._expect_echo_toggle(p) + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + raise + + def test_waitnoecho(self): + " Tests setecho(False) followed by waitnoecho() " + p = pexpect.spawn('cat', echo=False, timeout=5) + try: + p.setecho(False) + p.waitnoecho() + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + raise + + def test_waitnoecho_order(self): ''' This tests that we can wait on a child process to set echo mode. For example, this tests that we could wait for SSH to set ECHO False @@ -196,7 +205,16 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): p1 = pexpect.spawn('%s echo_wait.py' % self.PYTHONBIN) start = time.time() - p1.waitnoecho(timeout=10) + try: + p1.waitnoecho(timeout=10) + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + raise + + end_time = time.time() - start assert end_time < 10 and end_time > 2, "waitnoecho did not set ECHO off in the expected window of time." @@ -216,15 +234,15 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): assert end_time < 10, "waitnoecho did not set ECHO off in the expected window of time." def test_expect_echo (self): - '''This tests that echo can be turned on and off. + '''This tests that echo is on by default. ''' - p = pexpect.spawn('cat', timeout=10) + p = pexpect.spawn('cat', echo=True, timeout=5) self._expect_echo(p) def test_expect_echo_exact (self): '''Like test_expect_echo(), but using expect_exact(). ''' - p = pexpect.spawn('cat', timeout=10) + p = pexpect.spawn('cat', echo=True, timeout=5) p.expect = p.expect_exact self._expect_echo(p) @@ -243,7 +261,24 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): b'wxyz', pexpect.EOF]) assert index == 0, "index="+str(index) + + def _expect_echo_toggle(self, p): + p.sendline (b'1234') # Should see this twice (once from tty echo and again from cat). + index = p.expect ([ + b'1234', + b'abcd', + b'wxyz', + pexpect.EOF, + pexpect.TIMEOUT]) + assert index == 0, "index="+str(index)+"\n"+p.before + index = p.expect ([ + b'1234', + b'abcd', + b'wxyz', + pexpect.EOF]) + assert index == 0, "index="+str(index) p.setecho(0) # Turn off tty echo + p.waitnoecho() p.sendline (b'abcd') # Now, should only see this once. p.sendline (b'wxyz') # Should also be only once. index = p.expect ([ @@ -271,32 +306,32 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): '''This tests that mixed list of regex strings, TIMEOUT, and EOF all return the correct index when matched. ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) self._expect_index(p) def test_expect_index_exact (self): '''Like test_expect_index(), but using expect_exact(). ''' - p = pexpect.spawn('cat') + p = pexpect.spawn('cat', echo=False, timeout=5) p.expect = p.expect_exact self._expect_index(p) def _expect_index (self, p): - p.setecho(0) p.sendline (b'1234') index = p.expect ([b'abcd',b'wxyz',b'1234',pexpect.EOF]) assert index == 2, "index="+str(index) p.sendline (b'abcd') index = p.expect ([pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF]) - assert index == 1, "index="+str(index) + assert index == 1, "index="+str(index)+str(p) p.sendline (b'wxyz') - index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], timeout=5) + index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF]) assert index == 3, "index="+str(index) # Expect 'wxyz' p.sendline (b'$*!@?') - index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], timeout=5) + index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], + timeout=1) assert index == 1, "index="+str(index) # Expect TIMEOUT p.sendeof () - index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], timeout=5) + index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF]) assert index == 5, "index="+str(index) # Expect EOF def test_expect (self): @@ -471,14 +506,13 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): def test_bad_arg(self): p = pexpect.spawn('cat') - with assert_raises_msg(TypeError, 'must be one of'): + with self.assertRaisesRegexp(TypeError, '.*must be one of'): p.expect(1) - with assert_raises_msg(TypeError, 'must be one of'): + with self.assertRaisesRegexp(TypeError, '.*must be one of'): p.expect([1, b'2']) - - with assert_raises_msg(TypeError, 'must be one of'): + with self.assertRaisesRegexp(TypeError, '.*must be one of'): p.expect_exact(1) - with assert_raises_msg(TypeError, 'must be one of'): + with self.assertRaisesRegexp(TypeError, '.*must be one of'): p.expect_exact([1, b'2']) def test_timeout_none(self): @@ -493,7 +527,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): SIGWINCH generated when a window is resized), but in this test, we are substituting an ALARM signal as this is much easier for testing and is treated the same as a SIGWINCH. - + To ensure that the alarm fires during the expect call, we are setting the signal to alarm after 1 second while the spawned process sleeps for 2 seconds prior to sending the expected output. @@ -501,21 +535,13 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): def noop(x, y): pass signal.signal(signal.SIGALRM, noop) - - p1 = pexpect.spawn('%s sleep_for.py 2' % self.PYTHONBIN) - p1.expect('READY', timeout=10) + + p1 = pexpect.spawn('%s sleep_for.py 2' % self.PYTHONBIN, timeout=5) + p1.expect('READY') signal.alarm(1) - p1.expect('END', timeout=10) + p1.expect('END') if __name__ == '__main__': unittest.main() suite = unittest.makeSuite(ExpectTestCase,'test') - -#fout = open('delete_me_1','wb') -#fout.write(the_old_way) -#fout.close -#fout = open('delete_me_2', 'wb') -#fout.write(the_new_way) -#fout.close - diff --git a/tests/test_isalive.py b/tests/test_isalive.py index cbb474d..5168a52 100755 --- a/tests/test_isalive.py +++ b/tests/test_isalive.py @@ -31,23 +31,16 @@ class IsAliveTestCase(PexpectTestCase.PexpectTestCase): '''This tests that calling wait on a finished process works as expected. ''' p = pexpect.spawn('sleep 3') - if not p.isalive(): - self.fail ('Child process is not alive. It should be.') - time.sleep(1) + assert p.isalive() p.wait() - if p.isalive(): - self.fail ('Child process is not dead. It should be.') + assert not p.isalive() + p = pexpect.spawn('sleep 3') - if not p.isalive(): - self.fail ('Child process is not alive. It should be.') + assert p.isalive() p.kill(9) time.sleep(1) - try: + with self.assertRaises(pexpect.ExceptionPexpect): p.wait() - except pexpect.ExceptionPexpect: - pass - else: - self.fail ('Should have raised ExceptionPython because you can\'t call wait on a dead process.') def test_signal_wait(self): '''Test calling wait with a process terminated by a signal.''' @@ -55,64 +48,64 @@ class IsAliveTestCase(PexpectTestCase.PexpectTestCase): return 'SKIP' p = pexpect.spawn(sys.executable, ['alarm_die.py']) p.wait() - assert p.exitstatus is None, p.exitstatus + assert p.exitstatus is None self.assertEqual(p.signalstatus, signal.SIGALRM) def test_expect_isalive_dead_after_normal_termination (self): - p = pexpect.spawn('ls') + p = pexpect.spawn('ls', timeout=15) p.expect(pexpect.EOF) - time.sleep(1) # allow kernel status time to catch up with state. - if p.isalive(): - self.fail ('Child process is not dead. It should be.') + assert not p.isalive() - def test_expect_isalive_dead_after_SIGINT (self): + def test_expect_isalive_dead_after_SIGHUP(self): + p = pexpect.spawn('cat', timeout=5, ignore_sighup=False) + assert p.isalive() + force = False + if sys.platform.lower().startswith('sunos'): + # On Solaris (SmartOs), and only when executed from cron(1), SIGKILL + # is required to end the sub-process. This is done using force=True + force = True + assert p.terminate(force) == True + p.expect(pexpect.EOF) + assert not p.isalive() + + def test_expect_isalive_dead_after_SIGINT(self): p = pexpect.spawn('cat', timeout=5) - if not p.isalive(): - self.fail ('Child process is not alive. It should be.') - p.terminate() - # Solaris is kind of slow. - # Without this delay then p.expect(...) will not see - # that the process is dead and it will timeout. - time.sleep(1) + assert p.isalive() + force = False + if sys.platform.lower().startswith('sunos'): + # On Solaris (SmartOs), and only when executed from cron(1), SIGKILL + # is required to end the sub-process. This is done using force=True + force = True + assert p.terminate(force) == True p.expect(pexpect.EOF) - if p.isalive(): - self.fail ('Child process is not dead. It should be.') + assert not p.isalive() - def test_expect_isalive_dead_after_SIGKILL (self): - p = pexpect.spawn('cat', timeout=3) - if not p.isalive(): - self.fail ('Child process is not alive. It should be.') + def test_expect_isalive_dead_after_SIGKILL(self): + p = pexpect.spawn('cat', timeout=5) + assert p.isalive() p.kill(9) - # Solaris is kind of slow. - # Without this delay then p.expect(...) will not see - # that the process is dead and it will timeout. - time.sleep(1) p.expect(pexpect.EOF) - if p.isalive(): - self.fail ('Child process is not dead. It should be.') + assert not p.isalive() def test_forced_terminate(self): p = pexpect.spawn(sys.executable, ['needs_kill.py']) p.expect('READY') - res = p.terminate(force=True) - assert res, res + assert p.terminate(force=True) == True + p.expect(pexpect.EOF) + assert not p.isalive() ### Some platforms allow this. Some reset status after call to waitpid. +### probably not necessary, isalive() returns early when terminate is False. def test_expect_isalive_consistent_multiple_calls (self): '''This tests that multiple calls to isalive() return same value. ''' - p = pexpect.spawn('cat') - if not p.isalive(): - self.fail ('Child process is not alive. It should be.') - if not p.isalive(): - self.fail ('Second call. Child process is not alive. It should be.') + assert p.isalive() + assert p.isalive() p.kill(9) p.expect(pexpect.EOF) - if p.isalive(): - self.fail ('Child process is not dead. It should be.') - if p.isalive(): - self.fail ('Second call. Child process is not dead. It should be.') + assert not p.isalive() + assert not p.isalive() if __name__ == '__main__': unittest.main() diff --git a/tests/test_misc.py b/tests/test_misc.py index a052fe5..cb82a09 100755 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -25,6 +25,7 @@ import sys import re import signal import time +import os # the program cat(1) may display ^D\x08\x08 when \x04 (EOF, Ctrl-D) is sent _CAT_EOF = b'^D\x08\x08' @@ -33,7 +34,11 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): def test_isatty (self): child = pexpect.spawn('cat') - assert child.isatty(), "Not returning True. Should always be True." + if not child.isatty() and sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + assert child.isatty() def test_read (self): child = pexpect.spawn('cat') @@ -102,7 +107,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): assert (page == b'abc\r\nabc\r\n123\r\n123\r\n' or page == b'abc\r\n123\r\nabc\r\n123\r\n' or page == b'abc\r\n123abc\r\n\r\n123\r\n') , \ - "iterator did not work. page=%r"(page,) + "iterator did not work. page=%r" % (page,) def test_readlines(self): '''Note that on some slow or heavily loaded systems that the lines @@ -208,34 +213,28 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): else: self.fail ("child.isalive() should have raised a pexpect.ExceptionPexpect") child.terminated = 1 # Force back to valid state so __del__ won't complain + def test_bad_arguments (self): '''This tests that we get a graceful error when passing bad arguments.''' - try: - p = pexpect.spawn(1) - except pexpect.ExceptionPexpect: - pass - else: - self.fail ("pexpect.spawn(1) should have raised a pexpect.ExceptionPexpect.") - try: - p = pexpect.spawn('ls', '-la') # should really use pexpect.spawn('ls', ['-ls']) - except TypeError: - pass - else: - self.fail ("pexpect.spawn('ls', '-la') should have raised a TypeError.") - try: - p = pexpect.spawn('cat') + with self.assertRaises(pexpect.ExceptionPexpect): + pexpect.spawn(1) + + with self.assertRaises(TypeError): + # should use pexpect.spawn('ls', ['-ls']) + pexpect.spawn('ls', '-la') + + with self.assertRaises(ValueError): + p = pexpect.spawn('cat', timeout=5) p.close() p.read_nonblocking(size=1, timeout=3) - except ValueError: - pass - else: - self.fail ("read_nonblocking on closed spawn object should have raised a ValueError.") + def test_isalive(self): child = pexpect.spawn('cat') assert child.isalive(), child.isalive() child.sendeof() child.expect(pexpect.EOF) assert not child.isalive(), child.isalive() + def test_bad_type_in_expect(self): child = pexpect.spawn('cat') try: diff --git a/tests/test_performance.py b/tests/test_performance.py index 163e4f2..7be0cf6 100755 --- a/tests/test_performance.py +++ b/tests/test_performance.py @@ -31,7 +31,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): '''Testing the performance of expect, with emphasis on wading through long inputs. ''' - + if sys.version_info[0] >= 3: @staticmethod def _iter_n(n): @@ -41,10 +41,10 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): else: @staticmethod def _iter_n(n): - return 'for n in range(1, %d+1): print(n)' % n + return 'for n in range(1, %d+1): print(n)' % n def plain_range(self, n): - e = pexpect.spawn('python') + e = pexpect.spawn('python', timeout=100) self.assertEqual(e.expect(b'>>>'), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect(br'\.{3}'), 0) @@ -52,7 +52,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect([b'inquisition', '%d' % n]), 1) def window_range(self, n): - e = pexpect.spawn('python') + e = pexpect.spawn('python', timeout=100) self.assertEqual(e.expect(b'>>>'), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect(r'\.{3}'), 0) @@ -60,7 +60,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect([b'inquisition', '%d' % n], searchwindowsize=20), 1) def exact_range(self, n): - e = pexpect.spawn('python') + e = pexpect.spawn('python', timeout=100) self.assertEqual(e.expect_exact([b'>>>']), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect_exact([b'...']), 0) @@ -68,7 +68,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect_exact([b'inquisition', '%d' % n],timeout=520), 1) def ewin_range(self, n): - e = pexpect.spawn('python') + e = pexpect.spawn('python', timeout=100) self.assertEqual(e.expect_exact([b'>>>']), 0) e.sendline(self._iter_n(n)) self.assertEqual(e.expect_exact([b'...']), 0) @@ -76,7 +76,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase): self.assertEqual(e.expect_exact([b'inquisition', '%d' % n], searchwindowsize=20), 1) def faster_range(self, n): - e = pexpect.spawn('python') + e = pexpect.spawn('python', timeout=100) self.assertEqual(e.expect(b'>>>'), 0) e.sendline(('list(range(1, %d+1))' % n).encode('ascii')) self.assertEqual(e.expect([b'inquisition', '%d' % n]), 1) diff --git a/tests/test_replwrap.py b/tests/test_replwrap.py index a6ea956..d7aa049 100644 --- a/tests/test_replwrap.py +++ b/tests/test_replwrap.py @@ -43,7 +43,7 @@ class REPLWrapTestCase(unittest.TestCase): self.assertEqual(res.strip().splitlines(), ['1 2', '3 4']) def test_existing_spawn(self): - child = pexpect.spawnu("bash", timeout=5) + child = pexpect.spawnu("bash", timeout=5, echo=False) repl = replwrap.REPLWrapper(child, re.compile('[$#]'), "PS1='{0}' PS2='{1}' " "PROMPT_COMMAND=''") @@ -66,7 +66,7 @@ class REPLWrapTestCase(unittest.TestCase): if platform.python_implementation() == 'PyPy': raise unittest.SkipTest("This test fails on PyPy because of REPL differences") - child = pexpect.spawnu('python', timeout=5) + child = pexpect.spawnu('python', echo=False, timeout=5) # prompt_change=None should mean no prompt change py = replwrap.REPLWrapper(child, replwrap.u(">>> "), prompt_change=None, continuation_prompt=replwrap.u("... ")) diff --git a/tests/test_unicode.py b/tests/test_unicode.py index 256057d..85a2100 100644 --- a/tests/test_unicode.py +++ b/tests/test_unicode.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import platform import tempfile +import sys import pexpect import unittest @@ -34,33 +35,60 @@ class UnicodeTests(PexpectTestCase.PexpectTestCase): p.sendeof() p.expect_exact (pexpect.EOF) - def test_expect_echo (self): - '''This tests that echo can be turned on and off. + def test_expect_setecho_toggle(self): + '''This tests that echo may be toggled off. ''' - p = pexpect.spawnu('cat', timeout=10) - self._expect_echo(p) + p = pexpect.spawnu('cat', timeout=5) + try: + self._expect_echo_toggle_off(p) + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + raise + self._expect_echo_toggle_on(p) def test_expect_echo_exact (self): '''Like test_expect_echo(), but using expect_exact(). ''' - p = pexpect.spawnu('cat', timeout=10) + p = pexpect.spawnu('cat', timeout=5) p.expect = p.expect_exact self._expect_echo(p) + def test_expect_setecho_toggle_exact(self): + p = pexpect.spawnu('cat', timeout=5) + p.expect = p.expect_exact + try: + self._expect_echo_toggle_off(p) + except IOError: + if sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + raise + self._expect_echo_toggle_on(p) + def _expect_echo (self, p): p.sendline('1234') # Should see this twice (once from tty echo and again from cat). index = p.expect (['1234', 'abcdé', 'wxyz', pexpect.EOF, pexpect.TIMEOUT]) assert index == 0, (index, p.before) index = p.expect (['1234', 'abcdé', 'wxyz', pexpect.EOF]) assert index == 0, index + + def _expect_echo_toggle_off(self, p): p.setecho(0) # Turn off tty echo + p.waitnoecho() p.sendline('abcdé') # Now, should only see this once. p.sendline('wxyz') # Should also be only once. index = p.expect ([pexpect.EOF,pexpect.TIMEOUT, 'abcdé', 'wxyz', '1234']) assert index == 2, index index = p.expect ([pexpect.EOF, 'abcdé', 'wxyz', '7890']) assert index == 2, index + + def _expect_echo_toggle_on(self, p): p.setecho(1) # Turn on tty echo + time.sleep(0.2) # there is no waitecho() ! p.sendline('7890') # Should see this twice. index = p.expect ([pexpect.EOF, 'abcdé', 'wxyz', '7890']) assert index == 3, index -- cgit v1.2.1 From 0fe779fbb0db8f3ea4cb8198db526378902121c4 Mon Sep 17 00:00:00 2001 From: jquast Date: Sun, 8 Jun 2014 01:19:24 -0700 Subject: Missing time import -- there is no 'waitecho' function although I am against most time.sleep()s and trying my best to remove them where possible, there is no opposing waitnoecho() function (perhaps we should supply one?). --- tests/test_unicode.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_unicode.py b/tests/test_unicode.py index 85a2100..f342bf9 100644 --- a/tests/test_unicode.py +++ b/tests/test_unicode.py @@ -4,6 +4,7 @@ from __future__ import unicode_literals import platform import tempfile import sys +import time import pexpect import unittest -- cgit v1.2.1 From d0fd37033e1a48bff38399394604a653cd0f3baa Mon Sep 17 00:00:00 2001 From: jquast Date: Sun, 8 Jun 2014 01:25:13 -0700 Subject: Another "as err" vs ", err" exception fix. as well as another indent-by-4 fix --- pexpect/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pexpect/__init__.py b/pexpect/__init__.py index c7411cb..1442aa8 100644 --- a/pexpect/__init__.py +++ b/pexpect/__init__.py @@ -731,11 +731,11 @@ class spawn(object): try: child_name = os.ttyname(tty_fd) break - except OSError, err: - tries += 1 - if tries > _max_tries: - raise - time.sleep(_poll) + except OSError: + tries += 1 + if tries > _max_tries: + raise + time.sleep(_poll) # Disconnect from controlling tty. Harmless if not already connected. try: -- cgit v1.2.1 From 0ffc3142adbeb189e2f2ffc2dac45501fc3c0e57 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Sun, 15 Jun 2014 19:26:42 -0700 Subject: Implement assertRaises and assertRaisesRegexp context managers for Python 2.6 --- tests/PexpectTestCase.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/PexpectTestCase.py b/tests/PexpectTestCase.py index 7ec7e53..7a9574e 100644 --- a/tests/PexpectTestCase.py +++ b/tests/PexpectTestCase.py @@ -20,6 +20,7 @@ PEXPECT LICENSE ''' from __future__ import print_function +import contextlib import unittest import sys import os @@ -40,3 +41,26 @@ class PexpectTestCase(unittest.TestCase): def tearDown(self): os.chdir (self.original_path) + if sys.version_info < (2,7): + # We want to use these methods, which are new/improved in 2.7, but + # we are still supporting 2.6 for the moment. This section can be + # removed when we drop Python 2.6 support. + @contextlib.contextmanager + def assertRaises(self, excClass): + try: + yield + except Exception as e: + assert isinstance(e, excClass) + else: + raise AssertionError("%s was not raised" % excClass) + + @contextlib.contextmanager + def assertRaisesRegexp(self, excClass, pattern): + import re + try: + yield + except Exception as e: + assert isinstance(e, excClass) + assert re.match(pattern, str(e)) + else: + raise AssertionError("%s was not raised" % excClass) -- cgit v1.2.1 From 732984f41997fb95333d70041d10138d06c196d6 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Sun, 15 Jun 2014 19:30:59 -0700 Subject: Miscellaneous minor fixes --- tests/test_expect.py | 1 + tests/test_misc.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_expect.py b/tests/test_expect.py index da5214c..8ccb9c5 100755 --- a/tests/test_expect.py +++ b/tests/test_expect.py @@ -170,6 +170,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): if hasattr(unittest, 'SkipTest'): raise unittest.SkipTest("Not supported on this platform.") return 'skip' + raise def test_expect_setecho_off_exact(self): p = pexpect.spawn('cat', echo=True, timeout=5) diff --git a/tests/test_misc.py b/tests/test_misc.py index cb82a09..81cc6ff 100755 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -220,7 +220,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): pexpect.spawn(1) with self.assertRaises(TypeError): - # should use pexpect.spawn('ls', ['-ls']) + # should use pexpect.spawn('ls', ['-la']) pexpect.spawn('ls', '-la') with self.assertRaises(ValueError): -- cgit v1.2.1 From a21580e775c577843071b83232232b7a604b6e2d Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Tue, 24 Jun 2014 20:44:47 +0000 Subject: Refactor exceptions in __pty_make_controlling_tty Make very clear which exceptions we expect, where, and why. I've done exaustive testing from within cron(1) and without. I've been unable to reproduce the previously discovered os.ttyname issue, so that exception handling loop has been removed. Also, resolves the ENXIO reference (thanks TK!). Pushing to test this on OSX and later cygwin with the non_native pty fork test --- pexpect/__init__.py | 56 ++++++++++++++++------------------------------------- 1 file changed, 17 insertions(+), 39 deletions(-) diff --git a/pexpect/__init__.py b/pexpect/__init__.py index 1442aa8..a468667 100644 --- a/pexpect/__init__.py +++ b/pexpect/__init__.py @@ -719,59 +719,37 @@ class spawn(object): more portable than the pty.fork() function. Specifically, this should work on Solaris. ''' - # os.ttyname() fails for the child process under a rare timing - # condition (about 1 in 10 on SmartOs in a VMWare machine). May - # raise OSError as errno ENOTTY -- ptsname(3C) should be used, - # on this system, but python does not expose it, it would require - # ctypes + libc. which is also not available, filed patch as - # http://bugs.python.org/issue20664 - child_name, tries = None, 0 - _poll, _max_tries= 0.1, 10 - while child_name is None: - try: - child_name = os.ttyname(tty_fd) - break - except OSError: - tries += 1 - if tries > _max_tries: - raise - time.sleep(_poll) + child_name = os.ttyname(tty_fd) - # Disconnect from controlling tty. Harmless if not already connected. + # Disconnect from controlling tty, if any. Raises OSError of ENXIO + # if there was no controlling tty to begin with, such as when + # executed by a cron(1) job. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) os.close(fd) - except OSError: - # Already disconnected. This happens if running inside cron. - pass + except OSError, err: + if err.errno != errno.ENXIO: + raise os.setsid() - # Verify we are disconnected from controlling tty - # by attempting to open it again. + # Verify we are disconnected from controlling tty by attempting to open + # it again. We expect that OSError of ENXIO should always be raised. try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) os.close(fd) - except OSError: - # Good! We are disconnected from a controlling tty. - pass + raise ExceptionPexpect("OSError of errno.ENXIO should be raised.") + except OSError, err: + if err.errno != errno.ENXIO: + raise # Verify we can open child pty. fd = os.open(child_name, os.O_RDWR) - if fd < 0: - raise ExceptionPexpect("Could not open child pty, " + child_name) - else: - os.close(fd) + os.close(fd) # Verify we now have a controlling tty. - try: - fd = os.open("/dev/tty", os.O_WRONLY) - os.close(fd) - except OSError as err: - if err.args[0] == ENXIO: - # on Solaris, `/dev/tty' raises OSError upon opening, though - # it does exist: /dev/tty -> ../devices/pseudo/sy@0:tty - pass + fd = os.open("/dev/tty", os.O_WRONLY) + os.close(fd) def fileno(self): @@ -2141,4 +2119,4 @@ def split_command_line(command_line): arg_list.append(arg) return arg_list -# vi:set sr et ts=4 sw=4 ft=python : +# vim: set shiftround expandtab tabstop=4 shiftwidth=4 ft=python autoindent : -- cgit v1.2.1 From 91d17a7192fe80879474587d7f1b7610d31bf782 Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Tue, 24 Jun 2014 21:23:52 +0000 Subject: Provide example of SRV4-like systems --- pexpect/__init__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pexpect/__init__.py b/pexpect/__init__.py index a468667..468c4c2 100644 --- a/pexpect/__init__.py +++ b/pexpect/__init__.py @@ -786,9 +786,10 @@ class spawn(object): '''This returns True if the file descriptor is open and connected to a tty(-like) device, else False. - On SRV4-style platforms implementing streams, the child pty does not - appear as a terminal device. This means methods such as setecho(), - setwinsize(), getwinsize() may raise an IOError. ''' + On SVR4-style platforms implementing streams, such as SunOS and HP-UX, + the child pty may not appear as a terminal device. This means + methods such as setecho(), setwinsize(), getwinsize() may raise an + IOError. ''' return os.isatty(self.child_fd) -- cgit v1.2.1 From 995f9daf88387523129577048a6ada3de5fecfa1 Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Tue, 24 Jun 2014 21:27:07 +0000 Subject: Use const child._VINTR instead of '3' --- tests/test_ctrl_chars.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py index d84f01b..7c9c372 100755 --- a/tests/test_ctrl_chars.py +++ b/tests/test_ctrl_chars.py @@ -54,7 +54,7 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase): child = pexpect.spawn('python getch.py', echo=False, timeout=5) child.expect('READY') child.sendintr() - child.expect('3') + child.expect(str(child._VINTR) + '') child.send(byte(0)) child.expect('0') -- cgit v1.2.1 From 3a5a8a6cbc2d64a1a9561a52b30d6f8c7be28bb4 Mon Sep 17 00:00:00 2001 From: jquast Date: Tue, 24 Jun 2014 14:34:54 -0700 Subject: note using py.test -- specify 'tests' folder On Solaris and OSX, when running a bare 'py.test', it locks up indefinitely after line, 'collecting 0 items'. ^C also does not respond, so I'm not able to discern exactly where the lockup is -- regardless, if you specify the 'tests' folder, it issues fine. --- DEVELOPERS.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DEVELOPERS.rst b/DEVELOPERS.rst index d5e4475..bf2bb9f 100644 --- a/DEVELOPERS.rst +++ b/DEVELOPERS.rst @@ -1,6 +1,6 @@ To run the tests, use `py.test `_:: - py.test + py.test tests The tests are all located in the tests/ directory. To add a new unit test all you have to do is create the file in the tests/ directory with a -- cgit v1.2.1 From 033393ad5ee629fff9ba2e4930931d85bac8ad58 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver Date: Tue, 24 Jun 2014 14:35:03 -0700 Subject: Fix up except syntax again --- pexpect/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pexpect/__init__.py b/pexpect/__init__.py index a468667..b0800d8 100644 --- a/pexpect/__init__.py +++ b/pexpect/__init__.py @@ -727,7 +727,7 @@ class spawn(object): try: fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) os.close(fd) - except OSError, err: + except OSError as err: if err.errno != errno.ENXIO: raise @@ -739,7 +739,7 @@ class spawn(object): fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) os.close(fd) raise ExceptionPexpect("OSError of errno.ENXIO should be raised.") - except OSError, err: + except OSError as err: if err.errno != errno.ENXIO: raise -- cgit v1.2.1 From 84b836b7e6d40a4af231321966d6bebb1efe32cf Mon Sep 17 00:00:00 2001 From: jquast Date: Tue, 24 Jun 2014 15:08:34 -0700 Subject: gah; fix constant VINTR -> INTR --- tests/test_ctrl_chars.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py index 7c9c372..9c7b869 100755 --- a/tests/test_ctrl_chars.py +++ b/tests/test_ctrl_chars.py @@ -54,7 +54,7 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase): child = pexpect.spawn('python getch.py', echo=False, timeout=5) child.expect('READY') child.sendintr() - child.expect(str(child._VINTR) + '') + child.expect(str(child._INTR) + '') child.send(byte(0)) child.expect('0') -- cgit v1.2.1 From eeaa7f8b839154415787896705bacef85c1a0e62 Mon Sep 17 00:00:00 2001 From: jquast Date: Tue, 24 Jun 2014 16:43:25 -0700 Subject: SRV4 -> SVR4 see https://en.wikipedia.org/wiki/SVR4#SVR4 --- doc/history.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/history.rst b/doc/history.rst index 313b9ff..96c979e 100644 --- a/doc/history.rst +++ b/doc/history.rst @@ -21,7 +21,7 @@ Version 3.3 Solaris (:ghissue:`49`). * Several Solaris (SmartOS) bugfixes, preventing IOError exceptions, especially when used with cron(1) (:ghissue:`44`). -* Added new keyword argument ``echo=True`` for ``spawn()``. On SRV4-like +* Added new keyword argument ``echo=True`` for ``spawn()``. On SVR4-like systems, the method ``isatty()`` will always return *False*: the child pty does not appear as a terminal. Therefore, ``setecho()``, ``getwinsize()``, ``setwinsize()``, and ``waitnoecho()`` are not supported on those platforms. -- cgit v1.2.1