diff options
author | Jeff Quast <contact@jeffquast.com> | 2015-04-26 21:51:20 -0700 |
---|---|---|
committer | Jeff Quast <contact@jeffquast.com> | 2015-04-26 21:51:20 -0700 |
commit | 7f046a6cf86d8f60a6cf23c40ef625e5acbc1a32 (patch) | |
tree | abde38287c6d4d38589d448fab6f53b4561fa7c4 /tests | |
parent | bdfaaee26d2fb9f4bf0891918e6a6039eaf3a4b6 (diff) | |
parent | 82d4937b73a2fc49824e1f60fa0e036731a03135 (diff) | |
download | pexpect-git-7f046a6cf86d8f60a6cf23c40ef625e5acbc1a32.tar.gz |
Merge remote-tracking branch 'origin/master' into document-blocking-write
Diffstat (limited to 'tests')
-rw-r--r-- | tests/PexpectTestCase.py | 48 | ||||
-rw-r--r-- | tests/README | 18 | ||||
-rwxr-xr-x | tests/test_ansi.py | 78 | ||||
-rw-r--r-- | tests/test_async.py | 51 | ||||
-rwxr-xr-x | tests/test_constructor.py | 10 | ||||
-rwxr-xr-x | tests/test_ctrl_chars.py | 7 | ||||
-rwxr-xr-x | tests/test_expect.py | 37 | ||||
-rwxr-xr-x | tests/test_interact.py | 18 | ||||
-rw-r--r-- | tests/test_maxcanon.py | 176 | ||||
-rwxr-xr-x | tests/test_misc.py | 67 | ||||
-rw-r--r-- | tests/test_replwrap.py | 2 | ||||
-rw-r--r-- | tests/test_repr.py | 26 | ||||
-rwxr-xr-x | tests/test_run.py | 150 | ||||
-rwxr-xr-x | tests/test_screen.py | 124 | ||||
-rw-r--r-- | tests/test_which.py | 109 |
15 files changed, 795 insertions, 126 deletions
diff --git a/tests/PexpectTestCase.py b/tests/PexpectTestCase.py index 7a9574e..307437e 100644 --- a/tests/PexpectTestCase.py +++ b/tests/PexpectTestCase.py @@ -22,26 +22,68 @@ from __future__ import print_function import contextlib import unittest +import signal import sys import os + class PexpectTestCase(unittest.TestCase): def setUp(self): self.PYTHONBIN = sys.executable self.original_path = os.getcwd() tests_dir = os.path.dirname(__file__) self.project_dir = project_dir = os.path.dirname(tests_dir) + + # all tests are executed in this folder; there are many auxiliary + # programs in this folder executed by spawn(). os.chdir(tests_dir) - os.environ['COVERAGE_PROCESS_START'] = os.path.join(project_dir, '.coveragerc') + + # If the pexpect raises an exception after fork(), but before + # exec(), our test runner *also* forks. We prevent this by + # storing our pid and asserting equality on tearDown. + self.pid = os.getpid() + + coverage_rc = os.path.join(project_dir, '.coveragerc') + os.environ['COVERAGE_PROCESS_START'] = coverage_rc os.environ['COVERAGE_FILE'] = os.path.join(project_dir, '.coverage') print('\n', self.id(), end=' ') sys.stdout.flush() + + # some build agents will ignore SIGHUP and SIGINT, which python + # inherits. This causes some of the tests related to terminate() + # to fail. We set them to the default handlers that they should + # be, and restore them back to their SIG_IGN value on tearDown. + # + # I'm not entirely convinced they need to be restored, only our + # test runner is affected. + self.restore_ignored_signals = [ + value for value in (signal.SIGHUP, signal.SIGINT,) + if signal.getsignal(value) == signal.SIG_IGN] + if signal.SIGHUP in self.restore_ignored_signals: + # sighup should be set to default handler + signal.signal(signal.SIGHUP, signal.SIG_DFL) + if signal.SIGINT in self.restore_ignored_signals: + # SIGINT should be set to signal.default_int_handler + signal.signal(signal.SIGINT, signal.default_int_handler) unittest.TestCase.setUp(self) def tearDown(self): - os.chdir (self.original_path) + # restore original working folder + os.chdir(self.original_path) + + if self.pid != os.getpid(): + # The build server pattern-matches phrase 'Test runner has forked!' + print("Test runner has forked! This means a child process raised " + "an exception before exec() in a test case, the error is " + "more than likely found above this line in stderr.", + file=sys.stderr) + exit(1) + + # restore signal handlers + for signal_value in self.restore_ignored_signals: + signal.signal(signal_value, signal.SIG_IGN) - if sys.version_info < (2,7): + if sys.version_info < (2, 7): # We want to use these methods, which are new/improved in 2.7, but # we are still supporting 2.6 for the moment. This section can be # removed when we drop Python 2.6 support. diff --git a/tests/README b/tests/README index 295632b..ef5b613 100644 --- a/tests/README +++ b/tests/README @@ -1,18 +1,8 @@ -The best way to run these tests is from the directory above this one. Source -the test.env environment file. This will make sure that you are using the -correct pexpect.py file otherwise Python might try to import a different -version if it is already installed in this environment. Then run the testall.py -script in the tools/ directory. This script will automatically build a test -suite from all the test scripts in the tests/ directory. This allows you to add -new test scripts simply by dropping them in the tests/ directory. You don't -have to register the test or do anything else to integrate it into the test -suite. +The best way to run these tests is from the directory above this one. Run: -For example, this is the normal set of commands you would use to run all tests -in the tests/ directory: + py.test - $ cd /home/user/pexpect_dev/ - $ . test.env - $ ./tools/testall.py +To run a specific test file: + py.test tests/test_constructor.py diff --git a/tests/test_ansi.py b/tests/test_ansi.py index 3b8d6a9..a9d445e 100755 --- a/tests/test_ansi.py +++ b/tests/test_ansi.py @@ -21,6 +21,9 @@ PEXPECT LICENSE from pexpect import ANSI import unittest from . import PexpectTestCase +import sys + +PY3 = (sys.version_info[0] >= 3) write_target = 'I\'ve got a ferret sticking up my nose. \n' +\ '(He\'s got a ferret sticking up his nose.) \n' +\ @@ -142,10 +145,81 @@ class ansiTestCase (PexpectTestCase.PexpectTestCase): def test_number_x(self): """Test the FSM state used to handle more than 2 numeric parameters.""" - s = ANSI.ANSI(1, 20) + class TestANSI(ANSI.ANSI): + captured_memory = None + def do_sgr(self, fsm): + assert self.captured_memory is None + self.captured_memory = fsm.memory + + s = TestANSI(1, 20) s.write('\x1b[0;1;32;45mtest') assert str(s) == ('test ') - assert(s.state.memory == [s, '0', '1', '32', '45']) + assert s.captured_memory is not None + assert s.captured_memory == [s, '0', '1', '32', '45'] + + def test_fsm_memory(self): + """Test the FSM stack/memory does not have numbers left on it + after some sequences with numbers are passed in.""" + s = ANSI.ANSI(1, 20) + s.write('\x1b[0;1;2;3m\x1b[4;5;6;7q\x1b[?8h\x1b[?9ltest') + assert str(s) == ('test ') + assert s.state.memory == [s] + + def test_utf8_bytes(self): + """Test that when bytes are passed in containing UTF-8 encoded + characters, where the encoding of each character consists of + multiple bytes, the characters are correctly decoded. + Incremental decoding is also tested.""" + s = ANSI.ANSI(2, 10, encoding='utf-8') + # This is the UTF-8 encoding of the UCS character "HOURGLASS" + # followed by the UTF-8 encoding of the UCS character + # "KEYBOARD". These characters can't be encoded in cp437 or + # latin-1. The "KEYBOARD" character is split into two + # separate writes. + s.write(b'\xe2\x8c\x9b') + s.write(b'\xe2\x8c') + s.write(b'\xa8') + if PY3: + assert str(s) == u'\u231b\u2328 \n ' + else: + assert unicode(s) == u'\u231b\u2328 \n ' + assert str(s) == b'\xe2\x8c\x9b\xe2\x8c\xa8 \n ' + assert s.dump() == u'\u231b\u2328 ' + assert s.pretty() == u'+----------+\n|\u231b\u2328 |\n| |\n+----------+\n' + assert s.get_abs(1, 1) == u'\u231b' + assert s.get_region(1, 1, 1, 5) == [u'\u231b\u2328 '] + + def test_unicode(self): + """Test passing in of a unicode string.""" + s = ANSI.ANSI(2, 10, encoding="utf-8") + s.write(u'\u231b\u2328') + if PY3: + assert str(s) == u'\u231b\u2328 \n ' + else: + assert unicode(s) == u'\u231b\u2328 \n ' + assert str(s) == b'\xe2\x8c\x9b\xe2\x8c\xa8 \n ' + assert s.dump() == u'\u231b\u2328 ' + assert s.pretty() == u'+----------+\n|\u231b\u2328 |\n| |\n+----------+\n' + assert s.get_abs(1, 1) == u'\u231b' + assert s.get_region(1, 1, 1, 5) == [u'\u231b\u2328 '] + + def test_decode_error(self): + """Test that default handling of decode errors replaces the + invalid characters.""" + s = ANSI.ANSI(2, 10, encoding="ascii") + s.write(b'\xff') # a non-ASCII character + # In unicode, the non-ASCII character is replaced with + # REPLACEMENT CHARACTER. + if PY3: + assert str(s) == u'\ufffd \n ' + else: + assert unicode(s) == u'\ufffd \n ' + assert str(s) == b'? \n ' + assert s.dump() == u'\ufffd ' + assert s.pretty() == u'+----------+\n|\ufffd |\n| |\n+----------+\n' + assert s.get_abs(1, 1) == u'\ufffd' + assert s.get_region(1, 1, 1, 5) == [u'\ufffd '] + if __name__ == '__main__': unittest.main() diff --git a/tests/test_async.py b/tests/test_async.py new file mode 100644 index 0000000..ce75572 --- /dev/null +++ b/tests/test_async.py @@ -0,0 +1,51 @@ +try: + import asyncio +except ImportError: + asyncio = None + +import sys +import unittest + +import pexpect +from .PexpectTestCase import PexpectTestCase + +def run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + +@unittest.skipIf(asyncio is None, "Requires asyncio") +class AsyncTests(PexpectTestCase): + def test_simple_expect(self): + p = pexpect.spawn('cat') + p.sendline('Hello asyncio') + coro = p.expect(['Hello', pexpect.EOF] , async=True) + assert run(coro) == 0 + print('Done') + + def test_timeout(self): + p = pexpect.spawn('cat') + coro = p.expect('foo', timeout=1, async=True) + with self.assertRaises(pexpect.TIMEOUT): + run(coro) + + p = pexpect.spawn('cat') + coro = p.expect(['foo', pexpect.TIMEOUT], timeout=1, async=True) + assert run(coro) == 1 + + def test_eof(self): + p = pexpect.spawn('cat') + p.sendline('Hi') + coro = p.expect(pexpect.EOF, async=True) + p.sendeof() + assert run(coro) == 0 + + p = pexpect.spawn('cat') + p.sendeof() + coro = p.expect('Blah', async=True) + with self.assertRaises(pexpect.EOF): + run(coro) + + def test_expect_exact(self): + p = pexpect.spawn('%s list100.py' % sys.executable) + assert run(p.expect_exact(b'5', async=True)) == 0 + assert run(p.expect_exact(['wpeok', b'11'], async=True)) == 1 + assert run(p.expect_exact([b'foo', pexpect.EOF], async=True)) == 1 diff --git a/tests/test_constructor.py b/tests/test_constructor.py index 60525a0..98c473a 100755 --- a/tests/test_constructor.py +++ b/tests/test_constructor.py @@ -28,11 +28,11 @@ class TestCaseConstructor(PexpectTestCase.PexpectTestCase): the same results for different styles of invoking __init__(). This assumes that the root directory / is static during the test. ''' - p1 = pexpect.spawn('/bin/ls -l /bin') - p2 = pexpect.spawn('/bin/ls' ,['-l', '/bin']) - p1.expect (pexpect.EOF) - p2.expect (pexpect.EOF) - assert (p1.before == p2.before) + p1 = pexpect.spawn('uname -m -n -p -r -s -v') + p2 = pexpect.spawn('uname', ['-m', '-n', '-p', '-r', '-s', '-v']) + p1.expect(pexpect.EOF) + p2.expect(pexpect.EOF) + assert p1.before == p2.before def test_named_parameters (self): '''This tests that named parameters work. diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py index 9c7b869..10d03db 100755 --- a/tests/test_ctrl_chars.py +++ b/tests/test_ctrl_chars.py @@ -26,6 +26,9 @@ from . import PexpectTestCase import time import sys +from ptyprocess import ptyprocess +ptyprocess._make_eof_intr() + if sys.version_info[0] >= 3: def byte(i): return bytes([i]) @@ -54,7 +57,7 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase): child = pexpect.spawn('python getch.py', echo=False, timeout=5) child.expect('READY') child.sendintr() - child.expect(str(child._INTR) + '<STOP>') + child.expect(str(ord(ptyprocess._INTR)) + '<STOP>') child.send(byte(0)) child.expect('0<STOP>') @@ -66,7 +69,7 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase): child = pexpect.spawn('python getch.py', echo=False, timeout=5) child.expect('READY') child.sendeof() - child.expect(str(child._EOF) + '<STOP>') + child.expect(str(ord(ptyprocess._EOF)) + '<STOP>') child.send(byte(0)) child.expect('0<STOP>') diff --git a/tests/test_expect.py b/tests/test_expect.py index 8ccb9c5..3f4c9d8 100755 --- a/tests/test_expect.py +++ b/tests/test_expect.py @@ -18,11 +18,13 @@ PEXPECT LICENSE OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. ''' +import multiprocessing import unittest import subprocess import time import signal import sys +import os import pexpect from . import PexpectTestCase @@ -542,7 +544,40 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase): signal.alarm(1) p1.expect('END') + def test_stdin_closed(self): + ''' + Ensure pexpect continues to operate even when stdin is closed + ''' + class Closed_stdin_proc(multiprocessing.Process): + def run(self): + sys.__stdin__.close() + cat = pexpect.spawn('cat') + cat.sendeof() + cat.expect(pexpect.EOF) + + proc = Closed_stdin_proc() + proc.start() + proc.join() + assert proc.exitcode == 0 + + def test_stdin_stdout_closed(self): + ''' + Ensure pexpect continues to operate even when stdin and stdout is closed + ''' + class Closed_stdin_stdout_proc(multiprocessing.Process): + def run(self): + sys.__stdin__.close() + sys.__stdout__.close() + cat = pexpect.spawn('cat') + cat.sendeof() + cat.expect(pexpect.EOF) + + proc = Closed_stdin_stdout_proc() + proc.start() + proc.join() + assert proc.exitcode == 0 + if __name__ == '__main__': unittest.main() -suite = unittest.makeSuite(ExpectTestCase,'test') +suite = unittest.makeSuite(ExpectTestCase, 'test') diff --git a/tests/test_interact.py b/tests/test_interact.py index 06fc44a..e635cb0 100755 --- a/tests/test_interact.py +++ b/tests/test_interact.py @@ -66,9 +66,12 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase): p.expect(b'<out>alpha') p.expect(b'<out>beta') p.sendeof() - p.expect_exact('<eof>') - p.expect_exact('Escaped interact') - p.expect(pexpect.EOF) + # strangely, on travis-ci, sendeof() terminates the subprocess, + # it doesn't receive ^D, just immediately throws EOF. + idx = p.expect_exact(['<eof>', pexpect.EOF]) + if idx == 0: + p.expect_exact('Escaped interact') + p.expect(pexpect.EOF) assert not p.isalive() assert p.exitstatus == 0 @@ -81,9 +84,12 @@ class InteractTestCase (PexpectTestCase.PexpectTestCase): p.expect('<out>ɑlpha') p.expect('<out>Βeta') p.sendeof() - p.expect_exact('<eof>') - p.expect_exact('Escaped interact') - p.expect(pexpect.EOF) + # strangely, on travis-ci, sendeof() terminates the subprocess, + # it doesn't receive ^D, just immediately throws EOF. + idx = p.expect_exact(['<eof>', pexpect.EOF]) + if idx == 0: + p.expect_exact('Escaped interact') + p.expect(pexpect.EOF) assert not p.isalive() assert p.exitstatus == 0 diff --git a/tests/test_maxcanon.py b/tests/test_maxcanon.py new file mode 100644 index 0000000..772a3b7 --- /dev/null +++ b/tests/test_maxcanon.py @@ -0,0 +1,176 @@ +""" Module for canonical-mode tests. """ +# std imports +import sys +import os + +# local +import pexpect +from . import PexpectTestCase + +# 3rd-party +import pytest + + +class TestCaseCanon(PexpectTestCase.PexpectTestCase): + """ + Test expected Canonical mode behavior (limited input line length). + + All systems use the value of MAX_CANON which can be found using + fpathconf(3) value PC_MAX_CANON -- with the exception of Linux + and FreeBSD. + + Linux, though defining a value of 255, actually honors the value + of 4096 from linux kernel include file tty.h definition + N_TTY_BUF_SIZE. + + Linux also does not honor IMAXBEL. termios(3) states, "Linux does not + implement this bit, and acts as if it is always set." Although these + tests ensure it is enabled, this is a non-op for Linux. + + FreeBSD supports neither, and instead uses a fraction (1/5) of the tty + speed which is always 9600. Therefor, the maximum limited input line + length is 9600 / 5 = 1920. + + These tests only ensure the correctness of the behavior described by + the sendline() docstring. pexpect is not particularly involved in + these scenarios, though if we wish to expose some kind of interface + to tty.setraw, for example, these tests may be re-purposed as such. + + Lastly, portions of these tests are skipped on Travis-CI. It produces + unexpected behavior not reproduced on Debian/GNU Linux. + """ + + def setUp(self): + super(TestCaseCanon, self).setUp() + + self.echo = False + if sys.platform.lower().startswith('linux'): + # linux is 4096, N_TTY_BUF_SIZE. + self.max_input = 4096 + self.echo = True + elif sys.platform.lower().startswith('sunos'): + # SunOS allows PC_MAX_CANON + 1; see + # https://bitbucket.org/illumos/illumos-gate/src/d07a59219ab7fd2a7f39eb47c46cf083c88e932f/usr/src/uts/common/io/ldterm.c?at=default#cl-1888 + self.max_input = os.fpathconf(0, 'PC_MAX_CANON') + 1 + elif sys.platform.lower().startswith('freebsd'): + # http://lists.freebsd.org/pipermail/freebsd-stable/2009-October/052318.html + self.max_input = 9600 / 5 + else: + # All others (probably) limit exactly at PC_MAX_CANON + self.max_input = os.fpathconf(0, 'PC_MAX_CANON') + + @pytest.mark.skipif( + sys.platform.lower().startswith('freebsd'), + reason='os.write to BLOCK indefinitely on FreeBSD in this case' + ) + def test_under_max_canon(self): + " BEL is not sent by terminal driver at maximum bytes - 1. " + # given, + child = pexpect.spawn('bash', echo=self.echo, timeout=5) + child.sendline('echo READY') + child.sendline('stty icanon imaxbel') + child.sendline('echo BEGIN; cat') + + # some systems BEL on (maximum - 1), not able to receive CR, + # even though all characters up until then were received, they + # simply cannot be transmitted, as CR is part of the transmission. + send_bytes = self.max_input - 1 + + # exercise, + child.sendline('_' * send_bytes) + + # fast forward beyond 'cat' command, as ^G can be found as part of + # set-xterm-title sequence of $PROMPT_COMMAND or $PS1. + child.expect_exact('BEGIN') + + # verify, all input is found in echo output, + child.expect_exact('_' * send_bytes) + + # BEL is not found, + with self.assertRaises(pexpect.TIMEOUT): + child.expect_exact('\a', timeout=1) + + # cleanup, + child.sendeof() # exit cat(1) + child.sendline('exit 0') # exit bash(1) + child.expect(pexpect.EOF) + assert not child.isalive() + assert child.exitstatus == 0 + + @pytest.mark.skipif( + sys.platform.lower().startswith('freebsd'), + reason='os.write to BLOCK indefinitely on FreeBSD in this case' + ) + def test_beyond_max_icanon(self): + " a single BEL is sent when maximum bytes is reached. " + # given, + child = pexpect.spawn('bash', echo=self.echo, timeout=5) + child.sendline('stty icanon imaxbel erase ^H') + child.sendline('cat') + send_bytes = self.max_input + + # exercise, + child.sendline('_' * send_bytes) + child.expect_exact('\a') + + # exercise, we must now backspace to send CR. + child.sendcontrol('h') + child.sendline() + + if os.environ.get('TRAVIS', None) == 'true': + # Travis-CI has intermittent behavior here, possibly + # because the master process is itself, a PTY? + return + + # verify the length of (maximum - 1) received by cat(1), + # which has written it back out, + child.expect_exact('_' * (send_bytes - 1)) + # and not a byte more. + with self.assertRaises(pexpect.TIMEOUT): + child.expect_exact('_', timeout=1) + + # cleanup, + child.sendeof() # exit cat(1) + child.sendline('exit 0') # exit bash(1) + child.expect_exact(pexpect.EOF) + assert not child.isalive() + assert child.exitstatus == 0 + + @pytest.mark.skipif( + sys.platform.lower().startswith('freebsd'), + reason='os.write to BLOCK indefinitely on FreeBSD in this case' + ) + def test_max_no_icanon(self): + " may exceed maximum input bytes if canonical mode is disabled. " + # given, + child = pexpect.spawn('bash', echo=self.echo, timeout=5) + child.sendline('stty -icanon imaxbel') + child.sendline('echo BEGIN; cat') + send_bytes = self.max_input + 11 + + # exercise, + child.sendline('_' * send_bytes) + + # fast forward beyond 'cat' command, as ^G can be found as part of + # set-xterm-title sequence of $PROMPT_COMMAND or $PS1. + child.expect_exact('BEGIN') + + if os.environ.get('TRAVIS', None) == 'true': + # Travis-CI has intermittent behavior here, possibly + # because the master process is itself, a PTY? + return + + # BEL is *not* found, + with self.assertRaises(pexpect.TIMEOUT): + child.expect_exact('\a', timeout=1) + + # verify, all input is found in output, + child.expect_exact('_' * send_bytes) + + # cleanup, + child.sendcontrol('c') # exit cat(1) (eof wont work in -icanon) + child.sendcontrol('c') + child.sendline('exit 0') # exit bash(1) + child.expect(pexpect.EOF) + assert not child.isalive() + assert child.exitstatus == 0 diff --git a/tests/test_misc.py b/tests/test_misc.py index d9205e4..e439240 100755 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -178,6 +178,16 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): with self.assertRaises(pexpect.EOF): child.expect('the unexpected') + def test_with(self): + "spawn can be used as a context manager" + with pexpect.spawn(sys.executable + ' echo_w_prompt.py') as p: + p.expect('<in >') + p.sendline(b'alpha') + p.expect(b'<out>alpha') + assert p.isalive() + + assert not p.isalive() + def test_terminate(self): " test force terminate always succeeds (SIGKILL). " child = pexpect.spawn('cat') @@ -186,41 +196,24 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): def test_sighup(self): " validate argument `ignore_sighup=True` and `ignore_sighup=False`. " - # If a parent process sets an Ignore handler for SIGHUP (as on Fedora's - # build machines), this test breaks. We temporarily restore the default - # handler, so the child process will quit. However, we can't simply - # replace any installed handler, because getsignal returns None for - # handlers not set in Python code, so we wouldn't be able to restore - # them. - if signal.getsignal(signal.SIGHUP) == signal.SIG_IGN: - signal.signal(signal.SIGHUP, signal.SIG_DFL) - restore_sig_ign = True - else: - restore_sig_ign = False - getch = sys.executable + ' getch.py' - try: - child = pexpect.spawn(getch, ignore_sighup=True) - child.expect('READY') - child.kill(signal.SIGHUP) - for _ in range(10): - if not child.isalive(): - self.fail('Child process should not have exited.') - time.sleep(0.1) - - child = pexpect.spawn(getch, ignore_sighup=False) - child.expect('READY') - child.kill(signal.SIGHUP) - for _ in range(10): - if not child.isalive(): - break - time.sleep(0.1) - else: - self.fail('Child process should have exited.') - - finally: - if restore_sig_ign: - signal.signal(signal.SIGHUP, signal.SIG_IGN) + child = pexpect.spawn(getch, ignore_sighup=True) + child.expect('READY') + child.kill(signal.SIGHUP) + for _ in range(10): + if not child.isalive(): + self.fail('Child process should not have exited.') + time.sleep(0.1) + + child = pexpect.spawn(getch, ignore_sighup=False) + child.expect('READY') + child.kill(signal.SIGHUP) + for _ in range(10): + if not child.isalive(): + break + time.sleep(0.1) + else: + self.fail('Child process should have exited.') def test_bad_child_pid(self): " assert bad condition error in isalive(). " @@ -228,7 +221,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): child = pexpect.spawn('cat') child.terminate(force=1) # Force an invalid state to test isalive - child.terminated = 0 + child.ptyproc.terminated = 0 try: with self.assertRaisesRegexp(pexpect.ExceptionPexpect, ".*" + expect_errmsg): @@ -361,9 +354,9 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase): " test forced self.__fork_pty() and __pty_make_controlling_tty " # given, class spawn_ourptyfork(pexpect.spawn): - def _spawn(self, command, args=[]): + def _spawn(self, command, args=[], preexec_fn=None): self.use_native_pty_fork = False - pexpect.spawn._spawn(self, command, args) + pexpect.spawn._spawn(self, command, args, preexec_fn) # exercise, p = spawn_ourptyfork('cat', echo=False) diff --git a/tests/test_replwrap.py b/tests/test_replwrap.py index 14f7c39..28c7599 100644 --- a/tests/test_replwrap.py +++ b/tests/test_replwrap.py @@ -26,7 +26,7 @@ class REPLWrapTestCase(unittest.TestCase): assert 'real' in res, res # PAGER should be set to cat, otherwise man hangs - res = bash.run_command('man sleep', timeout=2) + res = bash.run_command('man sleep', timeout=5) assert 'SLEEP' in res, res def test_multiline(self): diff --git a/tests/test_repr.py b/tests/test_repr.py new file mode 100644 index 0000000..ce618d4 --- /dev/null +++ b/tests/test_repr.py @@ -0,0 +1,26 @@ +""" Test __str__ methods. """ +import pexpect + +from . import PexpectTestCase + + +class TestCaseMisc(PexpectTestCase.PexpectTestCase): + + def test_str_spawnu(self): + """ Exercise spawnu.__str__() """ + # given, + p = pexpect.spawnu('cat') + # exercise, + value = str(p) + # verify + assert isinstance(value, str) + + def test_str_spawn(self): + """ Exercise spawn.__str__() """ + # given, + p = pexpect.spawn('cat') + # exercise, + value = str(p) + # verify + assert isinstance(value, str) + diff --git a/tests/test_run.py b/tests/test_run.py index 814b70a..1b3c92f 100755 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -22,72 +22,170 @@ PEXPECT LICENSE import pexpect import unittest import subprocess +import tempfile import sys +import os from . import PexpectTestCase -# TODO Many of these test cases blindly assume that sequential -# TODO listing of the /bin directory will yield the same results. -# TODO This may not always be true, but seems adequate for testing for now. -# TODO I should fix this at some point. - unicode_type = str if pexpect.PY3 else unicode -def timeout_callback (d): -# print d["event_count"], - if d["event_count"]>3: + +def timeout_callback(values): + if values["event_count"] > 3: return 1 return 0 + +def function_events_callback(values): + try: + previous_echoed = (values["child_result_list"][-1] + .decode().split("\n")[-2].strip()) + if previous_echoed.endswith("stage-1"): + return "echo stage-2\n" + elif previous_echoed.endswith("stage-2"): + return "echo stage-3\n" + elif previous_echoed.endswith("stage-3"): + return "exit\n" + else: + raise Exception("Unexpected output {0}".format(previous_echoed)) + except IndexError: + return "echo stage-1\n" + + class RunFuncTestCase(PexpectTestCase.PexpectTestCase): runfunc = staticmethod(pexpect.run) cr = b'\r' empty = b'' prep_subprocess_out = staticmethod(lambda x: x) - def test_run_exit (self): + def setUp(self): + fd, self.rcfile = tempfile.mkstemp() + os.write(fd, b'PS1=GO: \n') + os.close(fd) + super(RunFuncTestCase, self).setUp() + + def tearDown(self): + os.unlink(self.rcfile) + super(RunFuncTestCase, self).tearDown() + + def test_run_exit(self): (data, exitstatus) = self.runfunc('python exit1.py', withexitstatus=1) assert exitstatus == 1, "Exit status of 'python exit1.py' should be 1." - def test_run (self): - the_old_way = subprocess.Popen(args=['ls', '-l', '/bin'], - stdout=subprocess.PIPE).communicate()[0].rstrip() - (the_new_way, exitstatus) = self.runfunc('ls -l /bin', withexitstatus=1) + def test_run(self): + the_old_way = subprocess.Popen( + args=['uname', '-m', '-n'], + stdout=subprocess.PIPE + ).communicate()[0].rstrip() + + (the_new_way, exitstatus) = self.runfunc( + 'uname -m -n', withexitstatus=1) the_new_way = the_new_way.replace(self.cr, self.empty).rstrip() + self.assertEqual(self.prep_subprocess_out(the_old_way), the_new_way) self.assertEqual(exitstatus, 0) - def test_run_callback (self): # TODO it seems like this test could block forever if run fails... - self.runfunc("cat", timeout=1, events={pexpect.TIMEOUT:timeout_callback}) + def test_run_callback(self): + # TODO it seems like this test could block forever if run fails... + events = {pexpect.TIMEOUT: timeout_callback} + self.runfunc("cat", timeout=1, events=events) - def test_run_bad_exitstatus (self): - (the_new_way, exitstatus) = self.runfunc('ls -l /najoeufhdnzkxjd', - withexitstatus=1) + def test_run_bad_exitstatus(self): + (the_new_way, exitstatus) = self.runfunc( + 'ls -l /najoeufhdnzkxjd', withexitstatus=1) assert exitstatus != 0 + def test_run_event_as_string(self): + events = [ + # second match on 'abc', echo 'def' + ('abc\r\n.*GO:', 'echo "def"\n'), + # final match on 'def': exit + ('def\r\n.*GO:', 'exit\n'), + # first match on 'GO:' prompt, echo 'abc' + ('GO:', 'echo "abc"\n') + ] + + (data, exitstatus) = pexpect.run( + 'bash --rcfile {0}'.format(self.rcfile), + withexitstatus=True, + events=events, + timeout=10) + assert exitstatus == 0 + + def test_run_event_as_function(self): + events = [ + ('GO:', function_events_callback) + ] + + (data, exitstatus) = pexpect.run( + 'bash --rcfile {0}'.format(self.rcfile), + withexitstatus=True, + events=events, + timeout=10) + assert exitstatus == 0 + + def test_run_event_as_method(self): + events = [ + ('GO:', self._method_events_callback) + ] + + (data, exitstatus) = pexpect.run( + 'bash --rcfile {0}'.format(self.rcfile), + withexitstatus=True, + events=events, + timeout=10) + assert exitstatus == 0 + + def test_run_event_typeerror(self): + events = [('GO:', -1)] + with self.assertRaises(TypeError): + pexpect.run('bash --rcfile {0}'.format(self.rcfile), + withexitstatus=True, + events=events, + timeout=10) + + def _method_events_callback(self, values): + try: + previous_echoed = (values["child_result_list"][-1].decode() + .split("\n")[-2].strip()) + if previous_echoed.endswith("foo1"): + return "echo foo2\n" + elif previous_echoed.endswith("foo2"): + return "echo foo3\n" + elif previous_echoed.endswith("foo3"): + return "exit\n" + else: + raise Exception("Unexpected output {0!r}" + .format(previous_echoed)) + except IndexError: + return "echo foo1\n" + + class RunUnicodeFuncTestCase(RunFuncTestCase): runfunc = staticmethod(pexpect.runu) cr = b'\r'.decode('ascii') empty = b''.decode('ascii') prep_subprocess_out = staticmethod(lambda x: x.decode('utf-8', 'replace')) + def test_run_unicode(self): if pexpect.PY3: - c = chr(254) # þ + char = chr(254) # þ pattern = '<in >' else: - c = unichr(254) # analysis:ignore + char = unichr(254) # analysis:ignore pattern = '<in >'.decode('ascii') - def callback(d): - if d['event_count'] == 0: - return c + '\n' + def callback(values): + if values['event_count'] == 0: + return char + '\n' else: return True # Stop the child process output = pexpect.runu(sys.executable + ' echo_w_prompt.py', - env={'PYTHONIOENCODING':'utf-8'}, - events={pattern:callback}) + env={'PYTHONIOENCODING': 'utf-8'}, + events={pattern: callback}) assert isinstance(output, unicode_type), type(output) - assert '<out>'+c in output, output + assert ('<out>' + char) in output, output if __name__ == '__main__': unittest.main() diff --git a/tests/test_screen.py b/tests/test_screen.py index 3f0736b..2429e57 100755 --- a/tests/test_screen.py +++ b/tests/test_screen.py @@ -19,10 +19,14 @@ PEXPECT LICENSE ''' +import sys + from pexpect import screen import unittest from . import PexpectTestCase +PY3 = (sys.version_info[0] >= 3) + fill1_target='XXXXXXXXXX\n' + \ 'XOOOOOOOOX\n' + \ 'XO::::::OX\n' + \ @@ -76,6 +80,17 @@ insert_target = 'ZXZZZZZZXZ\n' +\ 'ZZ/2.4.6ZZ' get_region_target = ['......', '.\\/...', './\\...', '......'] +unicode_box_unicode_result = u'\u2554\u2557\n\u255A\u255D' +unicode_box_pretty_result = u'''\ ++--+ +|\u2554\u2557| +|\u255A\u255D| ++--+ +''' +unicode_box_ascii_bytes_result = b'??\n??' +unicode_box_cp437_bytes_result = b'\xc9\xbb\n\xc8\xbc' +unicode_box_utf8_bytes_result = b'\xe2\x95\x94\xe2\x95\x97\n\xe2\x95\x9a\xe2\x95\x9d' + class screenTestCase (PexpectTestCase.PexpectTestCase): def make_screen_with_put (self): s = screen.screen(10,10) @@ -168,20 +183,101 @@ class screenTestCase (PexpectTestCase.PexpectTestCase): s.insert_abs (10,9,'Z') s.insert_abs (10,9,'Z') assert str(s) == insert_target - # def test_write (self): - # s = screen.screen (6,65) - # s.fill('.') - # s.cursor_home() - # for c in write_text: - # s.write (c) - # print str(s) - # assert str(s) == write_target - # def test_tetris (self): - # s = screen.screen (24,80) - # tetris_text = open ('tetris.data').read() - # for c in tetris_text: - # s.write (c) - # assert str(s) == tetris_target + + def make_screen_with_box_unicode(self, *args, **kwargs): + '''Creates a screen containing a box drawn using double-line + line drawing characters. The characters are fed in as + unicode. ''' + s = screen.screen (2,2,*args,**kwargs) + s.put_abs (1,1,u'\u2554') + s.put_abs (1,2,u'\u2557') + s.put_abs (2,1,u'\u255A') + s.put_abs (2,2,u'\u255D') + return s + + def make_screen_with_box_cp437(self, *args, **kwargs): + '''Creates a screen containing a box drawn using double-line + line drawing characters. The characters are fed in as + CP437. ''' + s = screen.screen (2,2,*args,**kwargs) + s.put_abs (1,1,b'\xc9') + s.put_abs (1,2,b'\xbb') + s.put_abs (2,1,b'\xc8') + s.put_abs (2,2,b'\xbc') + return s + + def make_screen_with_box_utf8(self, *args, **kwargs): + '''Creates a screen containing a box drawn using double-line + line drawing characters. The characters are fed in as + UTF-8. ''' + s = screen.screen (2,2,*args,**kwargs) + s.put_abs (1,1,b'\xe2\x95\x94') + s.put_abs (1,2,b'\xe2\x95\x97') + s.put_abs (2,1,b'\xe2\x95\x9a') + s.put_abs (2,2,b'\xe2\x95\x9d') + return s + + def test_unicode_ascii (self): + # With the default encoding set to ASCII, we should still be + # able to feed in unicode strings and get them back out: + s = self.make_screen_with_box_unicode('ascii') + if PY3: + assert str(s) == unicode_box_unicode_result + else: + assert unicode(s) == unicode_box_unicode_result + # And we should still get something for Python 2 str(), though + # it might not be very useful + str(s) + + assert s.pretty() == unicode_box_pretty_result + + def test_decoding_errors(self): + # With strict error handling, it should reject bytes it can't decode + with self.assertRaises(UnicodeDecodeError): + self.make_screen_with_box_cp437('ascii', 'strict') + + # replace should turn them into unicode replacement characters, U+FFFD + s = self.make_screen_with_box_cp437('ascii', 'replace') + expected = u'\ufffd\ufffd\n\ufffd\ufffd' + if PY3: + assert str(s) == expected + else: + assert unicode(s) == expected + + def test_unicode_cp437 (self): + # Verify decoding from and re-encoding to CP437. + s = self.make_screen_with_box_cp437('cp437','strict') + if PY3: + assert str(s) == unicode_box_unicode_result + else: + assert unicode(s) == unicode_box_unicode_result + assert str(s) == unicode_box_cp437_bytes_result + assert s.pretty() == unicode_box_pretty_result + + def test_unicode_utf8 (self): + # Verify decoding from and re-encoding to UTF-8. + s = self.make_screen_with_box_utf8('utf-8','strict') + if PY3: + assert str(s) == unicode_box_unicode_result + else: + assert unicode(s) == unicode_box_unicode_result + assert str(s) == unicode_box_utf8_bytes_result + assert s.pretty() == unicode_box_pretty_result + + def test_no_bytes(self): + s = screen.screen(2, 2, encoding=None) + s.put_abs(1, 1, u'A') + s.put_abs(2, 2, u'D') + + with self.assertRaises(TypeError): + s.put_abs(1, 2, b'B') + + if PY3: + assert str(s) == u'A \n D' + else: + assert unicode(s) == u'A \n D' + # This will still work if it's limited to ascii + assert str(s) == b'A \n D' if __name__ == '__main__': unittest.main() diff --git a/tests/test_which.py b/tests/test_which.py index 83575fb..bda3333 100644 --- a/tests/test_which.py +++ b/tests/test_which.py @@ -1,9 +1,14 @@ +import subprocess import tempfile +import shutil +import errno import os import pexpect from . import PexpectTestCase +import pytest + class TestCaseWhich(PexpectTestCase.PexpectTestCase): " Tests for pexpect.which(). " @@ -162,27 +167,101 @@ class TestCaseWhich(PexpectTestCase.PexpectTestCase): try: # setup os.environ['PATH'] = bin_dir - with open(bin_path, 'w') as fp: - fp.write('#!/bin/sh\necho hello, world\n') - for should_match, mode in ((False, 0o000), - (True, 0o005), - (True, 0o050), - (True, 0o500), - (False, 0o004), - (False, 0o040), - (False, 0o400)): + + # an interpreted script requires the ability to read, + # whereas a binary program requires only to be executable. + # + # to gain access to a binary program, we make a copy of + # the existing system program echo(1). + bin_echo = None + for pth in ('/bin/echo', '/usr/bin/echo'): + if os.path.exists(pth): + bin_echo = pth + break + bin_which = None + for pth in ('/bin/which', '/usr/bin/which'): + if os.path.exists(pth): + bin_which = pth + break + if not bin_echo or not bin_which: + pytest.skip('needs `echo` and `which` binaries') + shutil.copy(bin_echo, bin_path) + isroot = os.getuid() == 0 + for should_match, mode in ( + # note that although the file may have matching 'group' or + # 'other' executable permissions, it is *not* executable + # because the current uid is the owner of the file -- which + # takes precedence + (False, 0o000), # ----------, no + (isroot, 0o001), # ---------x, no + (isroot, 0o010), # ------x---, no + (True, 0o100), # ---x------, yes + (False, 0o002), # --------w-, no + (False, 0o020), # -----w----, no + (False, 0o200), # --w-------, no + (isroot, 0o003), # --------wx, no + (isroot, 0o030), # -----wx---, no + (True, 0o300), # --wx------, yes + (False, 0o004), # -------r--, no + (False, 0o040), # ----r-----, no + (False, 0o400), # -r--------, no + (isroot, 0o005), # -------r-x, no + (isroot, 0o050), # ----r-x---, no + (True, 0o500), # -r-x------, yes + (False, 0o006), # -------rw-, no + (False, 0o060), # ----rw----, no + (False, 0o600), # -rw-------, no + (isroot, 0o007), # -------rwx, no + (isroot, 0o070), # ----rwx---, no + (True, 0o700), # -rwx------, yes + (isroot, 0o4001), # ---S-----x, no + (isroot, 0o4010), # ---S--x---, no + (True, 0o4100), # ---s------, yes + (isroot, 0o4003), # ---S----wx, no + (isroot, 0o4030), # ---S-wx---, no + (True, 0o4300), # --ws------, yes + (isroot, 0o2001), # ------S--x, no + (isroot, 0o2010), # ------s---, no + (True, 0o2100), # ---x--S---, yes + + ): + mode_str = '{0:0>4o}'.format(mode) + + # given file mode, os.chmod(bin_path, mode) - if not should_match: - # should not be found because it is not executable - assert pexpect.which(fname) is None - else: - # should match full path - assert pexpect.which(fname) == bin_path + # exercise whether we may execute + can_execute = True + try: + subprocess.Popen(fname).wait() == 0 + except OSError as err: + if err.errno != errno.EACCES: + raise + # permission denied + can_execute = False + + assert should_match == can_execute, ( + should_match, can_execute, mode_str) + + # exercise whether which(1) would match + proc = subprocess.Popen((bin_which, fname), + env={'PATH': bin_dir}, + stdout=subprocess.PIPE) + bin_which_match = bool(not proc.wait()) + assert should_match == bin_which_match, ( + should_match, bin_which_match, mode_str) + + # finally, exercise pexpect's which(1) matches + # the same. + pexpect_match = bool(pexpect.which(fname)) + + assert should_match == pexpect_match == bin_which_match, ( + should_match, pexpect_match, bin_which_match, mode_str) finally: # restore, os.environ['PATH'] = save_path + # destroy scratch files and folders, if os.path.exists(bin_path): os.unlink(bin_path) |