summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Kluyver <takowl@gmail.com>2014-06-24 16:56:55 -0700
committerThomas Kluyver <takowl@gmail.com>2014-06-24 16:56:55 -0700
commit1515457b2ffe262bfac7d606ffbdef50d591f97e (patch)
tree25a3b6e4d0f03cbbd317efc43d3ee718de8193b5
parent9716ea092af443f706c40ce409219b6f7c714013 (diff)
parenteeaa7f8b839154415787896705bacef85c1a0e62 (diff)
downloadpexpect-1515457b2ffe262bfac7d606ffbdef50d591f97e.tar.gz
Merge pull request #73 from pexpect/issue-44-solaris-try-3
Solaris support regarding setecho/setwinsize, new argument echo=False
-rw-r--r--DEVELOPERS.rst2
-rw-r--r--doc/history.rst10
-rw-r--r--pexpect/__init__.py211
-rw-r--r--pexpect/replwrap.py9
-rw-r--r--tests/PexpectTestCase.py24
-rwxr-xr-xtests/getch.py2
-rwxr-xr-xtests/test_ctrl_chars.py100
-rwxr-xr-xtests/test_expect.py163
-rwxr-xr-xtests/test_isalive.py89
-rwxr-xr-xtests/test_misc.py39
-rwxr-xr-xtests/test_performance.py14
-rw-r--r--tests/test_replwrap.py4
-rw-r--r--tests/test_unicode.py39
13 files changed, 417 insertions, 289 deletions
diff --git a/DEVELOPERS.rst b/DEVELOPERS.rst
index d5e4475..bf2bb9f 100644
--- a/DEVELOPERS.rst
+++ b/DEVELOPERS.rst
@@ -1,6 +1,6 @@
To run the tests, use `py.test <http://pytest.org/latest/>`_::
- py.test
+ py.test tests
The tests are all located in the tests/ directory. To add a new unit
test all you have to do is create the file in the tests/ directory with a
diff --git a/doc/history.rst b/doc/history.rst
index 2b924a9..96c979e 100644
--- a/doc/history.rst
+++ b/doc/history.rst
@@ -19,6 +19,12 @@ Version 3.3
* Fixed issue where EOF was not correctly detected in ``interact()``, causing
a repeating loop of output on Linux, and blocking before EOF on BSD and
Solaris (:ghissue:`49`).
+* Several Solaris (SmartOS) bugfixes, preventing IOError exceptions, especially
+ when used with cron(1) (:ghissue:`44`).
+* Added new keyword argument ``echo=True`` for ``spawn()``. On SVR4-like
+ systems, the method ``isatty()`` will always return *False*: the child pty
+ does not appear as a terminal. Therefore, ``setecho()``, ``getwinsize()``,
+ ``setwinsize()``, and ``waitnoecho()`` are not supported on those platforms.
Version 3.2
```````````
@@ -115,11 +121,11 @@ Version 2.3
consistently on different platforms. Solaris is the most difficult to support.
* You can now put ``TIMEOUT`` in a list of expected patterns. This is just like
putting ``EOF`` in the pattern list. Expecting for a ``TIMEOUT`` may not be
- used as often as ``EOF``, but this makes Pexpect more consitent.
+ used as often as ``EOF``, but this makes Pexpect more consistent.
* Thanks to a suggestion and sample code from Chad J. Schroeder I added the ability
for Pexpect to operate on a file descriptor that is already open. This means that
Pexpect can be used to control streams such as those from serial port devices. Now,
- you just pass the integer file descriptor as the "command" when contsructing a
+ you just pass the integer file descriptor as the "command" when constructing a
spawn open. For example on a Linux box with a modem on ttyS1::
fd = os.open("/dev/ttyS1", os.O_RDWR|os.O_NONBLOCK|os.O_NOCTTY)
diff --git a/pexpect/__init__.py b/pexpect/__init__.py
index 23eef39..30c6764 100644
--- a/pexpect/__init__.py
+++ b/pexpect/__init__.py
@@ -305,7 +305,7 @@ class spawn(object):
def __init__(self, command, args=[], timeout=30, maxread=2000,
searchwindowsize=None, logfile=None, cwd=None, env=None,
- ignore_sighup=True):
+ ignore_sighup=True, echo=True):
'''This is the constructor. The command parameter may be a string that
includes a command and any arguments to the command. For example::
@@ -418,7 +418,16 @@ class spawn(object):
signalstatus will store the signal value and exitstatus will be None.
If you need more detail you can also read the self.status member which
stores the status returned by os.waitpid. You can interpret this using
- os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG. '''
+ os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG.
+
+ The echo attribute may be set to False to disable echoing of input.
+ As a pseudo-terminal, all input echoed by the "keyboard" (send()
+ or sendline()) will be repeated to output. For many cases, it is
+ not desirable to have echo enabled, and it may be later disabled
+ using setecho(False) followed by waitnoecho(). However, for some
+ platforms such as Solaris, this is not possible, and should be
+ disabled immediately on spawn.
+ '''
self.STDIN_FILENO = pty.STDIN_FILENO
self.STDOUT_FILENO = pty.STDOUT_FILENO
@@ -440,7 +449,7 @@ class spawn(object):
self.status = None
self.flag_eof = False
self.pid = None
- # the chile filedescriptor is initially closed
+ # the child file descriptor is initially closed
self.child_fd = -1
self.timeout = timeout
self.delimiter = EOF
@@ -469,16 +478,30 @@ class spawn(object):
self.closed = True
self.cwd = cwd
self.env = env
+ self.echo = echo
self.ignore_sighup = ignore_sighup
+ _platform = sys.platform.lower()
# This flags if we are running on irix
- self.__irix_hack = (sys.platform.lower().find('irix') >= 0)
+ self.__irix_hack = _platform.startswith('irix')
# Solaris uses internal __fork_pty(). All others use pty.fork().
- if ((sys.platform.lower().find('solaris') >= 0)
- or (sys.platform.lower().find('sunos5') >= 0)):
- self.use_native_pty_fork = False
- else:
- self.use_native_pty_fork = True
-
+ self.use_native_pty_fork = not (
+ _platform.startswith('solaris') or
+ _platform.startswith('sunos'))
+ # inherit EOF and INTR definitions from controlling process.
+ try:
+ from termios import VEOF, VINTR
+ fd = sys.__stdin__.fileno()
+ self._INTR = ord(termios.tcgetattr(fd)[6][VINTR])
+ self._EOF = ord(termios.tcgetattr(fd)[6][VEOF])
+ except (ImportError, OSError, IOError, termios.error):
+ # unless the controlling process is also not a terminal,
+ # such as cron(1). Fall-back to using CEOF and CINTR.
+ try:
+ from termios import CEOF, CINTR
+ (self._INTR, self._EOF) = (CINTR, CEOF)
+ except ImportError:
+ # ^C, ^D
+ (self._INTR, self._EOF) = (3, 4)
# Support subclasses that do not use command or args.
if command is None:
self.command = None
@@ -609,26 +632,32 @@ class spawn(object):
# Use internal __fork_pty
self.pid, self.child_fd = self.__fork_pty()
- if self.pid == 0:
+ # Some platforms must call setwinsize() and setecho() from the
+ # child process, and others from the master process. We do both,
+ # allowing IOError for either.
+
+ if self.pid == pty.CHILD:
# Child
+ self.child_fd = self.STDIN_FILENO
+
+ # set default window size of 24 rows by 80 columns
try:
- # used by setwinsize()
- self.child_fd = sys.stdout.fileno()
self.setwinsize(24, 80)
- # which exception, shouldnt' we catch explicitly .. ?
- except:
- # Some platforms do not like setwinsize (Cygwin).
- # This will cause problem when running applications that
- # are very picky about window size.
- # This is a serious limitation, but not a show stopper.
- pass
+ except IOError as err:
+ if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
+ raise
+
+ # disable echo if spawn argument echo was unset
+ if not self.echo:
+ try:
+ self.setecho(self.echo)
+ except (IOError, termios.error) as err:
+ if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
+ raise
+
# Do not allow child to inherit open file descriptors from parent.
max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
- for i in range(3, max_fd):
- try:
- os.close(i)
- except OSError:
- pass
+ os.closerange(3, max_fd)
if self.ignore_sighup:
signal.signal(signal.SIGHUP, signal.SIG_IGN)
@@ -641,6 +670,13 @@ class spawn(object):
os.execvpe(self.command, self.args, self.env)
# Parent
+ try:
+ self.setwinsize(24, 80)
+ except IOError as err:
+ if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
+ raise
+
+
self.terminated = False
self.closed = False
@@ -663,19 +699,15 @@ class spawn(object):
raise ExceptionPexpect("Could not open with os.openpty().")
pid = os.fork()
- if pid < 0: # pragma: no cover
- raise ExceptionPexpect("Failed os.fork().")
- elif pid == 0:
+ if pid == pty.CHILD:
# Child.
os.close(parent_fd)
self.__pty_make_controlling_tty(child_fd)
- os.dup2(child_fd, 0)
- os.dup2(child_fd, 1)
- os.dup2(child_fd, 2)
+ os.dup2(child_fd, self.STDIN_FILENO)
+ os.dup2(child_fd, self.STDOUT_FILENO)
+ os.dup2(child_fd, self.STDERR_FILENO)
- if child_fd > 2:
- os.close(child_fd)
else:
# Parent.
os.close(child_fd)
@@ -689,42 +721,36 @@ class spawn(object):
child_name = os.ttyname(tty_fd)
- # Disconnect from controlling tty. Harmless if not already connected.
+ # Disconnect from controlling tty, if any. Raises OSError of ENXIO
+ # if there was no controlling tty to begin with, such as when
+ # executed by a cron(1) job.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
- if fd >= 0:
- os.close(fd)
- # which exception, shouldnt' we catch explicitly .. ?
- except:
- # Already disconnected. This happens if running inside cron.
- pass
+ os.close(fd)
+ except OSError as err:
+ if err.errno != errno.ENXIO:
+ raise
os.setsid()
- # Verify we are disconnected from controlling tty
- # by attempting to open it again.
+ # Verify we are disconnected from controlling tty by attempting to open
+ # it again. We expect that OSError of ENXIO should always be raised.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
- if fd >= 0:
- os.close(fd)
- # which exception, shouldnt' we catch explicitly .. ?
- except:
- # Good! We are disconnected from a controlling tty.
- pass
+ os.close(fd)
+ raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
+ except OSError as err:
+ if err.errno != errno.ENXIO:
+ raise
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
- if fd < 0:
- raise ExceptionPexpect("Could not open child pty, " + child_name)
- else:
- os.close(fd)
+ os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
- if fd < 0:
- raise ExceptionPexpect("Could not open controlling tty, /dev/tty")
- else:
- os.close(fd)
+ os.close(fd)
+
def fileno(self):
'''This returns the file descriptor of the pty for the child.
@@ -758,7 +784,12 @@ class spawn(object):
def isatty(self):
'''This returns True if the file descriptor is open and connected to a
- tty(-like) device, else False. '''
+ tty(-like) device, else False.
+
+ On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
+ the child pty may not appear as a terminal device. This means
+ methods such as setecho(), setwinsize(), getwinsize() may raise an
+ IOError. '''
return os.isatty(self.child_fd)
@@ -795,12 +826,20 @@ class spawn(object):
def getecho(self):
'''This returns the terminal echo mode. This returns True if echo is
on or False if echo is off. Child applications that are expecting you
- to enter a password often set ECHO False. See waitnoecho(). '''
+ to enter a password often set ECHO False. See waitnoecho().
- attr = termios.tcgetattr(self.child_fd)
- if attr[3] & termios.ECHO:
- return True
- return False
+ Not supported on platforms where ``isatty()`` returns False. '''
+
+ try:
+ attr = termios.tcgetattr(self.child_fd)
+ except termios.error as err:
+ errmsg = 'getecho() may not be called on this platform'
+ if err.args[0] == errno.EINVAL:
+ raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
+ raise
+
+ self.echo = bool(attr[3] & termios.ECHO)
+ return self.echo
def setecho(self, state):
'''This sets the terminal echo mode on or off. Note that anything the
@@ -830,18 +869,35 @@ class spawn(object):
p.expect(['1234'])
p.expect(['abcd'])
p.expect(['wxyz'])
+
+
+ Not supported on platforms where ``isatty()`` returns False.
'''
- self.child_fd
- attr = termios.tcgetattr(self.child_fd)
+ errmsg = 'setecho() may not be called on this platform'
+
+ try:
+ attr = termios.tcgetattr(self.child_fd)
+ except termios.error as err:
+ if err.args[0] == errno.EINVAL:
+ raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
+ raise
+
if state:
attr[3] = attr[3] | termios.ECHO
else:
attr[3] = attr[3] & ~termios.ECHO
- # I tried TCSADRAIN and TCSAFLUSH, but
- # these were inconsistent and blocked on some platforms.
- # TCSADRAIN would probably be ideal if it worked.
- termios.tcsetattr(self.child_fd, termios.TCSANOW, attr)
+
+ try:
+ # I tried TCSADRAIN and TCSAFLUSH, but these were inconsistent and
+ # blocked on some platforms. TCSADRAIN would probably be ideal.
+ termios.tcsetattr(self.child_fd, termios.TCSANOW, attr)
+ except IOError as err:
+ if err.args[0] == errno.EINVAL:
+ raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
+ raise
+
+ self.echo = state
def _log(self, s, direction):
if self.logfile is not None:
@@ -1077,24 +1133,15 @@ class spawn(object):
called at the beginning of a line. This method does not send a newline.
It is the responsibility of the caller to ensure the eof is sent at the
beginning of a line. '''
- if hasattr(termios, 'VEOF'):
- char = ord(termios.tcgetattr(self.child_fd)[6][termios.VEOF])
- else:
- # platform does not define VEOF so assume CTRL-D
- char = 4
- self.send(self._chr(char))
+
+ self.send(self._chr(self._EOF))
def sendintr(self):
'''This sends a SIGINT to the child. It does not require
the SIGINT to be the first character on a line. '''
- if hasattr(termios, 'VINTR'):
- char = ord(termios.tcgetattr(self.child_fd)[6][termios.VINTR])
- else:
- # platform does not define VINTR so assume CTRL-C
- char = 3
- self.send(self._chr(char))
+ self.send(self._chr(self._INTR))
def eof(self):
@@ -1187,7 +1234,7 @@ class spawn(object):
if self.flag_eof:
# This is for Linux, which requires the blocking form
- # of waitpid to # get status of a defunct process.
+ # of waitpid to get the status of a defunct process.
# This is super-lame. The flag_eof would have been set
# in read_nonblocking(), so this should be safe.
waitpid_options = 0
@@ -2073,4 +2120,4 @@ def split_command_line(command_line):
arg_list.append(arg)
return arg_list
-# vi:set sr et ts=4 sw=4 ft=python :
+# vim: set shiftround expandtab tabstop=4 shiftwidth=4 ft=python autoindent :
diff --git a/pexpect/replwrap.py b/pexpect/replwrap.py
index af4f889..a36678c 100644
--- a/pexpect/replwrap.py
+++ b/pexpect/replwrap.py
@@ -34,11 +34,14 @@ class REPLWrapper(object):
new_prompt=PEXPECT_PROMPT,
continuation_prompt=PEXPECT_CONTINUATION_PROMPT):
if isinstance(cmd_or_spawn, str):
- self.child = pexpect.spawnu(cmd_or_spawn)
+ self.child = pexpect.spawnu(cmd_or_spawn, echo=False)
else:
self.child = cmd_or_spawn
- self.child.setecho(False) # Don't repeat our input.
- self.child.waitnoecho()
+ if self.child.echo:
+ # Existing spawn instance has echo enabled, disable it
+ # to prevent our input from being repeated to output.
+ self.child.setecho(False)
+ self.child.waitnoecho()
if prompt_change is None:
self.prompt = orig_prompt
diff --git a/tests/PexpectTestCase.py b/tests/PexpectTestCase.py
index 7ec7e53..7a9574e 100644
--- a/tests/PexpectTestCase.py
+++ b/tests/PexpectTestCase.py
@@ -20,6 +20,7 @@ PEXPECT LICENSE
'''
from __future__ import print_function
+import contextlib
import unittest
import sys
import os
@@ -40,3 +41,26 @@ class PexpectTestCase(unittest.TestCase):
def tearDown(self):
os.chdir (self.original_path)
+ if sys.version_info < (2,7):
+ # We want to use these methods, which are new/improved in 2.7, but
+ # we are still supporting 2.6 for the moment. This section can be
+ # removed when we drop Python 2.6 support.
+ @contextlib.contextmanager
+ def assertRaises(self, excClass):
+ try:
+ yield
+ except Exception as e:
+ assert isinstance(e, excClass)
+ else:
+ raise AssertionError("%s was not raised" % excClass)
+
+ @contextlib.contextmanager
+ def assertRaisesRegexp(self, excClass, pattern):
+ import re
+ try:
+ yield
+ except Exception as e:
+ assert isinstance(e, excClass)
+ assert re.match(pattern, str(e))
+ else:
+ raise AssertionError("%s was not raised" % excClass)
diff --git a/tests/getch.py b/tests/getch.py
index cb20bef..41e3224 100755
--- a/tests/getch.py
+++ b/tests/getch.py
@@ -33,7 +33,7 @@ def main():
val = ord(stdin.read(1))
except KeyboardInterrupt:
val = 3
- sys.stdout.write('%d\r\n' % (val,))
+ sys.stdout.write('%d<STOP>\r\n' % (val,))
if val == 0:
# StopIteration equivalent is ctrl+' ' (\x00, NUL)
break
diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py
index 7b87840..9c7b869 100755
--- a/tests/test_ctrl_chars.py
+++ b/tests/test_ctrl_chars.py
@@ -37,82 +37,82 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase):
def test_control_chars(self):
'''This tests that we can send all 256 8-bit characters to a child
process.'''
- child = pexpect.spawn('python getch.py')
- child.expect('READY', timeout=5)
- try:
- for i in range(1,256):
- child.send(byte(i))
- child.expect ('%d\r\n' % (i,))
- # This needs to be last, as getch.py exits on \x00
- child.send(byte(0))
- child.expect('0\r\n')
- child.expect(pexpect.EOF)
- except Exception:
- err = sys.exc_info()[1]
- msg = "Did not echo character value: " + str(i) + "\n"
- msg = msg + str(err)
- self.fail(msg)
+ child = pexpect.spawn('python getch.py', echo=False, timeout=5)
+ child.expect('READY')
+ for i in range(1, 256):
+ child.send(byte(i))
+ child.expect ('%d<STOP>' % (i,))
+
+ # This needs to be last, as getch.py exits on \x00
+ child.send(byte(0))
+ child.expect('0<STOP>')
+ child.expect(pexpect.EOF)
+ assert not child.isalive()
+ assert child.exitstatus == 0
def test_sendintr (self):
- try:
- child = pexpect.spawn('python getch.py')
- child.expect('READY', timeout=5)
- child.sendintr()
- child.expect ('3\r\n')
- except Exception:
- err = sys.exc_info()[1]
- self.fail("Did not echo character value: 3, %s\n%s\n%s" % (
- str(err), child.before, child.after,))
+ child = pexpect.spawn('python getch.py', echo=False, timeout=5)
+ child.expect('READY')
+ child.sendintr()
+ child.expect(str(child._INTR) + '<STOP>')
+
+ child.send(byte(0))
+ child.expect('0<STOP>')
+ child.expect(pexpect.EOF)
+ assert not child.isalive()
+ assert child.exitstatus == 0
+
+ def test_sendeof(self):
+ child = pexpect.spawn('python getch.py', echo=False, timeout=5)
+ child.expect('READY')
+ child.sendeof()
+ child.expect(str(child._EOF) + '<STOP>')
+
+ child.send(byte(0))
+ child.expect('0<STOP>')
+ child.expect(pexpect.EOF)
+ assert not child.isalive()
+ assert child.exitstatus == 0
def test_bad_sendcontrol_chars (self):
'''This tests that sendcontrol will return 0 for an unknown char. '''
- child = pexpect.spawn('python getch.py')
- retval = child.sendcontrol('1')
- assert retval == 0, "sendcontrol() should have returned 0 because there is no such thing as ctrl-1."
+ child = pexpect.spawn('python getch.py', echo=False, timeout=5)
+ child.expect('READY')
+ assert 0 == child.sendcontrol('1')
def test_sendcontrol(self):
'''This tests that we can send all special control codes by name.
'''
- child = pexpect.spawn('python getch.py')
- # On slow machines, like Travis, the process is not ready in time to
- # catch the first character unless we wait for it.
- child.expect('READY', timeout=5)
- child.delaybeforesend = 0.05
+ child = pexpect.spawn('python getch.py', echo=False, timeout=5)
+ child.expect('READY')
for ctrl in 'abcdefghijklmnopqrstuvwxyz':
assert child.sendcontrol(ctrl) == 1
val = ord(ctrl) - ord('a') + 1
- try:
- child.expect_exact(str(val)+'\r\n', timeout=2)
- except:
- print(ctrl)
- raise
+ child.expect_exact(str(val)+'<STOP>')
# escape character
assert child.sendcontrol('[') == 1
- child.expect ('27\r\n')
+ child.expect('27<STOP>')
assert child.sendcontrol('\\') == 1
- child.expect ('28\r\n')
+ child.expect('28<STOP>')
# telnet escape character
assert child.sendcontrol(']') == 1
- child.expect ('29\r\n')
+ child.expect('29<STOP>')
assert child.sendcontrol('^') == 1
- child.expect ('30\r\n')
+ child.expect('30<STOP>')
# irc protocol uses this to underline ...
assert child.sendcontrol('_') == 1
- child.expect ('31\r\n')
+ child.expect('31<STOP>')
# the real "backspace is delete"
assert child.sendcontrol('?') == 1
- child.expect ('127\r\n')
+ child.expect('127<STOP>')
+
# NUL, same as ctrl + ' '
assert child.sendcontrol('@') == 1
- child.expect ('0\r\n')
- # 0 is sentinel value to getch.py, assert exit:
- # causes child to exit, but, if immediately tested,
- # isalive() still returns True unless an artifical timer
- # is used.
- time.sleep(0.5)
- assert child.isalive() == False, child.isalive()
+ child.expect('0<STOP>')
+ child.expect(pexpect.EOF)
+ assert not child.isalive()
assert child.exitstatus == 0
if __name__ == '__main__':
diff --git a/tests/test_expect.py b/tests/test_expect.py
index 4e99aad..8ccb9c5 100755
--- a/tests/test_expect.py
+++ b/tests/test_expect.py
@@ -18,22 +18,21 @@ PEXPECT LICENSE
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
'''
-import pexpect
import unittest
import subprocess
import time
+import signal
+import sys
+
+import pexpect
from . import PexpectTestCase
from .utils import no_coverage_env
-import signal
# Many of these test cases blindly assume that sequential directory
# listings of the /bin directory will yield the same results.
# This may not be true, but seems adequate for testing now.
# I should fix this at some point.
-# query: For some reason an extra newline occures under OS X evey
-# once in a while. Excessive uses of .replace resolve these
-
FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
def hex_dump(src, length=16):
result=[]
@@ -51,32 +50,10 @@ def hex_diff(left, right):
return '\n' + '\n'.join(diff,)
-class assert_raises_msg(object):
- def __init__(self, errtype, msgpart):
- self.errtype = errtype
- self.msgpart = msgpart
-
- def __enter__(self):
- pass
-
- def __exit__(self, etype, value, traceback):
- if value is None:
- raise AssertionError('Expected %s, but no exception was raised' \
- % self.errtype)
- if not isinstance(value, self.errtype):
- raise AssertionError('Expected %s, but %s was raised' \
- % (self.errtype, etype))
-
- errstr = str(value)
- if self.msgpart not in errstr:
- raise AssertionError('%r was not in %r' % (self.msgpart, errstr))
-
- return True
-
class ExpectTestCase (PexpectTestCase.PexpectTestCase):
def test_expect_basic (self):
- p = pexpect.spawn('cat')
+ p = pexpect.spawn('cat', echo=False, timeout=5)
p.sendline (b'Hello')
p.sendline (b'there')
p.sendline (b'Mr. Python')
@@ -87,7 +64,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
p.expect (pexpect.EOF)
def test_expect_exact_basic (self):
- p = pexpect.spawn('cat')
+ p = pexpect.spawn('cat', echo=False, timeout=5)
p.sendline (b'Hello')
p.sendline (b'there')
p.sendline (b'Mr. Python')
@@ -101,7 +78,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
'''This test that the ignorecase flag will match patterns
even if case is different using the regex (?i) directive.
'''
- p = pexpect.spawn('cat')
+ p = pexpect.spawn('cat', echo=False, timeout=5)
p.sendline (b'HELLO')
p.sendline (b'there')
p.expect (b'(?i)hello')
@@ -113,7 +90,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
'''This test that the ignorecase flag will match patterns
even if case is different using the ignorecase flag.
'''
- p = pexpect.spawn('cat')
+ p = pexpect.spawn('cat', echo=False, timeout=5)
p.ignorecase = True
p.sendline (b'HELLO')
p.sendline (b'there')
@@ -129,22 +106,17 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
(one of the) the leftmost matches in the input? -- grahn)
... agreed! -jquast, the buffer ptr isn't forwarded on match, see first two test cases
'''
- p = pexpect.spawn('cat')
+ p = pexpect.spawn('cat', echo=False, timeout=5)
self._expect_order(p)
def test_expect_order_exact (self):
'''Like test_expect_order(), but using expect_exact().
'''
- p = pexpect.spawn('cat')
+ p = pexpect.spawn('cat', echo=False, timeout=5)
p.expect = p.expect_exact
self._expect_order(p)
def _expect_order (self, p):
- # Disable echo so that the output we see is in an entirely predictable
- # order
- p.setecho(False)
- p.waitnoecho()
-
p.sendline (b'1234')
p.sendline (b'abcd')
p.sendline (b'wxyz')
@@ -187,7 +159,45 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
pexpect.EOF])
assert index == 3, (index, p.before, p.after)
- def test_waitnoecho (self):
+ def test_expect_setecho_off(self):
+ '''This tests that echo may be toggled off.
+ '''
+ p = pexpect.spawn('cat', echo=True, timeout=5)
+ try:
+ self._expect_echo_toggle(p)
+ except IOError:
+ if sys.platform.lower().startswith('sunos'):
+ if hasattr(unittest, 'SkipTest'):
+ raise unittest.SkipTest("Not supported on this platform.")
+ return 'skip'
+ raise
+
+ def test_expect_setecho_off_exact(self):
+ p = pexpect.spawn('cat', echo=True, timeout=5)
+ p.expect = p.expect_exact
+ try:
+ self._expect_echo_toggle(p)
+ except IOError:
+ if sys.platform.lower().startswith('sunos'):
+ if hasattr(unittest, 'SkipTest'):
+ raise unittest.SkipTest("Not supported on this platform.")
+ return 'skip'
+ raise
+
+ def test_waitnoecho(self):
+ " Tests setecho(False) followed by waitnoecho() "
+ p = pexpect.spawn('cat', echo=False, timeout=5)
+ try:
+ p.setecho(False)
+ p.waitnoecho()
+ except IOError:
+ if sys.platform.lower().startswith('sunos'):
+ if hasattr(unittest, 'SkipTest'):
+ raise unittest.SkipTest("Not supported on this platform.")
+ return 'skip'
+ raise
+
+ def test_waitnoecho_order(self):
''' This tests that we can wait on a child process to set echo mode.
For example, this tests that we could wait for SSH to set ECHO False
@@ -196,7 +206,16 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
p1 = pexpect.spawn('%s echo_wait.py' % self.PYTHONBIN)
start = time.time()
- p1.waitnoecho(timeout=10)
+ try:
+ p1.waitnoecho(timeout=10)
+ except IOError:
+ if sys.platform.lower().startswith('sunos'):
+ if hasattr(unittest, 'SkipTest'):
+ raise unittest.SkipTest("Not supported on this platform.")
+ return 'skip'
+ raise
+
+
end_time = time.time() - start
assert end_time < 10 and end_time > 2, "waitnoecho did not set ECHO off in the expected window of time."
@@ -216,15 +235,15 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
assert end_time < 10, "waitnoecho did not set ECHO off in the expected window of time."
def test_expect_echo (self):
- '''This tests that echo can be turned on and off.
+ '''This tests that echo is on by default.
'''
- p = pexpect.spawn('cat', timeout=10)
+ p = pexpect.spawn('cat', echo=True, timeout=5)
self._expect_echo(p)
def test_expect_echo_exact (self):
'''Like test_expect_echo(), but using expect_exact().
'''
- p = pexpect.spawn('cat', timeout=10)
+ p = pexpect.spawn('cat', echo=True, timeout=5)
p.expect = p.expect_exact
self._expect_echo(p)
@@ -243,7 +262,24 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
b'wxyz',
pexpect.EOF])
assert index == 0, "index="+str(index)
+
+ def _expect_echo_toggle(self, p):
+ p.sendline (b'1234') # Should see this twice (once from tty echo and again from cat).
+ index = p.expect ([
+ b'1234',
+ b'abcd',
+ b'wxyz',
+ pexpect.EOF,
+ pexpect.TIMEOUT])
+ assert index == 0, "index="+str(index)+"\n"+p.before
+ index = p.expect ([
+ b'1234',
+ b'abcd',
+ b'wxyz',
+ pexpect.EOF])
+ assert index == 0, "index="+str(index)
p.setecho(0) # Turn off tty echo
+ p.waitnoecho()
p.sendline (b'abcd') # Now, should only see this once.
p.sendline (b'wxyz') # Should also be only once.
index = p.expect ([
@@ -271,32 +307,32 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
'''This tests that mixed list of regex strings, TIMEOUT, and EOF all
return the correct index when matched.
'''
- p = pexpect.spawn('cat')
+ p = pexpect.spawn('cat', echo=False, timeout=5)
self._expect_index(p)
def test_expect_index_exact (self):
'''Like test_expect_index(), but using expect_exact().
'''
- p = pexpect.spawn('cat')
+ p = pexpect.spawn('cat', echo=False, timeout=5)
p.expect = p.expect_exact
self._expect_index(p)
def _expect_index (self, p):
- p.setecho(0)
p.sendline (b'1234')
index = p.expect ([b'abcd',b'wxyz',b'1234',pexpect.EOF])
assert index == 2, "index="+str(index)
p.sendline (b'abcd')
index = p.expect ([pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF])
- assert index == 1, "index="+str(index)
+ assert index == 1, "index="+str(index)+str(p)
p.sendline (b'wxyz')
- index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], timeout=5)
+ index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF])
assert index == 3, "index="+str(index) # Expect 'wxyz'
p.sendline (b'$*!@?')
- index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], timeout=5)
+ index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF],
+ timeout=1)
assert index == 1, "index="+str(index) # Expect TIMEOUT
p.sendeof ()
- index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF], timeout=5)
+ index = p.expect ([b'54321',pexpect.TIMEOUT,b'abcd',b'wxyz',b'1234',pexpect.EOF])
assert index == 5, "index="+str(index) # Expect EOF
def test_expect (self):
@@ -471,14 +507,13 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
def test_bad_arg(self):
p = pexpect.spawn('cat')
- with assert_raises_msg(TypeError, 'must be one of'):
+ with self.assertRaisesRegexp(TypeError, '.*must be one of'):
p.expect(1)
- with assert_raises_msg(TypeError, 'must be one of'):
+ with self.assertRaisesRegexp(TypeError, '.*must be one of'):
p.expect([1, b'2'])
-
- with assert_raises_msg(TypeError, 'must be one of'):
+ with self.assertRaisesRegexp(TypeError, '.*must be one of'):
p.expect_exact(1)
- with assert_raises_msg(TypeError, 'must be one of'):
+ with self.assertRaisesRegexp(TypeError, '.*must be one of'):
p.expect_exact([1, b'2'])
def test_timeout_none(self):
@@ -493,7 +528,7 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
SIGWINCH generated when a window is resized), but in this test, we
are substituting an ALARM signal as this is much easier for testing
and is treated the same as a SIGWINCH.
-
+
To ensure that the alarm fires during the expect call, we are
setting the signal to alarm after 1 second while the spawned process
sleeps for 2 seconds prior to sending the expected output.
@@ -501,21 +536,13 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
def noop(x, y):
pass
signal.signal(signal.SIGALRM, noop)
-
- p1 = pexpect.spawn('%s sleep_for.py 2' % self.PYTHONBIN)
- p1.expect('READY', timeout=10)
+
+ p1 = pexpect.spawn('%s sleep_for.py 2' % self.PYTHONBIN, timeout=5)
+ p1.expect('READY')
signal.alarm(1)
- p1.expect('END', timeout=10)
+ p1.expect('END')
if __name__ == '__main__':
unittest.main()
suite = unittest.makeSuite(ExpectTestCase,'test')
-
-#fout = open('delete_me_1','wb')
-#fout.write(the_old_way)
-#fout.close
-#fout = open('delete_me_2', 'wb')
-#fout.write(the_new_way)
-#fout.close
-
diff --git a/tests/test_isalive.py b/tests/test_isalive.py
index cbb474d..5168a52 100755
--- a/tests/test_isalive.py
+++ b/tests/test_isalive.py
@@ -31,23 +31,16 @@ class IsAliveTestCase(PexpectTestCase.PexpectTestCase):
'''This tests that calling wait on a finished process works as expected.
'''
p = pexpect.spawn('sleep 3')
- if not p.isalive():
- self.fail ('Child process is not alive. It should be.')
- time.sleep(1)
+ assert p.isalive()
p.wait()
- if p.isalive():
- self.fail ('Child process is not dead. It should be.')
+ assert not p.isalive()
+
p = pexpect.spawn('sleep 3')
- if not p.isalive():
- self.fail ('Child process is not alive. It should be.')
+ assert p.isalive()
p.kill(9)
time.sleep(1)
- try:
+ with self.assertRaises(pexpect.ExceptionPexpect):
p.wait()
- except pexpect.ExceptionPexpect:
- pass
- else:
- self.fail ('Should have raised ExceptionPython because you can\'t call wait on a dead process.')
def test_signal_wait(self):
'''Test calling wait with a process terminated by a signal.'''
@@ -55,64 +48,64 @@ class IsAliveTestCase(PexpectTestCase.PexpectTestCase):
return 'SKIP'
p = pexpect.spawn(sys.executable, ['alarm_die.py'])
p.wait()
- assert p.exitstatus is None, p.exitstatus
+ assert p.exitstatus is None
self.assertEqual(p.signalstatus, signal.SIGALRM)
def test_expect_isalive_dead_after_normal_termination (self):
- p = pexpect.spawn('ls')
+ p = pexpect.spawn('ls', timeout=15)
p.expect(pexpect.EOF)
- time.sleep(1) # allow kernel status time to catch up with state.
- if p.isalive():
- self.fail ('Child process is not dead. It should be.')
+ assert not p.isalive()
- def test_expect_isalive_dead_after_SIGINT (self):
+ def test_expect_isalive_dead_after_SIGHUP(self):
+ p = pexpect.spawn('cat', timeout=5, ignore_sighup=False)
+ assert p.isalive()
+ force = False
+ if sys.platform.lower().startswith('sunos'):
+ # On Solaris (SmartOs), and only when executed from cron(1), SIGKILL
+ # is required to end the sub-process. This is done using force=True
+ force = True
+ assert p.terminate(force) == True
+ p.expect(pexpect.EOF)
+ assert not p.isalive()
+
+ def test_expect_isalive_dead_after_SIGINT(self):
p = pexpect.spawn('cat', timeout=5)
- if not p.isalive():
- self.fail ('Child process is not alive. It should be.')
- p.terminate()
- # Solaris is kind of slow.
- # Without this delay then p.expect(...) will not see
- # that the process is dead and it will timeout.
- time.sleep(1)
+ assert p.isalive()
+ force = False
+ if sys.platform.lower().startswith('sunos'):
+ # On Solaris (SmartOs), and only when executed from cron(1), SIGKILL
+ # is required to end the sub-process. This is done using force=True
+ force = True
+ assert p.terminate(force) == True
p.expect(pexpect.EOF)
- if p.isalive():
- self.fail ('Child process is not dead. It should be.')
+ assert not p.isalive()
- def test_expect_isalive_dead_after_SIGKILL (self):
- p = pexpect.spawn('cat', timeout=3)
- if not p.isalive():
- self.fail ('Child process is not alive. It should be.')
+ def test_expect_isalive_dead_after_SIGKILL(self):
+ p = pexpect.spawn('cat', timeout=5)
+ assert p.isalive()
p.kill(9)
- # Solaris is kind of slow.
- # Without this delay then p.expect(...) will not see
- # that the process is dead and it will timeout.
- time.sleep(1)
p.expect(pexpect.EOF)
- if p.isalive():
- self.fail ('Child process is not dead. It should be.')
+ assert not p.isalive()
def test_forced_terminate(self):
p = pexpect.spawn(sys.executable, ['needs_kill.py'])
p.expect('READY')
- res = p.terminate(force=True)
- assert res, res
+ assert p.terminate(force=True) == True
+ p.expect(pexpect.EOF)
+ assert not p.isalive()
### Some platforms allow this. Some reset status after call to waitpid.
+### probably not necessary, isalive() returns early when terminate is False.
def test_expect_isalive_consistent_multiple_calls (self):
'''This tests that multiple calls to isalive() return same value.
'''
-
p = pexpect.spawn('cat')
- if not p.isalive():
- self.fail ('Child process is not alive. It should be.')
- if not p.isalive():
- self.fail ('Second call. Child process is not alive. It should be.')
+ assert p.isalive()
+ assert p.isalive()
p.kill(9)
p.expect(pexpect.EOF)
- if p.isalive():
- self.fail ('Child process is not dead. It should be.')
- if p.isalive():
- self.fail ('Second call. Child process is not dead. It should be.')
+ assert not p.isalive()
+ assert not p.isalive()
if __name__ == '__main__':
unittest.main()
diff --git a/tests/test_misc.py b/tests/test_misc.py
index a052fe5..81cc6ff 100755
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -25,6 +25,7 @@ import sys
import re
import signal
import time
+import os
# the program cat(1) may display ^D\x08\x08 when \x04 (EOF, Ctrl-D) is sent
_CAT_EOF = b'^D\x08\x08'
@@ -33,7 +34,11 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
def test_isatty (self):
child = pexpect.spawn('cat')
- assert child.isatty(), "Not returning True. Should always be True."
+ if not child.isatty() and sys.platform.lower().startswith('sunos'):
+ if hasattr(unittest, 'SkipTest'):
+ raise unittest.SkipTest("Not supported on this platform.")
+ return 'skip'
+ assert child.isatty()
def test_read (self):
child = pexpect.spawn('cat')
@@ -102,7 +107,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
assert (page == b'abc\r\nabc\r\n123\r\n123\r\n' or
page == b'abc\r\n123\r\nabc\r\n123\r\n' or
page == b'abc\r\n123abc\r\n\r\n123\r\n') , \
- "iterator did not work. page=%r"(page,)
+ "iterator did not work. page=%r" % (page,)
def test_readlines(self):
'''Note that on some slow or heavily loaded systems that the lines
@@ -208,34 +213,28 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
else:
self.fail ("child.isalive() should have raised a pexpect.ExceptionPexpect")
child.terminated = 1 # Force back to valid state so __del__ won't complain
+
def test_bad_arguments (self):
'''This tests that we get a graceful error when passing bad arguments.'''
- try:
- p = pexpect.spawn(1)
- except pexpect.ExceptionPexpect:
- pass
- else:
- self.fail ("pexpect.spawn(1) should have raised a pexpect.ExceptionPexpect.")
- try:
- p = pexpect.spawn('ls', '-la') # should really use pexpect.spawn('ls', ['-ls'])
- except TypeError:
- pass
- else:
- self.fail ("pexpect.spawn('ls', '-la') should have raised a TypeError.")
- try:
- p = pexpect.spawn('cat')
+ with self.assertRaises(pexpect.ExceptionPexpect):
+ pexpect.spawn(1)
+
+ with self.assertRaises(TypeError):
+ # should use pexpect.spawn('ls', ['-la'])
+ pexpect.spawn('ls', '-la')
+
+ with self.assertRaises(ValueError):
+ p = pexpect.spawn('cat', timeout=5)
p.close()
p.read_nonblocking(size=1, timeout=3)
- except ValueError:
- pass
- else:
- self.fail ("read_nonblocking on closed spawn object should have raised a ValueError.")
+
def test_isalive(self):
child = pexpect.spawn('cat')
assert child.isalive(), child.isalive()
child.sendeof()
child.expect(pexpect.EOF)
assert not child.isalive(), child.isalive()
+
def test_bad_type_in_expect(self):
child = pexpect.spawn('cat')
try:
diff --git a/tests/test_performance.py b/tests/test_performance.py
index 163e4f2..7be0cf6 100755
--- a/tests/test_performance.py
+++ b/tests/test_performance.py
@@ -31,7 +31,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase):
'''Testing the performance of expect, with emphasis on wading through long
inputs. '''
-
+
if sys.version_info[0] >= 3:
@staticmethod
def _iter_n(n):
@@ -41,10 +41,10 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase):
else:
@staticmethod
def _iter_n(n):
- return 'for n in range(1, %d+1): print(n)' % n
+ return 'for n in range(1, %d+1): print(n)' % n
def plain_range(self, n):
- e = pexpect.spawn('python')
+ e = pexpect.spawn('python', timeout=100)
self.assertEqual(e.expect(b'>>>'), 0)
e.sendline(self._iter_n(n))
self.assertEqual(e.expect(br'\.{3}'), 0)
@@ -52,7 +52,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase):
self.assertEqual(e.expect([b'inquisition', '%d' % n]), 1)
def window_range(self, n):
- e = pexpect.spawn('python')
+ e = pexpect.spawn('python', timeout=100)
self.assertEqual(e.expect(b'>>>'), 0)
e.sendline(self._iter_n(n))
self.assertEqual(e.expect(r'\.{3}'), 0)
@@ -60,7 +60,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase):
self.assertEqual(e.expect([b'inquisition', '%d' % n], searchwindowsize=20), 1)
def exact_range(self, n):
- e = pexpect.spawn('python')
+ e = pexpect.spawn('python', timeout=100)
self.assertEqual(e.expect_exact([b'>>>']), 0)
e.sendline(self._iter_n(n))
self.assertEqual(e.expect_exact([b'...']), 0)
@@ -68,7 +68,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase):
self.assertEqual(e.expect_exact([b'inquisition', '%d' % n],timeout=520), 1)
def ewin_range(self, n):
- e = pexpect.spawn('python')
+ e = pexpect.spawn('python', timeout=100)
self.assertEqual(e.expect_exact([b'>>>']), 0)
e.sendline(self._iter_n(n))
self.assertEqual(e.expect_exact([b'...']), 0)
@@ -76,7 +76,7 @@ class PerformanceTestCase (PexpectTestCase.PexpectTestCase):
self.assertEqual(e.expect_exact([b'inquisition', '%d' % n], searchwindowsize=20), 1)
def faster_range(self, n):
- e = pexpect.spawn('python')
+ e = pexpect.spawn('python', timeout=100)
self.assertEqual(e.expect(b'>>>'), 0)
e.sendline(('list(range(1, %d+1))' % n).encode('ascii'))
self.assertEqual(e.expect([b'inquisition', '%d' % n]), 1)
diff --git a/tests/test_replwrap.py b/tests/test_replwrap.py
index a6ea956..d7aa049 100644
--- a/tests/test_replwrap.py
+++ b/tests/test_replwrap.py
@@ -43,7 +43,7 @@ class REPLWrapTestCase(unittest.TestCase):
self.assertEqual(res.strip().splitlines(), ['1 2', '3 4'])
def test_existing_spawn(self):
- child = pexpect.spawnu("bash", timeout=5)
+ child = pexpect.spawnu("bash", timeout=5, echo=False)
repl = replwrap.REPLWrapper(child, re.compile('[$#]'),
"PS1='{0}' PS2='{1}' "
"PROMPT_COMMAND=''")
@@ -66,7 +66,7 @@ class REPLWrapTestCase(unittest.TestCase):
if platform.python_implementation() == 'PyPy':
raise unittest.SkipTest("This test fails on PyPy because of REPL differences")
- child = pexpect.spawnu('python', timeout=5)
+ child = pexpect.spawnu('python', echo=False, timeout=5)
# prompt_change=None should mean no prompt change
py = replwrap.REPLWrapper(child, replwrap.u(">>> "), prompt_change=None,
continuation_prompt=replwrap.u("... "))
diff --git a/tests/test_unicode.py b/tests/test_unicode.py
index 256057d..f342bf9 100644
--- a/tests/test_unicode.py
+++ b/tests/test_unicode.py
@@ -3,6 +3,8 @@ from __future__ import unicode_literals
import platform
import tempfile
+import sys
+import time
import pexpect
import unittest
@@ -34,33 +36,60 @@ class UnicodeTests(PexpectTestCase.PexpectTestCase):
p.sendeof()
p.expect_exact (pexpect.EOF)
- def test_expect_echo (self):
- '''This tests that echo can be turned on and off.
+ def test_expect_setecho_toggle(self):
+ '''This tests that echo may be toggled off.
'''
- p = pexpect.spawnu('cat', timeout=10)
- self._expect_echo(p)
+ p = pexpect.spawnu('cat', timeout=5)
+ try:
+ self._expect_echo_toggle_off(p)
+ except IOError:
+ if sys.platform.lower().startswith('sunos'):
+ if hasattr(unittest, 'SkipTest'):
+ raise unittest.SkipTest("Not supported on this platform.")
+ return 'skip'
+ raise
+ self._expect_echo_toggle_on(p)
def test_expect_echo_exact (self):
'''Like test_expect_echo(), but using expect_exact().
'''
- p = pexpect.spawnu('cat', timeout=10)
+ p = pexpect.spawnu('cat', timeout=5)
p.expect = p.expect_exact
self._expect_echo(p)
+ def test_expect_setecho_toggle_exact(self):
+ p = pexpect.spawnu('cat', timeout=5)
+ p.expect = p.expect_exact
+ try:
+ self._expect_echo_toggle_off(p)
+ except IOError:
+ if sys.platform.lower().startswith('sunos'):
+ if hasattr(unittest, 'SkipTest'):
+ raise unittest.SkipTest("Not supported on this platform.")
+ return 'skip'
+ raise
+ self._expect_echo_toggle_on(p)
+
def _expect_echo (self, p):
p.sendline('1234') # Should see this twice (once from tty echo and again from cat).
index = p.expect (['1234', 'abcdé', 'wxyz', pexpect.EOF, pexpect.TIMEOUT])
assert index == 0, (index, p.before)
index = p.expect (['1234', 'abcdé', 'wxyz', pexpect.EOF])
assert index == 0, index
+
+ def _expect_echo_toggle_off(self, p):
p.setecho(0) # Turn off tty echo
+ p.waitnoecho()
p.sendline('abcdé') # Now, should only see this once.
p.sendline('wxyz') # Should also be only once.
index = p.expect ([pexpect.EOF,pexpect.TIMEOUT, 'abcdé', 'wxyz', '1234'])
assert index == 2, index
index = p.expect ([pexpect.EOF, 'abcdé', 'wxyz', '7890'])
assert index == 2, index
+
+ def _expect_echo_toggle_on(self, p):
p.setecho(1) # Turn on tty echo
+ time.sleep(0.2) # there is no waitecho() !
p.sendline('7890') # Should see this twice.
index = p.expect ([pexpect.EOF, 'abcdé', 'wxyz', '7890'])
assert index == 3, index