summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorJeff Quast <contact@jeffquast.com>2015-04-26 21:51:20 -0700
committerJeff Quast <contact@jeffquast.com>2015-04-26 21:51:20 -0700
commit7f046a6cf86d8f60a6cf23c40ef625e5acbc1a32 (patch)
treeabde38287c6d4d38589d448fab6f53b4561fa7c4 /tests
parentbdfaaee26d2fb9f4bf0891918e6a6039eaf3a4b6 (diff)
parent82d4937b73a2fc49824e1f60fa0e036731a03135 (diff)
downloadpexpect-git-7f046a6cf86d8f60a6cf23c40ef625e5acbc1a32.tar.gz
Merge remote-tracking branch 'origin/master' into document-blocking-write
Diffstat (limited to 'tests')
-rw-r--r--tests/PexpectTestCase.py48
-rw-r--r--tests/README18
-rwxr-xr-xtests/test_ansi.py78
-rw-r--r--tests/test_async.py51
-rwxr-xr-xtests/test_constructor.py10
-rwxr-xr-xtests/test_ctrl_chars.py7
-rwxr-xr-xtests/test_expect.py37
-rwxr-xr-xtests/test_interact.py18
-rw-r--r--tests/test_maxcanon.py176
-rwxr-xr-xtests/test_misc.py67
-rw-r--r--tests/test_replwrap.py2
-rw-r--r--tests/test_repr.py26
-rwxr-xr-xtests/test_run.py150
-rwxr-xr-xtests/test_screen.py124
-rw-r--r--tests/test_which.py109
15 files changed, 795 insertions, 126 deletions
diff --git a/tests/PexpectTestCase.py b/tests/PexpectTestCase.py
index 7a9574e..307437e 100644
--- a/tests/PexpectTestCase.py
+++ b/tests/PexpectTestCase.py
@@ -22,26 +22,68 @@ from __future__ import print_function
import contextlib
import unittest
+import signal
import sys
import os
+
class PexpectTestCase(unittest.TestCase):
def setUp(self):
self.PYTHONBIN = sys.executable
self.original_path = os.getcwd()
tests_dir = os.path.dirname(__file__)
self.project_dir = project_dir = os.path.dirname(tests_dir)
+
+ # all tests are executed in this folder; there are many auxiliary
+ # programs in this folder executed by spawn().
os.chdir(tests_dir)
- os.environ['COVERAGE_PROCESS_START'] = os.path.join(project_dir, '.coveragerc')
+
+ # If the pexpect raises an exception after fork(), but before
+ # exec(), our test runner *also* forks. We prevent this by
+ # storing our pid and asserting equality on tearDown.
+ self.pid = os.getpid()
+
+ coverage_rc = os.path.join(project_dir, '.coveragerc')
+ os.environ['COVERAGE_PROCESS_START'] = coverage_rc
os.environ['COVERAGE_FILE'] = os.path.join(project_dir, '.coverage')
print('\n', self.id(), end=' ')
sys.stdout.flush()
+
+ # some build agents will ignore SIGHUP and SIGINT, which python
+ # inherits. This causes some of the tests related to terminate()
+ # to fail. We set them to the default handlers that they should
+ # be, and restore them back to their SIG_IGN value on tearDown.
+ #
+ # I'm not entirely convinced they need to be restored, only our
+ # test runner is affected.
+ self.restore_ignored_signals = [
+ value for value in (signal.SIGHUP, signal.SIGINT,)
+ if signal.getsignal(value) == signal.SIG_IGN]
+ if signal.SIGHUP in self.restore_ignored_signals:
+ # sighup should be set to default handler
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ if signal.SIGINT in self.restore_ignored_signals:
+ # SIGINT should be set to signal.default_int_handler
+ signal.signal(signal.SIGINT, signal.default_int_handler)
unittest.TestCase.setUp(self)
def tearDown(self):
- os.chdir (self.original_path)
+ # restore original working folder
+ os.chdir(self.original_path)
+
+ if self.pid != os.getpid():
+ # The build server pattern-matches phrase 'Test runner has forked!'
+ print("Test runner has forked! This means a child process raised "
+ "an exception before exec() in a test case, the error is "
+ "more than likely found above this line in stderr.",
+ file=sys.stderr)
+ exit(1)
+
+ # restore signal handlers
+ for signal_value in self.restore_ignored_signals:
+ signal.signal(signal_value, signal.SIG_IGN)
- if sys.version_info < (2,7):
+ if sys.version_info < (2, 7):
# We want to use these methods, which are new/improved in 2.7, but
# we are still supporting 2.6 for the moment. This section can be
# removed when we drop Python 2.6 support.
diff --git a/tests/README b/tests/README
index 295632b..ef5b613 100644
--- a/tests/README
+++ b/tests/README
@@ -1,18 +1,8 @@
-The best way to run these tests is from the directory above this one. Source
-the test.env environment file. This will make sure that you are using the
-correct pexpect.py file otherwise Python might try to import a different
-version if it is already installed in this environment. Then run the testall.py
-script in the tools/ directory. This script will automatically build a test
-suite from all the test scripts in the tests/ directory. This allows you to add
-new test scripts simply by dropping them in the tests/ directory. You don't
-have to register the test or do anything else to integrate it into the test
-suite.
+The best way to run these tests is from the directory above this one. Run:
-For example, this is the normal set of commands you would use to run all tests
-in the tests/ directory:
+ py.test
- $ cd /home/user/pexpect_dev/
- $ . test.env
- $ ./tools/testall.py
+To run a specific test file:
+ py.test tests/test_constructor.py
diff --git a/tests/test_ansi.py b/tests/test_ansi.py
index 3b8d6a9..a9d445e 100755
--- a/tests/test_ansi.py
+++ b/tests/test_ansi.py
@@ -21,6 +21,9 @@ PEXPECT LICENSE
from pexpect import ANSI
import unittest
from . import PexpectTestCase
+import sys
+
+PY3 = (sys.version_info[0] >= 3)
write_target = 'I\'ve got a ferret sticking up my nose. \n' +\
'(He\'s got a ferret sticking up his nose.) \n' +\
@@ -142,10 +145,81 @@ class ansiTestCase (PexpectTestCase.PexpectTestCase):
def test_number_x(self):
"""Test the FSM state used to handle more than 2 numeric parameters."""
- s = ANSI.ANSI(1, 20)
+ class TestANSI(ANSI.ANSI):
+ captured_memory = None
+ def do_sgr(self, fsm):
+ assert self.captured_memory is None
+ self.captured_memory = fsm.memory
+
+ s = TestANSI(1, 20)
s.write('\x1b[0;1;32;45mtest')
assert str(s) == ('test ')
- assert(s.state.memory == [s, '0', '1', '32', '45'])
+ assert s.captured_memory is not None
+ assert s.captured_memory == [s, '0', '1', '32', '45']
+
+ def test_fsm_memory(self):
+ """Test the FSM stack/memory does not have numbers left on it
+ after some sequences with numbers are passed in."""
+ s = ANSI.ANSI(1, 20)
+ s.write('\x1b[0;1;2;3m\x1b[4;5;6;7q\x1b[?8h\x1b[?9ltest')
+ assert str(s) == ('test ')
+ assert s.state.memory == [s]
+
+ def test_utf8_bytes(self):
+ """Test that when bytes are passed in containing UTF-8 encoded
+ characters, where the encoding of each character consists of
+ multiple bytes, the characters are correctly decoded.
+ Incremental decoding is also tested."""
+ s = ANSI.ANSI(2, 10, encoding='utf-8')
+ # This is the UTF-8 encoding of the UCS character "HOURGLASS"
+ # followed by the UTF-8 encoding of the UCS character
+ # "KEYBOARD". These characters can't be encoded in cp437 or
+ # latin-1. The "KEYBOARD" character is split into two
+ # separate writes.
+ s.write(b'\xe2\x8c\x9b')
+ s.write(b'\xe2\x8c')
+ s.write(b'\xa8')
+ if PY3:
+ assert str(s) == u'\u231b\u2328 \n '
+ else:
+ assert unicode(s) == u'\u231b\u2328 \n '
+ assert str(s) == b'\xe2\x8c\x9b\xe2\x8c\xa8 \n '
+ assert s.dump() == u'\u231b\u2328 '
+ assert s.pretty() == u'+----------+\n|\u231b\u2328 |\n| |\n+----------+\n'
+ assert s.get_abs(1, 1) == u'\u231b'
+ assert s.get_region(1, 1, 1, 5) == [u'\u231b\u2328 ']
+
+ def test_unicode(self):
+ """Test passing in of a unicode string."""
+ s = ANSI.ANSI(2, 10, encoding="utf-8")
+ s.write(u'\u231b\u2328')
+ if PY3:
+ assert str(s) == u'\u231b\u2328 \n '
+ else:
+ assert unicode(s) == u'\u231b\u2328 \n '
+ assert str(s) == b'\xe2\x8c\x9b\xe2\x8c\xa8 \n '
+ assert s.dump() == u'\u231b\u2328 '
+ assert s.pretty() == u'+----------+\n|\u231b\u2328 |\n| |\n+----------+\n'
+ assert s.get_abs(1, 1) == u'\u231b'
+ assert s.get_region(1, 1, 1, 5) == [u'\u231b\u2328 ']
+
+ def test_decode_error(self):
+ """Test that default handling of decode errors replaces the
+ invalid characters."""
+ s = ANSI.ANSI(2, 10, encoding="ascii")
+ s.write(b'\xff') # a non-ASCII character
+ # In unicode, the non-ASCII character is replaced with
+ # REPLACEMENT CHARACTER.
+ if PY3:
+ assert str(s) == u'\ufffd \n '
+ else:
+ assert unicode(s) == u'\ufffd \n '
+ assert str(s) == b'? \n '
+ assert s.dump() == u'\ufffd '
+ assert s.pretty() == u'+----------+\n|\ufffd |\n| |\n+----------+\n'
+ assert s.get_abs(1, 1) == u'\ufffd'
+ assert s.get_region(1, 1, 1, 5) == [u'\ufffd ']
+
if __name__ == '__main__':
unittest.main()
diff --git a/tests/test_async.py b/tests/test_async.py
new file mode 100644
index 0000000..ce75572
--- /dev/null
+++ b/tests/test_async.py
@@ -0,0 +1,51 @@
+try:
+ import asyncio
+except ImportError:
+ asyncio = None
+
+import sys
+import unittest
+
+import pexpect
+from .PexpectTestCase import PexpectTestCase
+
+def run(coro):
+ return asyncio.get_event_loop().run_until_complete(coro)
+
+@unittest.skipIf(asyncio is None, "Requires asyncio")
+class AsyncTests(PexpectTestCase):
+ def test_simple_expect(self):
+ p = pexpect.spawn('cat')
+ p.sendline('Hello asyncio')
+ coro = p.expect(['Hello', pexpect.EOF] , async=True)
+ assert run(coro) == 0
+ print('Done')
+
+ def test_timeout(self):
+ p = pexpect.spawn('cat')
+ coro = p.expect('foo', timeout=1, async=True)
+ with self.assertRaises(pexpect.TIMEOUT):
+ run(coro)
+
+ p = pexpect.spawn('cat')
+ coro = p.expect(['foo', pexpect.TIMEOUT], timeout=1, async=True)
+ assert run(coro) == 1
+
+ def test_eof(self):
+ p = pexpect.spawn('cat')
+ p.sendline('Hi')
+ coro = p.expect(pexpect.EOF, async=True)
+ p.sendeof()
+ assert run(coro) == 0
+
+ p = pexpect.spawn('cat')
+ p.sendeof()
+ coro = p.expect('Blah', async=True)
+ with self.assertRaises(pexpect.EOF):
+ run(coro)
+
+ def test_expect_exact(self):
+ p = pexpect.spawn('%s list100.py' % sys.executable)
+ assert run(p.expect_exact(b'5', async=True)) == 0
+ assert run(p.expect_exact(['wpeok', b'11'], async=True)) == 1
+ assert run(p.expect_exact([b'foo', pexpect.EOF], async=True)) == 1
diff --git a/tests/test_constructor.py b/tests/test_constructor.py
index 60525a0..98c473a 100755
--- a/tests/test_constructor.py
+++ b/tests/test_constructor.py
@@ -28,11 +28,11 @@ class TestCaseConstructor(PexpectTestCase.PexpectTestCase):
the same results for different styles of invoking __init__().
This assumes that the root directory / is static during the test.
'''
- p1 = pexpect.spawn('/bin/ls -l /bin')
- p2 = pexpect.spawn('/bin/ls' ,['-l', '/bin'])
- p1.expect (pexpect.EOF)
- p2.expect (pexpect.EOF)
- assert (p1.before == p2.before)
+ p1 = pexpect.spawn('uname -m -n -p -r -s -v')
+ p2 = pexpect.spawn('uname', ['-m', '-n', '-p', '-r', '-s', '-v'])
+ p1.expect(pexpect.EOF)
+ p2.expect(pexpect.EOF)
+ assert p1.before == p2.before
def test_named_parameters (self):
'''This tests that named parameters work.
diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py
index 9c7b869..10d03db 100755
--- a/tests/test_ctrl_chars.py
+++ b/tests/test_ctrl_chars.py
@@ -26,6 +26,9 @@ from . import PexpectTestCase
import time
import sys
+from ptyprocess import ptyprocess
+ptyprocess._make_eof_intr()
+
if sys.version_info[0] >= 3:
def byte(i):
return bytes([i])
@@ -54,7 +57,7 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase):
child = pexpect.spawn('python getch.py', echo=False, timeout=5)
child.expect('READY')
child.sendintr()
- child.expect(str(child._INTR) + '<STOP>')
+ child.expect(str(ord(ptyprocess._INTR)) + '<STOP>')
child.send(byte(0))
child.expect('0<STOP>')
@@ -66,7 +69,7 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase):
child = pexpect.spawn('python getch.py', echo=False, timeout=5)
child.expect('READY')
child.sendeof()
- child.expect(str(child._EOF) + '<STOP>')
+ child.expect(str(ord(ptyprocess._EOF)) + '<STOP>')
child.send(byte(0))
child.expect('0<STOP>')
diff --git a/tests/test_expect.py b/tests/test_expect.py
index 8ccb9c5..3f4c9d8 100755
--- a/tests/test_expect.py
+++ b/tests/test_expect.py
@@ -18,11 +18,13 @@ PEXPECT LICENSE
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
'''
+import multiprocessing
import unittest
import subprocess
import time
import signal
import sys
+import os
import pexpect
from . import PexpectTestCase
@@ -542,7 +544,40 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
signal.alarm(1)
p1.expect('END')
+ def test_stdin_closed(self):
+ '''
+ Ensure pexpect continues to operate even when stdin is closed
+ '''
+ class Closed_stdin_proc(multiprocessing.Process):
+ def run(self):
+ sys.__stdin__.close()
+ cat = pexpect.spawn('cat')
+ cat.sendeof()
+ cat.expect(pexpect.EOF)
+
+ proc = Closed_stdin_proc()
+ proc.start()
+ proc.join()
+ assert proc.exitcode == 0
+
+ def test_stdin_stdout_closed(self):
+ '''
+ Ensure pexpect continues to operate even when stdin and stdout is closed
+ '''
+ class Closed_stdin_stdout_proc(multiprocessing.Process):
+ def run(self):
+ sys.__stdin__.close()
+ sys.__stdout__.close()
+ cat = pexpect.spawn('cat')
+ cat.sendeof()
+ cat.expect(pexpect.EOF)
+
+ proc = Closed_stdin_stdout_proc()
+ proc.start()
+ proc.join()
+ assert proc.exitcode == 0
+
if __name__ == '__main__':
unittest.main()
-suite = unittest.makeSuite(ExpectTestCase,'test')
+suite = unittest.makeSuite(ExpectTestCase, 'test')
diff --git a/tests/test_interact.py b/tests/test_interact.py
index 06fc44a..e635cb0 100755
--- a/tests/test_interact.py
+++ b/tests/test_interact.py
@@ -66,9 +66,12 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase):
p.expect(b'<out>alpha')
p.expect(b'<out>beta')
p.sendeof()
- p.expect_exact('<eof>')
- p.expect_exact('Escaped interact')
- p.expect(pexpect.EOF)
+ # strangely, on travis-ci, sendeof() terminates the subprocess,
+ # it doesn't receive ^D, just immediately throws EOF.
+ idx = p.expect_exact(['<eof>', pexpect.EOF])
+ if idx == 0:
+ p.expect_exact('Escaped interact')
+ p.expect(pexpect.EOF)
assert not p.isalive()
assert p.exitstatus == 0
@@ -81,9 +84,12 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase):
p.expect('<out>ɑlpha')
p.expect('<out>Βeta')
p.sendeof()
- p.expect_exact('<eof>')
- p.expect_exact('Escaped interact')
- p.expect(pexpect.EOF)
+ # strangely, on travis-ci, sendeof() terminates the subprocess,
+ # it doesn't receive ^D, just immediately throws EOF.
+ idx = p.expect_exact(['<eof>', pexpect.EOF])
+ if idx == 0:
+ p.expect_exact('Escaped interact')
+ p.expect(pexpect.EOF)
assert not p.isalive()
assert p.exitstatus == 0
diff --git a/tests/test_maxcanon.py b/tests/test_maxcanon.py
new file mode 100644
index 0000000..772a3b7
--- /dev/null
+++ b/tests/test_maxcanon.py
@@ -0,0 +1,176 @@
+""" Module for canonical-mode tests. """
+# std imports
+import sys
+import os
+
+# local
+import pexpect
+from . import PexpectTestCase
+
+# 3rd-party
+import pytest
+
+
+class TestCaseCanon(PexpectTestCase.PexpectTestCase):
+ """
+ Test expected Canonical mode behavior (limited input line length).
+
+ All systems use the value of MAX_CANON which can be found using
+ fpathconf(3) value PC_MAX_CANON -- with the exception of Linux
+ and FreeBSD.
+
+ Linux, though defining a value of 255, actually honors the value
+ of 4096 from linux kernel include file tty.h definition
+ N_TTY_BUF_SIZE.
+
+ Linux also does not honor IMAXBEL. termios(3) states, "Linux does not
+ implement this bit, and acts as if it is always set." Although these
+ tests ensure it is enabled, this is a non-op for Linux.
+
+ FreeBSD supports neither, and instead uses a fraction (1/5) of the tty
+ speed which is always 9600. Therefor, the maximum limited input line
+ length is 9600 / 5 = 1920.
+
+ These tests only ensure the correctness of the behavior described by
+ the sendline() docstring. pexpect is not particularly involved in
+ these scenarios, though if we wish to expose some kind of interface
+ to tty.setraw, for example, these tests may be re-purposed as such.
+
+ Lastly, portions of these tests are skipped on Travis-CI. It produces
+ unexpected behavior not reproduced on Debian/GNU Linux.
+ """
+
+ def setUp(self):
+ super(TestCaseCanon, self).setUp()
+
+ self.echo = False
+ if sys.platform.lower().startswith('linux'):
+ # linux is 4096, N_TTY_BUF_SIZE.
+ self.max_input = 4096
+ self.echo = True
+ elif sys.platform.lower().startswith('sunos'):
+ # SunOS allows PC_MAX_CANON + 1; see
+ # https://bitbucket.org/illumos/illumos-gate/src/d07a59219ab7fd2a7f39eb47c46cf083c88e932f/usr/src/uts/common/io/ldterm.c?at=default#cl-1888
+ self.max_input = os.fpathconf(0, 'PC_MAX_CANON') + 1
+ elif sys.platform.lower().startswith('freebsd'):
+ # http://lists.freebsd.org/pipermail/freebsd-stable/2009-October/052318.html
+ self.max_input = 9600 / 5
+ else:
+ # All others (probably) limit exactly at PC_MAX_CANON
+ self.max_input = os.fpathconf(0, 'PC_MAX_CANON')
+
+ @pytest.mark.skipif(
+ sys.platform.lower().startswith('freebsd'),
+ reason='os.write to BLOCK indefinitely on FreeBSD in this case'
+ )
+ def test_under_max_canon(self):
+ " BEL is not sent by terminal driver at maximum bytes - 1. "
+ # given,
+ child = pexpect.spawn('bash', echo=self.echo, timeout=5)
+ child.sendline('echo READY')
+ child.sendline('stty icanon imaxbel')
+ child.sendline('echo BEGIN; cat')
+
+ # some systems BEL on (maximum - 1), not able to receive CR,
+ # even though all characters up until then were received, they
+ # simply cannot be transmitted, as CR is part of the transmission.
+ send_bytes = self.max_input - 1
+
+ # exercise,
+ child.sendline('_' * send_bytes)
+
+ # fast forward beyond 'cat' command, as ^G can be found as part of
+ # set-xterm-title sequence of $PROMPT_COMMAND or $PS1.
+ child.expect_exact('BEGIN')
+
+ # verify, all input is found in echo output,
+ child.expect_exact('_' * send_bytes)
+
+ # BEL is not found,
+ with self.assertRaises(pexpect.TIMEOUT):
+ child.expect_exact('\a', timeout=1)
+
+ # cleanup,
+ child.sendeof() # exit cat(1)
+ child.sendline('exit 0') # exit bash(1)
+ child.expect(pexpect.EOF)
+ assert not child.isalive()
+ assert child.exitstatus == 0
+
+ @pytest.mark.skipif(
+ sys.platform.lower().startswith('freebsd'),
+ reason='os.write to BLOCK indefinitely on FreeBSD in this case'
+ )
+ def test_beyond_max_icanon(self):
+ " a single BEL is sent when maximum bytes is reached. "
+ # given,
+ child = pexpect.spawn('bash', echo=self.echo, timeout=5)
+ child.sendline('stty icanon imaxbel erase ^H')
+ child.sendline('cat')
+ send_bytes = self.max_input
+
+ # exercise,
+ child.sendline('_' * send_bytes)
+ child.expect_exact('\a')
+
+ # exercise, we must now backspace to send CR.
+ child.sendcontrol('h')
+ child.sendline()
+
+ if os.environ.get('TRAVIS', None) == 'true':
+ # Travis-CI has intermittent behavior here, possibly
+ # because the master process is itself, a PTY?
+ return
+
+ # verify the length of (maximum - 1) received by cat(1),
+ # which has written it back out,
+ child.expect_exact('_' * (send_bytes - 1))
+ # and not a byte more.
+ with self.assertRaises(pexpect.TIMEOUT):
+ child.expect_exact('_', timeout=1)
+
+ # cleanup,
+ child.sendeof() # exit cat(1)
+ child.sendline('exit 0') # exit bash(1)
+ child.expect_exact(pexpect.EOF)
+ assert not child.isalive()
+ assert child.exitstatus == 0
+
+ @pytest.mark.skipif(
+ sys.platform.lower().startswith('freebsd'),
+ reason='os.write to BLOCK indefinitely on FreeBSD in this case'
+ )
+ def test_max_no_icanon(self):
+ " may exceed maximum input bytes if canonical mode is disabled. "
+ # given,
+ child = pexpect.spawn('bash', echo=self.echo, timeout=5)
+ child.sendline('stty -icanon imaxbel')
+ child.sendline('echo BEGIN; cat')
+ send_bytes = self.max_input + 11
+
+ # exercise,
+ child.sendline('_' * send_bytes)
+
+ # fast forward beyond 'cat' command, as ^G can be found as part of
+ # set-xterm-title sequence of $PROMPT_COMMAND or $PS1.
+ child.expect_exact('BEGIN')
+
+ if os.environ.get('TRAVIS', None) == 'true':
+ # Travis-CI has intermittent behavior here, possibly
+ # because the master process is itself, a PTY?
+ return
+
+ # BEL is *not* found,
+ with self.assertRaises(pexpect.TIMEOUT):
+ child.expect_exact('\a', timeout=1)
+
+ # verify, all input is found in output,
+ child.expect_exact('_' * send_bytes)
+
+ # cleanup,
+ child.sendcontrol('c') # exit cat(1) (eof wont work in -icanon)
+ child.sendcontrol('c')
+ child.sendline('exit 0') # exit bash(1)
+ child.expect(pexpect.EOF)
+ assert not child.isalive()
+ assert child.exitstatus == 0
diff --git a/tests/test_misc.py b/tests/test_misc.py
index d9205e4..e439240 100755
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -178,6 +178,16 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
with self.assertRaises(pexpect.EOF):
child.expect('the unexpected')
+ def test_with(self):
+ "spawn can be used as a context manager"
+ with pexpect.spawn(sys.executable + ' echo_w_prompt.py') as p:
+ p.expect('<in >')
+ p.sendline(b'alpha')
+ p.expect(b'<out>alpha')
+ assert p.isalive()
+
+ assert not p.isalive()
+
def test_terminate(self):
" test force terminate always succeeds (SIGKILL). "
child = pexpect.spawn('cat')
@@ -186,41 +196,24 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
def test_sighup(self):
" validate argument `ignore_sighup=True` and `ignore_sighup=False`. "
- # If a parent process sets an Ignore handler for SIGHUP (as on Fedora's
- # build machines), this test breaks. We temporarily restore the default
- # handler, so the child process will quit. However, we can't simply
- # replace any installed handler, because getsignal returns None for
- # handlers not set in Python code, so we wouldn't be able to restore
- # them.
- if signal.getsignal(signal.SIGHUP) == signal.SIG_IGN:
- signal.signal(signal.SIGHUP, signal.SIG_DFL)
- restore_sig_ign = True
- else:
- restore_sig_ign = False
-
getch = sys.executable + ' getch.py'
- try:
- child = pexpect.spawn(getch, ignore_sighup=True)
- child.expect('READY')
- child.kill(signal.SIGHUP)
- for _ in range(10):
- if not child.isalive():
- self.fail('Child process should not have exited.')
- time.sleep(0.1)
-
- child = pexpect.spawn(getch, ignore_sighup=False)
- child.expect('READY')
- child.kill(signal.SIGHUP)
- for _ in range(10):
- if not child.isalive():
- break
- time.sleep(0.1)
- else:
- self.fail('Child process should have exited.')
-
- finally:
- if restore_sig_ign:
- signal.signal(signal.SIGHUP, signal.SIG_IGN)
+ child = pexpect.spawn(getch, ignore_sighup=True)
+ child.expect('READY')
+ child.kill(signal.SIGHUP)
+ for _ in range(10):
+ if not child.isalive():
+ self.fail('Child process should not have exited.')
+ time.sleep(0.1)
+
+ child = pexpect.spawn(getch, ignore_sighup=False)
+ child.expect('READY')
+ child.kill(signal.SIGHUP)
+ for _ in range(10):
+ if not child.isalive():
+ break
+ time.sleep(0.1)
+ else:
+ self.fail('Child process should have exited.')
def test_bad_child_pid(self):
" assert bad condition error in isalive(). "
@@ -228,7 +221,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
child = pexpect.spawn('cat')
child.terminate(force=1)
# Force an invalid state to test isalive
- child.terminated = 0
+ child.ptyproc.terminated = 0
try:
with self.assertRaisesRegexp(pexpect.ExceptionPexpect,
".*" + expect_errmsg):
@@ -361,9 +354,9 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
" test forced self.__fork_pty() and __pty_make_controlling_tty "
# given,
class spawn_ourptyfork(pexpect.spawn):
- def _spawn(self, command, args=[]):
+ def _spawn(self, command, args=[], preexec_fn=None):
self.use_native_pty_fork = False
- pexpect.spawn._spawn(self, command, args)
+ pexpect.spawn._spawn(self, command, args, preexec_fn)
# exercise,
p = spawn_ourptyfork('cat', echo=False)
diff --git a/tests/test_replwrap.py b/tests/test_replwrap.py
index 14f7c39..28c7599 100644
--- a/tests/test_replwrap.py
+++ b/tests/test_replwrap.py
@@ -26,7 +26,7 @@ class REPLWrapTestCase(unittest.TestCase):
assert 'real' in res, res
# PAGER should be set to cat, otherwise man hangs
- res = bash.run_command('man sleep', timeout=2)
+ res = bash.run_command('man sleep', timeout=5)
assert 'SLEEP' in res, res
def test_multiline(self):
diff --git a/tests/test_repr.py b/tests/test_repr.py
new file mode 100644
index 0000000..ce618d4
--- /dev/null
+++ b/tests/test_repr.py
@@ -0,0 +1,26 @@
+""" Test __str__ methods. """
+import pexpect
+
+from . import PexpectTestCase
+
+
+class TestCaseMisc(PexpectTestCase.PexpectTestCase):
+
+ def test_str_spawnu(self):
+ """ Exercise spawnu.__str__() """
+ # given,
+ p = pexpect.spawnu('cat')
+ # exercise,
+ value = str(p)
+ # verify
+ assert isinstance(value, str)
+
+ def test_str_spawn(self):
+ """ Exercise spawn.__str__() """
+ # given,
+ p = pexpect.spawn('cat')
+ # exercise,
+ value = str(p)
+ # verify
+ assert isinstance(value, str)
+
diff --git a/tests/test_run.py b/tests/test_run.py
index 814b70a..1b3c92f 100755
--- a/tests/test_run.py
+++ b/tests/test_run.py
@@ -22,72 +22,170 @@ PEXPECT LICENSE
import pexpect
import unittest
import subprocess
+import tempfile
import sys
+import os
from . import PexpectTestCase
-# TODO Many of these test cases blindly assume that sequential
-# TODO listing of the /bin directory will yield the same results.
-# TODO This may not always be true, but seems adequate for testing for now.
-# TODO I should fix this at some point.
-
unicode_type = str if pexpect.PY3 else unicode
-def timeout_callback (d):
-# print d["event_count"],
- if d["event_count"]>3:
+
+def timeout_callback(values):
+ if values["event_count"] > 3:
return 1
return 0
+
+def function_events_callback(values):
+ try:
+ previous_echoed = (values["child_result_list"][-1]
+ .decode().split("\n")[-2].strip())
+ if previous_echoed.endswith("stage-1"):
+ return "echo stage-2\n"
+ elif previous_echoed.endswith("stage-2"):
+ return "echo stage-3\n"
+ elif previous_echoed.endswith("stage-3"):
+ return "exit\n"
+ else:
+ raise Exception("Unexpected output {0}".format(previous_echoed))
+ except IndexError:
+ return "echo stage-1\n"
+
+
class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
runfunc = staticmethod(pexpect.run)
cr = b'\r'
empty = b''
prep_subprocess_out = staticmethod(lambda x: x)
- def test_run_exit (self):
+ def setUp(self):
+ fd, self.rcfile = tempfile.mkstemp()
+ os.write(fd, b'PS1=GO: \n')
+ os.close(fd)
+ super(RunFuncTestCase, self).setUp()
+
+ def tearDown(self):
+ os.unlink(self.rcfile)
+ super(RunFuncTestCase, self).tearDown()
+
+ def test_run_exit(self):
(data, exitstatus) = self.runfunc('python exit1.py', withexitstatus=1)
assert exitstatus == 1, "Exit status of 'python exit1.py' should be 1."
- def test_run (self):
- the_old_way = subprocess.Popen(args=['ls', '-l', '/bin'],
- stdout=subprocess.PIPE).communicate()[0].rstrip()
- (the_new_way, exitstatus) = self.runfunc('ls -l /bin', withexitstatus=1)
+ def test_run(self):
+ the_old_way = subprocess.Popen(
+ args=['uname', '-m', '-n'],
+ stdout=subprocess.PIPE
+ ).communicate()[0].rstrip()
+
+ (the_new_way, exitstatus) = self.runfunc(
+ 'uname -m -n', withexitstatus=1)
the_new_way = the_new_way.replace(self.cr, self.empty).rstrip()
+
self.assertEqual(self.prep_subprocess_out(the_old_way), the_new_way)
self.assertEqual(exitstatus, 0)
- def test_run_callback (self): # TODO it seems like this test could block forever if run fails...
- self.runfunc("cat", timeout=1, events={pexpect.TIMEOUT:timeout_callback})
+ def test_run_callback(self):
+ # TODO it seems like this test could block forever if run fails...
+ events = {pexpect.TIMEOUT: timeout_callback}
+ self.runfunc("cat", timeout=1, events=events)
- def test_run_bad_exitstatus (self):
- (the_new_way, exitstatus) = self.runfunc('ls -l /najoeufhdnzkxjd',
- withexitstatus=1)
+ def test_run_bad_exitstatus(self):
+ (the_new_way, exitstatus) = self.runfunc(
+ 'ls -l /najoeufhdnzkxjd', withexitstatus=1)
assert exitstatus != 0
+ def test_run_event_as_string(self):
+ events = [
+ # second match on 'abc', echo 'def'
+ ('abc\r\n.*GO:', 'echo "def"\n'),
+ # final match on 'def': exit
+ ('def\r\n.*GO:', 'exit\n'),
+ # first match on 'GO:' prompt, echo 'abc'
+ ('GO:', 'echo "abc"\n')
+ ]
+
+ (data, exitstatus) = pexpect.run(
+ 'bash --rcfile {0}'.format(self.rcfile),
+ withexitstatus=True,
+ events=events,
+ timeout=10)
+ assert exitstatus == 0
+
+ def test_run_event_as_function(self):
+ events = [
+ ('GO:', function_events_callback)
+ ]
+
+ (data, exitstatus) = pexpect.run(
+ 'bash --rcfile {0}'.format(self.rcfile),
+ withexitstatus=True,
+ events=events,
+ timeout=10)
+ assert exitstatus == 0
+
+ def test_run_event_as_method(self):
+ events = [
+ ('GO:', self._method_events_callback)
+ ]
+
+ (data, exitstatus) = pexpect.run(
+ 'bash --rcfile {0}'.format(self.rcfile),
+ withexitstatus=True,
+ events=events,
+ timeout=10)
+ assert exitstatus == 0
+
+ def test_run_event_typeerror(self):
+ events = [('GO:', -1)]
+ with self.assertRaises(TypeError):
+ pexpect.run('bash --rcfile {0}'.format(self.rcfile),
+ withexitstatus=True,
+ events=events,
+ timeout=10)
+
+ def _method_events_callback(self, values):
+ try:
+ previous_echoed = (values["child_result_list"][-1].decode()
+ .split("\n")[-2].strip())
+ if previous_echoed.endswith("foo1"):
+ return "echo foo2\n"
+ elif previous_echoed.endswith("foo2"):
+ return "echo foo3\n"
+ elif previous_echoed.endswith("foo3"):
+ return "exit\n"
+ else:
+ raise Exception("Unexpected output {0!r}"
+ .format(previous_echoed))
+ except IndexError:
+ return "echo foo1\n"
+
+
class RunUnicodeFuncTestCase(RunFuncTestCase):
runfunc = staticmethod(pexpect.runu)
cr = b'\r'.decode('ascii')
empty = b''.decode('ascii')
prep_subprocess_out = staticmethod(lambda x: x.decode('utf-8', 'replace'))
+
def test_run_unicode(self):
if pexpect.PY3:
- c = chr(254) # þ
+ char = chr(254) # þ
pattern = '<in >'
else:
- c = unichr(254) # analysis:ignore
+ char = unichr(254) # analysis:ignore
pattern = '<in >'.decode('ascii')
- def callback(d):
- if d['event_count'] == 0:
- return c + '\n'
+ def callback(values):
+ if values['event_count'] == 0:
+ return char + '\n'
else:
return True # Stop the child process
output = pexpect.runu(sys.executable + ' echo_w_prompt.py',
- env={'PYTHONIOENCODING':'utf-8'},
- events={pattern:callback})
+ env={'PYTHONIOENCODING': 'utf-8'},
+ events={pattern: callback})
assert isinstance(output, unicode_type), type(output)
- assert '<out>'+c in output, output
+ assert ('<out>' + char) in output, output
if __name__ == '__main__':
unittest.main()
diff --git a/tests/test_screen.py b/tests/test_screen.py
index 3f0736b..2429e57 100755
--- a/tests/test_screen.py
+++ b/tests/test_screen.py
@@ -19,10 +19,14 @@ PEXPECT LICENSE
'''
+import sys
+
from pexpect import screen
import unittest
from . import PexpectTestCase
+PY3 = (sys.version_info[0] >= 3)
+
fill1_target='XXXXXXXXXX\n' + \
'XOOOOOOOOX\n' + \
'XO::::::OX\n' + \
@@ -76,6 +80,17 @@ insert_target = 'ZXZZZZZZXZ\n' +\
'ZZ/2.4.6ZZ'
get_region_target = ['......', '.\\/...', './\\...', '......']
+unicode_box_unicode_result = u'\u2554\u2557\n\u255A\u255D'
+unicode_box_pretty_result = u'''\
++--+
+|\u2554\u2557|
+|\u255A\u255D|
++--+
+'''
+unicode_box_ascii_bytes_result = b'??\n??'
+unicode_box_cp437_bytes_result = b'\xc9\xbb\n\xc8\xbc'
+unicode_box_utf8_bytes_result = b'\xe2\x95\x94\xe2\x95\x97\n\xe2\x95\x9a\xe2\x95\x9d'
+
class screenTestCase (PexpectTestCase.PexpectTestCase):
def make_screen_with_put (self):
s = screen.screen(10,10)
@@ -168,20 +183,101 @@ class screenTestCase (PexpectTestCase.PexpectTestCase):
s.insert_abs (10,9,'Z')
s.insert_abs (10,9,'Z')
assert str(s) == insert_target
- # def test_write (self):
- # s = screen.screen (6,65)
- # s.fill('.')
- # s.cursor_home()
- # for c in write_text:
- # s.write (c)
- # print str(s)
- # assert str(s) == write_target
- # def test_tetris (self):
- # s = screen.screen (24,80)
- # tetris_text = open ('tetris.data').read()
- # for c in tetris_text:
- # s.write (c)
- # assert str(s) == tetris_target
+
+ def make_screen_with_box_unicode(self, *args, **kwargs):
+ '''Creates a screen containing a box drawn using double-line
+ line drawing characters. The characters are fed in as
+ unicode. '''
+ s = screen.screen (2,2,*args,**kwargs)
+ s.put_abs (1,1,u'\u2554')
+ s.put_abs (1,2,u'\u2557')
+ s.put_abs (2,1,u'\u255A')
+ s.put_abs (2,2,u'\u255D')
+ return s
+
+ def make_screen_with_box_cp437(self, *args, **kwargs):
+ '''Creates a screen containing a box drawn using double-line
+ line drawing characters. The characters are fed in as
+ CP437. '''
+ s = screen.screen (2,2,*args,**kwargs)
+ s.put_abs (1,1,b'\xc9')
+ s.put_abs (1,2,b'\xbb')
+ s.put_abs (2,1,b'\xc8')
+ s.put_abs (2,2,b'\xbc')
+ return s
+
+ def make_screen_with_box_utf8(self, *args, **kwargs):
+ '''Creates a screen containing a box drawn using double-line
+ line drawing characters. The characters are fed in as
+ UTF-8. '''
+ s = screen.screen (2,2,*args,**kwargs)
+ s.put_abs (1,1,b'\xe2\x95\x94')
+ s.put_abs (1,2,b'\xe2\x95\x97')
+ s.put_abs (2,1,b'\xe2\x95\x9a')
+ s.put_abs (2,2,b'\xe2\x95\x9d')
+ return s
+
+ def test_unicode_ascii (self):
+ # With the default encoding set to ASCII, we should still be
+ # able to feed in unicode strings and get them back out:
+ s = self.make_screen_with_box_unicode('ascii')
+ if PY3:
+ assert str(s) == unicode_box_unicode_result
+ else:
+ assert unicode(s) == unicode_box_unicode_result
+ # And we should still get something for Python 2 str(), though
+ # it might not be very useful
+ str(s)
+
+ assert s.pretty() == unicode_box_pretty_result
+
+ def test_decoding_errors(self):
+ # With strict error handling, it should reject bytes it can't decode
+ with self.assertRaises(UnicodeDecodeError):
+ self.make_screen_with_box_cp437('ascii', 'strict')
+
+ # replace should turn them into unicode replacement characters, U+FFFD
+ s = self.make_screen_with_box_cp437('ascii', 'replace')
+ expected = u'\ufffd\ufffd\n\ufffd\ufffd'
+ if PY3:
+ assert str(s) == expected
+ else:
+ assert unicode(s) == expected
+
+ def test_unicode_cp437 (self):
+ # Verify decoding from and re-encoding to CP437.
+ s = self.make_screen_with_box_cp437('cp437','strict')
+ if PY3:
+ assert str(s) == unicode_box_unicode_result
+ else:
+ assert unicode(s) == unicode_box_unicode_result
+ assert str(s) == unicode_box_cp437_bytes_result
+ assert s.pretty() == unicode_box_pretty_result
+
+ def test_unicode_utf8 (self):
+ # Verify decoding from and re-encoding to UTF-8.
+ s = self.make_screen_with_box_utf8('utf-8','strict')
+ if PY3:
+ assert str(s) == unicode_box_unicode_result
+ else:
+ assert unicode(s) == unicode_box_unicode_result
+ assert str(s) == unicode_box_utf8_bytes_result
+ assert s.pretty() == unicode_box_pretty_result
+
+ def test_no_bytes(self):
+ s = screen.screen(2, 2, encoding=None)
+ s.put_abs(1, 1, u'A')
+ s.put_abs(2, 2, u'D')
+
+ with self.assertRaises(TypeError):
+ s.put_abs(1, 2, b'B')
+
+ if PY3:
+ assert str(s) == u'A \n D'
+ else:
+ assert unicode(s) == u'A \n D'
+ # This will still work if it's limited to ascii
+ assert str(s) == b'A \n D'
if __name__ == '__main__':
unittest.main()
diff --git a/tests/test_which.py b/tests/test_which.py
index 83575fb..bda3333 100644
--- a/tests/test_which.py
+++ b/tests/test_which.py
@@ -1,9 +1,14 @@
+import subprocess
import tempfile
+import shutil
+import errno
import os
import pexpect
from . import PexpectTestCase
+import pytest
+
class TestCaseWhich(PexpectTestCase.PexpectTestCase):
" Tests for pexpect.which(). "
@@ -162,27 +167,101 @@ class TestCaseWhich(PexpectTestCase.PexpectTestCase):
try:
# setup
os.environ['PATH'] = bin_dir
- with open(bin_path, 'w') as fp:
- fp.write('#!/bin/sh\necho hello, world\n')
- for should_match, mode in ((False, 0o000),
- (True, 0o005),
- (True, 0o050),
- (True, 0o500),
- (False, 0o004),
- (False, 0o040),
- (False, 0o400)):
+
+ # an interpreted script requires the ability to read,
+ # whereas a binary program requires only to be executable.
+ #
+ # to gain access to a binary program, we make a copy of
+ # the existing system program echo(1).
+ bin_echo = None
+ for pth in ('/bin/echo', '/usr/bin/echo'):
+ if os.path.exists(pth):
+ bin_echo = pth
+ break
+ bin_which = None
+ for pth in ('/bin/which', '/usr/bin/which'):
+ if os.path.exists(pth):
+ bin_which = pth
+ break
+ if not bin_echo or not bin_which:
+ pytest.skip('needs `echo` and `which` binaries')
+ shutil.copy(bin_echo, bin_path)
+ isroot = os.getuid() == 0
+ for should_match, mode in (
+ # note that although the file may have matching 'group' or
+ # 'other' executable permissions, it is *not* executable
+ # because the current uid is the owner of the file -- which
+ # takes precedence
+ (False, 0o000), # ----------, no
+ (isroot, 0o001), # ---------x, no
+ (isroot, 0o010), # ------x---, no
+ (True, 0o100), # ---x------, yes
+ (False, 0o002), # --------w-, no
+ (False, 0o020), # -----w----, no
+ (False, 0o200), # --w-------, no
+ (isroot, 0o003), # --------wx, no
+ (isroot, 0o030), # -----wx---, no
+ (True, 0o300), # --wx------, yes
+ (False, 0o004), # -------r--, no
+ (False, 0o040), # ----r-----, no
+ (False, 0o400), # -r--------, no
+ (isroot, 0o005), # -------r-x, no
+ (isroot, 0o050), # ----r-x---, no
+ (True, 0o500), # -r-x------, yes
+ (False, 0o006), # -------rw-, no
+ (False, 0o060), # ----rw----, no
+ (False, 0o600), # -rw-------, no
+ (isroot, 0o007), # -------rwx, no
+ (isroot, 0o070), # ----rwx---, no
+ (True, 0o700), # -rwx------, yes
+ (isroot, 0o4001), # ---S-----x, no
+ (isroot, 0o4010), # ---S--x---, no
+ (True, 0o4100), # ---s------, yes
+ (isroot, 0o4003), # ---S----wx, no
+ (isroot, 0o4030), # ---S-wx---, no
+ (True, 0o4300), # --ws------, yes
+ (isroot, 0o2001), # ------S--x, no
+ (isroot, 0o2010), # ------s---, no
+ (True, 0o2100), # ---x--S---, yes
+
+ ):
+ mode_str = '{0:0>4o}'.format(mode)
+
+ # given file mode,
os.chmod(bin_path, mode)
- if not should_match:
- # should not be found because it is not executable
- assert pexpect.which(fname) is None
- else:
- # should match full path
- assert pexpect.which(fname) == bin_path
+ # exercise whether we may execute
+ can_execute = True
+ try:
+ subprocess.Popen(fname).wait() == 0
+ except OSError as err:
+ if err.errno != errno.EACCES:
+ raise
+ # permission denied
+ can_execute = False
+
+ assert should_match == can_execute, (
+ should_match, can_execute, mode_str)
+
+ # exercise whether which(1) would match
+ proc = subprocess.Popen((bin_which, fname),
+ env={'PATH': bin_dir},
+ stdout=subprocess.PIPE)
+ bin_which_match = bool(not proc.wait())
+ assert should_match == bin_which_match, (
+ should_match, bin_which_match, mode_str)
+
+ # finally, exercise pexpect's which(1) matches
+ # the same.
+ pexpect_match = bool(pexpect.which(fname))
+
+ assert should_match == pexpect_match == bin_which_match, (
+ should_match, pexpect_match, bin_which_match, mode_str)
finally:
# restore,
os.environ['PATH'] = save_path
+
# destroy scratch files and folders,
if os.path.exists(bin_path):
os.unlink(bin_path)