summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNitin Madhok <nmadhok@g.clemson.edu>2015-03-27 03:54:17 -0400
committerNitin Madhok <nmadhok@g.clemson.edu>2015-03-27 03:54:17 -0400
commit33a8b0596346e4f73979ce80809b7034646777d9 (patch)
treedd79795a1dcc88a49f486a6b745a702837a82332
parentc628a62bd65b46b991f308d7bca0e4bb5ffb786c (diff)
parent450f390eaecf88ad95f429ce8b5d03fc8381f158 (diff)
downloadpexpect-33a8b0596346e4f73979ce80809b7034646777d9.tar.gz
Merge pull request #1 from pexpect/master
Merge
-rw-r--r--.travis.yml9
-rw-r--r--LICENSE2
-rw-r--r--Makefile81
-rw-r--r--README.rst1
-rw-r--r--doc/api/pxssh.rst4
-rw-r--r--doc/commonissues.rst12
-rw-r--r--doc/history.rst3
-rw-r--r--doc/overview.rst2
-rw-r--r--doc/requirements.txt1
-rwxr-xr-xexamples/cgishell.cgi4
-rw-r--r--pexpect/__init__.py1939
-rw-r--r--pexpect/async.py2
-rw-r--r--pexpect/bashrc.sh5
-rw-r--r--pexpect/exceptions.py35
-rw-r--r--pexpect/expect.py194
-rw-r--r--pexpect/fdpexpect.py21
-rw-r--r--pexpect/pty_spawn.py819
-rw-r--r--pexpect/pxssh.py21
-rw-r--r--pexpect/replwrap.py7
-rw-r--r--pexpect/spawnbase.py484
-rw-r--r--pexpect/utils.py112
-rw-r--r--setup.cfg2
-rw-r--r--setup.py14
-rw-r--r--tests/PexpectTestCase.py48
-rw-r--r--tests/README18
-rwxr-xr-xtests/test_constructor.py10
-rwxr-xr-xtests/test_ctrl_chars.py7
-rw-r--r--tests/test_maxcanon.py152
-rwxr-xr-xtests/test_misc.py67
-rw-r--r--tests/test_replwrap.py2
-rw-r--r--tests/test_repr.py26
-rwxr-xr-xtests/test_run.py38
-rw-r--r--tests/test_which.py109
-rwxr-xr-xtools/display-sighandlers.py20
-rwxr-xr-xtools/display-terminalinfo.py209
-rw-r--r--tools/dotfiles.tar.gzbin292124 -> 0 bytes
-rwxr-xr-xtools/getkey.py46
-rwxr-xr-xtools/merge_templates.py52
-rwxr-xr-xtools/pyed.py180
-rwxr-xr-xtools/sfupload.py46
-rwxr-xr-xtools/step.py47
-rwxr-xr-xtools/teamcity-coverage-report.sh27
-rwxr-xr-xtools/teamcity-runtests.sh61
-rwxr-xr-xtools/tweak_files.py46
-rwxr-xr-xtools/websync.py63
45 files changed, 2465 insertions, 2583 deletions
diff --git a/.travis.yml b/.travis.yml
index 3a2f331..51c967d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,15 +6,18 @@ python:
- 3.4
- pypy
-before_install:
- - sudo apt-get install python-yaml python3-yaml
install:
- export PYTHONIOENCODING=UTF8
- - pip install coveralls pytest-cov
+ - pip install coveralls pytest-cov ptyprocess
script:
+ - ./tools/display-sighandlers.py
+ - ./tools/display-terminalinfo.py
- py.test --cov pexpect --cov-config .coveragerc
after_success:
- coverage combine
- coveralls
+
+# Use new Travis stack, should be faster
+sudo: false
diff --git a/LICENSE b/LICENSE
index 18ff9db..9e10acb 100644
--- a/LICENSE
+++ b/LICENSE
@@ -3,7 +3,9 @@ PEXPECT LICENSE
This license is approved by the OSI and FSF as GPL-compatible.
http://opensource.org/licenses/isc-license.txt
+ Copyright (c) 2013-2014, Pexpect development team
Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+
PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
diff --git a/Makefile b/Makefile
deleted file mode 100644
index ef9eea2..0000000
--- a/Makefile
+++ /dev/null
@@ -1,81 +0,0 @@
-
-#
-# PEXPECT LICENSE
-#
-# This license is approved by the OSI and FSF as GPL-compatible.
-# http://opensource.org/licenses/isc-license.txt
-#
-# Copyright (c) 2012, Noah Spurrier <noah@noah.org>
-# PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
-# PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
-# COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
-# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
-# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
-# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
-# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-#
-
-SHELL = /bin/sh
-
-VERSION=2.5
-#DOCGENERATOR= happydoc
-DOCGENERATOR=pydoc -w
-# This is for GNU Make. This does not work on BSD Make.
-#MANIFEST_LINES := $(shell cat MANIFEST)
-# This is for BSD Make. This does not work on GNU Make.
-#MANIFEST_LINES != cat MANIFEST
-# I hate Makefiles.
-
-all: merge_templates docs dist
-
-merge_templates:
- python tools/merge_templates.py
-
-docs: doc/index.template.html doc/examples.html doc/clean.css doc/email.png
- make clean_docs
- make merge_templates
- #-rm -f `ls doc/*.html | sed -e 's/doc\/index\.template\.html//' | sed -e 's/doc\/index\.html//'`
- #$(DOCGENERATOR) `echo "$(MANIFEST_LINES)" | sed -e "s/\.py//g" -e "s/setup *//" -e "s/README *//"`
- #mv *.html doc/
- cd doc;\
- $(DOCGENERATOR) ../pexpect.py ../pxssh.py ../fdpexpect.py ../FSM.py ../screen.py ../ANSI.py;\
- cd ..;\
-# tar zcf pexpect-doc-$(VERSION).tar.gz doc/
-
-dist: dist/pexpect-$(VERSION).tar.gz
-
-# $(MANIFEST_LINES)
-
-dist/pexpect-$(VERSION).tar.gz:
- rm -f *.pyc
- rm -f pexpect-$(VERSION).tar.gz
- rm -f dist/pexpect-$(VERSION).tar.gz
- python setup.py sdist
-
-clean: clean_docs
- -rm -f MANIFEST
- -rm -rf __pycache__
- -rm -f *.pyc
- -rm -f tests/*.pyc
- -rm -f tools/*.pyc
- -rm -f dist/*.pyc
- -rm -f *.cover
- -rm -f tests/*.cover
- -rm -f tools/*.cover
- -rm -f dist/pexpect-$(VERSION).tar.gz
- -cd dist;rm -rf pexpect-$(VERSION)/
- -rm -f pexpect-$(VERSION).tar.gz
- -rm -f pexpect-$(VERSION)-examples.tar.gz
- -rm -f pexpect-$(VERSION)-doc.tar.gz
- -rm -f python.core
- -rm -f core
- -rm -f setup.py
- -rm -f doc/index.html
-
-clean_docs:
- -rm -f `ls doc/*.html | sed -e 's/doc\/index\.template\.html//' | sed -e 's/doc\/examples\.html//'`
-
-
diff --git a/README.rst b/README.rst
index e0bbd84..dde7ade 100644
--- a/README.rst
+++ b/README.rst
@@ -36,6 +36,7 @@ PEXPECT LICENSE
http://opensource.org/licenses/isc-license.txt
+ Copyright (c) 2013-2014, Pexpect development team
Copyright (c) 2012, Noah Spurrier <noah@noah.org>
PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
diff --git a/doc/api/pxssh.rst b/doc/api/pxssh.rst
index 0b67839..b947f4b 100644
--- a/doc/api/pxssh.rst
+++ b/doc/api/pxssh.rst
@@ -23,6 +23,10 @@ pxssh class
server to ask for a password. Note that the sysadmin can disable password
logins, in which case this won't work.
+ .. attribute:: options
+
+ The dictionary of user specified SSH options, eg, ``options = dict(StrictHostKeyChecking="no", UserKnownHostsFile="/dev/null")``
+
.. automethod:: login
.. automethod:: logout
.. automethod:: prompt
diff --git a/doc/commonissues.rst b/doc/commonissues.rst
index c4fc58d..26d0e2b 100644
--- a/doc/commonissues.rst
+++ b/doc/commonissues.rst
@@ -101,3 +101,15 @@ The only solution I have found is to use public key authentication with SSH.
This bypasses the need for a password. I'm not happy with this solution. The
problem is due to poor support for Solaris Pseudo TTYs in the Python Standard
Library.
+
+child does not receive full input, emits BEL
+--------------------------------------------
+
+You may notice when running for example cat(1) or base64(1), when sending a
+very long input line, that it is not fully received, and the BEL ('\a') may
+be found in output.
+
+By default the child terminal matches the parent, which is often in "canonical
+mode processing". You may wish to disable this mode. The exact limit of a line
+varies by operating system, and details of disabling canonical mode may be
+found in the docstring of :meth:`~pexpect.spawn.send`.
diff --git a/doc/history.rst b/doc/history.rst
index ec1c9c3..c9d5640 100644
--- a/doc/history.rst
+++ b/doc/history.rst
@@ -15,6 +15,9 @@ Version 4.0
Version 3.4
```````````
+* Fix regression that prevented executable, but unreadable files from
+ being found when not specified by absolute path -- such as
+ /usr/bin/sudo (:ghissue:`104`).
* Fixed regression when executing pexpect with some prior releases of
the multiprocessing module where stdin has been closed (:ghissue:`86`).
diff --git a/doc/overview.rst b/doc/overview.rst
index 133767f..a04e389 100644
--- a/doc/overview.rst
+++ b/doc/overview.rst
@@ -17,7 +17,7 @@ Here is an example of Pexpect in action::
child.expect('ftp> ')
child.sendline('lcd /tmp')
child.expect('ftp> ')
- child.sendline('cd pub')
+ child.sendline('cd pub/OpenBSD')
child.expect('ftp> ')
child.sendline('get README')
child.expect('ftp> ')
diff --git a/doc/requirements.txt b/doc/requirements.txt
new file mode 100644
index 0000000..57ebb2d
--- /dev/null
+++ b/doc/requirements.txt
@@ -0,0 +1 @@
+ptyprocess
diff --git a/examples/cgishell.cgi b/examples/cgishell.cgi
index b807a8b..23bef5f 100755
--- a/examples/cgishell.cgi
+++ b/examples/cgishell.cgi
@@ -176,11 +176,11 @@ def daemonize (stdin=None, stdout=None, stderr=None, daemon_pid_filename=None):
if stderr is None: stderr = DEVNULL
try:
- pid = os.fork()
+ pid = os.fork() # fork first child
except OSError as e:
raise Exception("%s [%d]" % (e.strerror, e.errno))
- if pid != 0: # The first child.
+ if pid != 0:
os.waitpid(pid,0)
if daemon_pid_filename is not None:
daemon_pid = int(file(daemon_pid_filename,'r').read())
diff --git a/pexpect/__init__.py b/pexpect/__init__.py
index f070067..c906e89 100644
--- a/pexpect/__init__.py
+++ b/pexpect/__init__.py
@@ -63,83 +63,20 @@ PEXPECT LICENSE
'''
-try:
- import os
- import sys
- import time
- import select
- import re
- import struct
- import resource
- import types
- import pty
- import tty
- import termios
- import fcntl
- import errno
- import traceback
- import signal
- import codecs
- import stat
-except ImportError: # pragma: no cover
- err = sys.exc_info()[1]
- raise ImportError(str(err) + '''
+import sys
+import types
-A critical module was not found. Probably this operating system does not
-support it. Pexpect is intended for UNIX-like operating systems.''')
-
-from .expect import Expecter
+from .exceptions import ExceptionPexpect, EOF, TIMEOUT
+from .utils import split_command_line, which, is_executable_file
+from .pty_spawn import spawn, spawnu, PY3
+from .expect import Expecter, searcher_re, searcher_string
__version__ = '3.3'
__revision__ = ''
__all__ = ['ExceptionPexpect', 'EOF', 'TIMEOUT', 'spawn', 'spawnu', 'run', 'runu',
'which', 'split_command_line', '__version__', '__revision__']
-PY3 = (sys.version_info[0] >= 3)
-
-# Exception classes used by this module.
-class ExceptionPexpect(Exception):
- '''Base class for all exceptions raised by this module.
- '''
-
- def __init__(self, value):
- super(ExceptionPexpect, self).__init__(value)
- self.value = value
-
- def __str__(self):
- return str(self.value)
-
- def get_trace(self):
- '''This returns an abbreviated stack trace with lines that only concern
- the caller. In other words, the stack trace inside the Pexpect module
- is not included. '''
-
- tblist = traceback.extract_tb(sys.exc_info()[2])
- tblist = [item for item in tblist if ('pexpect/__init__' not in item[0])
- and ('pexpect/expect' not in item[0])]
- tblist = traceback.format_list(tblist)
- return ''.join(tblist)
-
-
-class EOF(ExceptionPexpect):
- '''Raised when EOF is read from a child.
- This usually means the child has exited.'''
-
-
-class TIMEOUT(ExceptionPexpect):
- '''Raised when a read time exceeds the timeout. '''
-
-##class TIMEOUT_PATTERN(TIMEOUT):
-## '''Raised when the pattern match time exceeds the timeout.
-## This is different than a read TIMEOUT because the child process may
-## give output, thus never give a TIMEOUT, but the output
-## may never match a pattern.
-## '''
-##class MAXBUFFER(ExceptionPexpect):
-## '''Raised when a buffer fills before matching an expected pattern.'''
-
-
-def run(command, timeout=-1, withexitstatus=False, events=None,
+def run(command, timeout=30, withexitstatus=False, events=None,
extra_args=None, logfile=None, cwd=None, env=None):
'''
@@ -198,25 +135,36 @@ def run(command, timeout=-1, withexitstatus=False, events=None,
run("mencoder dvd://1 -o video.avi -oac copy -ovc copy",
events={TIMEOUT:print_ticks}, timeout=5)
- The 'events' argument should be a dictionary of patterns and responses.
- Whenever one of the patterns is seen in the command out run() will send the
- associated response string. Note that you should put newlines in your
- string if Enter is necessary. The responses may also contain callback
- functions. Any callback is function that takes a dictionary as an argument.
+ The 'events' argument should be either a dictionary or a tuple list that
+ contains patterns and responses. Whenever one of the patterns is seen
+ in the command output, run() will send the associated response string.
+ So, run() in the above example can be also written as:
+
+ run("mencoder dvd://1 -o video.avi -oac copy -ovc copy",
+ events=[(TIMEOUT,print_ticks)], timeout=5)
+
+ Use a tuple list for events if the command output requires a delicate
+ control over what pattern should be matched, since the tuple list is passed
+ to pexpect() as its pattern list, with the order of patterns preserved.
+
+ Note that you should put newlines in your string if Enter is necessary.
+
+ Like the example above, the responses may also contain callback functions.
+ Any callback is a function that takes a dictionary as an argument.
The dictionary contains all the locals from the run() function, so you can
access the child spawn object or any other variable defined in run()
(event_count, child, and extra_args are the most useful). A callback may
- return True to stop the current run process otherwise run() continues until
- the next event. A callback may also return a string which will be sent to
- the child. 'extra_args' is not used by directly run(). It provides a way to
- pass data to a callback function through run() through the locals
+ return True to stop the current run process. Otherwise run() continues
+ until the next event. A callback may also return a string which will be
+ sent to the child. 'extra_args' is not used by directly run(). It provides
+ a way to pass data to a callback function through run() through the locals
dictionary passed to a callback.
'''
return _run(command, timeout=timeout, withexitstatus=withexitstatus,
events=events, extra_args=extra_args, logfile=logfile, cwd=cwd,
env=env, _spawn=spawn)
-def runu(command, timeout=-1, withexitstatus=False, events=None,
+def runu(command, timeout=30, withexitstatus=False, events=None,
extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
"""This offers the same interface as :func:`run`, but using unicode.
@@ -235,7 +183,10 @@ def _run(command, timeout, withexitstatus, events, extra_args, logfile, cwd,
else:
child = _spawn(command, timeout=timeout, maxread=2000, logfile=logfile,
cwd=cwd, env=env, **kwargs)
- if events is not None:
+ if isinstance(events, list):
+ patterns= [x for x,y in events]
+ responses = [y for x,y in events]
+ elif isinstance(events, dict):
patterns = list(events.keys())
responses = list(events.values())
else:
@@ -278,1830 +229,4 @@ def _run(command, timeout, withexitstatus, events, extra_args, logfile, cwd,
else:
return child_result
-class spawn(object):
- '''This is the main class interface for Pexpect. Use this class to start
- and control child applications. '''
- string_type = bytes
- if PY3:
- allowed_string_types = (bytes, str)
- @staticmethod
- def _chr(c):
- return bytes([c])
- linesep = os.linesep.encode('ascii')
- crlf = '\r\n'.encode('ascii')
-
- @staticmethod
- def write_to_stdout(b):
- try:
- return sys.stdout.buffer.write(b)
- except AttributeError:
- # If stdout has been replaced, it may not have .buffer
- return sys.stdout.write(b.decode('ascii', 'replace'))
- else:
- allowed_string_types = (basestring,) # analysis:ignore
- _chr = staticmethod(chr)
- linesep = os.linesep
- crlf = '\r\n'
- write_to_stdout = sys.stdout.write
-
- encoding = None
-
- def __init__(self, command, args=[], timeout=30, maxread=2000,
- searchwindowsize=None, logfile=None, cwd=None, env=None,
- ignore_sighup=True, echo=True):
-
- '''This is the constructor. The command parameter may be a string that
- includes a command and any arguments to the command. For example::
-
- child = pexpect.spawn('/usr/bin/ftp')
- child = pexpect.spawn('/usr/bin/ssh user@example.com')
- child = pexpect.spawn('ls -latr /tmp')
-
- You may also construct it with a list of arguments like so::
-
- child = pexpect.spawn('/usr/bin/ftp', [])
- child = pexpect.spawn('/usr/bin/ssh', ['user@example.com'])
- child = pexpect.spawn('ls', ['-latr', '/tmp'])
-
- After this the child application will be created and will be ready to
- talk to. For normal use, see expect() and send() and sendline().
-
- Remember that Pexpect does NOT interpret shell meta characters such as
- redirect, pipe, or wild cards (``>``, ``|``, or ``*``). This is a
- common mistake. If you want to run a command and pipe it through
- another command then you must also start a shell. For example::
-
- child = pexpect.spawn('/bin/bash -c "ls -l | grep LOG > logs.txt"')
- child.expect(pexpect.EOF)
-
- The second form of spawn (where you pass a list of arguments) is useful
- in situations where you wish to spawn a command and pass it its own
- argument list. This can make syntax more clear. For example, the
- following is equivalent to the previous example::
-
- shell_cmd = 'ls -l | grep LOG > logs.txt'
- child = pexpect.spawn('/bin/bash', ['-c', shell_cmd])
- child.expect(pexpect.EOF)
-
- The maxread attribute sets the read buffer size. This is maximum number
- of bytes that Pexpect will try to read from a TTY at one time. Setting
- the maxread size to 1 will turn off buffering. Setting the maxread
- value higher may help performance in cases where large amounts of
- output are read back from the child. This feature is useful in
- conjunction with searchwindowsize.
-
- The searchwindowsize attribute sets the how far back in the incoming
- seach buffer Pexpect will search for pattern matches. Every time
- Pexpect reads some data from the child it will append the data to the
- incoming buffer. The default is to search from the beginning of the
- incoming buffer each time new data is read from the child. But this is
- very inefficient if you are running a command that generates a large
- amount of data where you want to match. The searchwindowsize does not
- affect the size of the incoming data buffer. You will still have
- access to the full buffer after expect() returns.
-
- The logfile member turns on or off logging. All input and output will
- be copied to the given file object. Set logfile to None to stop
- logging. This is the default. Set logfile to sys.stdout to echo
- everything to standard output. The logfile is flushed after each write.
-
- Example log input and output to a file::
-
- child = pexpect.spawn('some_command')
- fout = open('mylog.txt','wb')
- child.logfile = fout
-
- Example log to stdout::
-
- # In Python 2:
- child = pexpect.spawn('some_command')
- child.logfile = sys.stdout
-
- # In Python 3, spawnu should be used to give str to stdout:
- child = pexpect.spawnu('some_command')
- child.logfile = sys.stdout
-
- The logfile_read and logfile_send members can be used to separately log
- the input from the child and output sent to the child. Sometimes you
- don't want to see everything you write to the child. You only want to
- log what the child sends back. For example::
-
- child = pexpect.spawn('some_command')
- child.logfile_read = sys.stdout
-
- Remember to use spawnu instead of spawn for the above code if you are
- using Python 3.
-
- To separately log output sent to the child use logfile_send::
-
- child.logfile_send = fout
-
- If ``ignore_sighup`` is True, the child process will ignore SIGHUP
- signals. For now, the default is True, to preserve the behaviour of
- earlier versions of Pexpect, but you should pass this explicitly if you
- want to rely on it.
-
- The delaybeforesend helps overcome a weird behavior that many users
- were experiencing. The typical problem was that a user would expect() a
- "Password:" prompt and then immediately call sendline() to send the
- password. The user would then see that their password was echoed back
- to them. Passwords don't normally echo. The problem is caused by the
- fact that most applications print out the "Password" prompt and then
- turn off stdin echo, but if you send your password before the
- application turned off echo, then you get your password echoed.
- Normally this wouldn't be a problem when interacting with a human at a
- real keyboard. If you introduce a slight delay just before writing then
- this seems to clear up the problem. This was such a common problem for
- many users that I decided that the default pexpect behavior should be
- to sleep just before writing to the child application. 1/20th of a
- second (50 ms) seems to be enough to clear up the problem. You can set
- delaybeforesend to 0 to return to the old behavior. Most Linux machines
- don't like this to be below 0.03. I don't know why.
-
- Note that spawn is clever about finding commands on your path.
- It uses the same logic that "which" uses to find executables.
-
- If you wish to get the exit status of the child you must call the
- close() method. The exit or signal status of the child will be stored
- in self.exitstatus or self.signalstatus. If the child exited normally
- then exitstatus will store the exit return code and signalstatus will
- be None. If the child was terminated abnormally with a signal then
- signalstatus will store the signal value and exitstatus will be None.
- If you need more detail you can also read the self.status member which
- stores the status returned by os.waitpid. You can interpret this using
- os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG.
-
- The echo attribute may be set to False to disable echoing of input.
- As a pseudo-terminal, all input echoed by the "keyboard" (send()
- or sendline()) will be repeated to output. For many cases, it is
- not desirable to have echo enabled, and it may be later disabled
- using setecho(False) followed by waitnoecho(). However, for some
- platforms such as Solaris, this is not possible, and should be
- disabled immediately on spawn.
- '''
-
- self.STDIN_FILENO = pty.STDIN_FILENO
- self.STDOUT_FILENO = pty.STDOUT_FILENO
- self.STDERR_FILENO = pty.STDERR_FILENO
- self.stdin = sys.stdin
- self.stdout = sys.stdout
- self.stderr = sys.stderr
-
- self.searcher = None
- self.ignorecase = False
- self.before = None
- self.after = None
- self.match = None
- self.match_index = None
- self.terminated = True
- self.exitstatus = None
- self.signalstatus = None
- # status returned by os.waitpid
- self.status = None
- self.flag_eof = False
- self.pid = None
- # the child file descriptor is initially closed
- self.child_fd = -1
- self.timeout = timeout
- self.delimiter = EOF
- self.logfile = logfile
- # input from child (read_nonblocking)
- self.logfile_read = None
- # output to send (send, sendline)
- self.logfile_send = None
- # max bytes to read at one time into buffer
- self.maxread = maxread
- # This is the read buffer. See maxread.
- self.buffer = self.string_type()
- # Data before searchwindowsize point is preserved, but not searched.
- self.searchwindowsize = searchwindowsize
- # Delay used before sending data to child. Time in seconds.
- # Most Linux machines don't like this to be below 0.03 (30 ms).
- self.delaybeforesend = 0.05
- # Used by close() to give kernel time to update process status.
- # Time in seconds.
- self.delayafterclose = 0.1
- # Used by terminate() to give kernel time to update process status.
- # Time in seconds.
- self.delayafterterminate = 0.1
- self.softspace = False
- self.name = '<' + repr(self) + '>'
- self.closed = True
- self.cwd = cwd
- self.env = env
- self.echo = echo
- self.ignore_sighup = ignore_sighup
- _platform = sys.platform.lower()
- # This flags if we are running on irix
- self.__irix_hack = _platform.startswith('irix')
- # Solaris uses internal __fork_pty(). All others use pty.fork().
- self.use_native_pty_fork = not (
- _platform.startswith('solaris') or
- _platform.startswith('sunos'))
- # inherit EOF and INTR definitions from controlling process.
- try:
- from termios import VEOF, VINTR
- try:
- fd = sys.__stdin__.fileno()
- except ValueError:
- # ValueError: I/O operation on closed file
- fd = sys.__stdout__.fileno()
- self._INTR = ord(termios.tcgetattr(fd)[6][VINTR])
- self._EOF = ord(termios.tcgetattr(fd)[6][VEOF])
- except (ImportError, OSError, IOError, ValueError, termios.error):
- # unless the controlling process is also not a terminal,
- # such as cron(1), or when stdin and stdout are both closed.
- # Fall-back to using CEOF and CINTR. There
- try:
- from termios import CEOF, CINTR
- (self._INTR, self._EOF) = (CINTR, CEOF)
- except ImportError:
- # ^C, ^D
- (self._INTR, self._EOF) = (3, 4)
- # Support subclasses that do not use command or args.
- if command is None:
- self.command = None
- self.args = None
- self.name = '<pexpect factory incomplete>'
- else:
- self._spawn(command, args)
-
- @staticmethod
- def _coerce_expect_string(s):
- if not isinstance(s, bytes):
- return s.encode('ascii')
- return s
-
- @staticmethod
- def _coerce_send_string(s):
- if not isinstance(s, bytes):
- return s.encode('utf-8')
- return s
-
- @staticmethod
- def _coerce_read_string(s):
- return s
-
- def __del__(self):
- '''This makes sure that no system resources are left open. Python only
- garbage collects Python objects. OS file descriptors are not Python
- objects, so they must be handled explicitly. If the child file
- descriptor was opened outside of this class (passed to the constructor)
- then this does not close it. '''
-
- if not self.closed:
- # It is possible for __del__ methods to execute during the
- # teardown of the Python VM itself. Thus self.close() may
- # trigger an exception because os.close may be None.
- try:
- self.close()
- # which exception, shouldnt' we catch explicitly .. ?
- except:
- pass
-
- def __str__(self):
- '''This returns a human-readable string that represents the state of
- the object. '''
-
- s = []
- s.append(repr(self))
- s.append('version: ' + __version__)
- s.append('command: ' + str(self.command))
- s.append('args: %r' % (self.args,))
- s.append('searcher: %r' % (self.searcher,))
- s.append('buffer (last 100 chars): %r' % (self.buffer)[-100:],)
- s.append('before (last 100 chars): %r' % (self.before)[-100:],)
- s.append('after: %r' % (self.after,))
- s.append('match: %r' % (self.match,))
- s.append('match_index: ' + str(self.match_index))
- s.append('exitstatus: ' + str(self.exitstatus))
- s.append('flag_eof: ' + str(self.flag_eof))
- s.append('pid: ' + str(self.pid))
- s.append('child_fd: ' + str(self.child_fd))
- s.append('closed: ' + str(self.closed))
- s.append('timeout: ' + str(self.timeout))
- s.append('delimiter: ' + str(self.delimiter))
- s.append('logfile: ' + str(self.logfile))
- s.append('logfile_read: ' + str(self.logfile_read))
- s.append('logfile_send: ' + str(self.logfile_send))
- s.append('maxread: ' + str(self.maxread))
- s.append('ignorecase: ' + str(self.ignorecase))
- s.append('searchwindowsize: ' + str(self.searchwindowsize))
- s.append('delaybeforesend: ' + str(self.delaybeforesend))
- s.append('delayafterclose: ' + str(self.delayafterclose))
- s.append('delayafterterminate: ' + str(self.delayafterterminate))
- return '\n'.join(s)
-
- def _spawn(self, command, args=[]):
- '''This starts the given command in a child process. This does all the
- fork/exec type of stuff for a pty. This is called by __init__. If args
- is empty then command will be parsed (split on spaces) and args will be
- set to parsed arguments. '''
-
- # The pid and child_fd of this object get set by this method.
- # Note that it is difficult for this method to fail.
- # You cannot detect if the child process cannot start.
- # So the only way you can tell if the child process started
- # or not is to try to read from the file descriptor. If you get
- # EOF immediately then it means that the child is already dead.
- # That may not necessarily be bad because you may have spawned a child
- # that performs some task; creates no stdout output; and then dies.
-
- # If command is an int type then it may represent a file descriptor.
- if isinstance(command, type(0)):
- raise ExceptionPexpect('Command is an int type. ' +
- 'If this is a file descriptor then maybe you want to ' +
- 'use fdpexpect.fdspawn which takes an existing ' +
- 'file descriptor instead of a command string.')
-
- if not isinstance(args, type([])):
- raise TypeError('The argument, args, must be a list.')
-
- if args == []:
- self.args = split_command_line(command)
- self.command = self.args[0]
- else:
- # Make a shallow copy of the args list.
- self.args = args[:]
- self.args.insert(0, command)
- self.command = command
-
- command_with_path = which(self.command)
- if command_with_path is None:
- raise ExceptionPexpect('The command was not found or was not ' +
- 'executable: %s.' % self.command)
- self.command = command_with_path
- self.args[0] = self.command
-
- self.name = '<' + ' '.join(self.args) + '>'
-
- assert self.pid is None, 'The pid member must be None.'
- assert self.command is not None, 'The command member must not be None.'
-
- if self.use_native_pty_fork:
- try:
- self.pid, self.child_fd = pty.fork()
- except OSError: # pragma: no cover
- err = sys.exc_info()[1]
- raise ExceptionPexpect('pty.fork() failed: ' + str(err))
- else:
- # Use internal __fork_pty
- self.pid, self.child_fd = self.__fork_pty()
-
- # Some platforms must call setwinsize() and setecho() from the
- # child process, and others from the master process. We do both,
- # allowing IOError for either.
-
- if self.pid == pty.CHILD:
- # Child
- self.child_fd = self.STDIN_FILENO
-
- # set default window size of 24 rows by 80 columns
- try:
- self.setwinsize(24, 80)
- except IOError as err:
- if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
- raise
-
- # disable echo if spawn argument echo was unset
- if not self.echo:
- try:
- self.setecho(self.echo)
- except (IOError, termios.error) as err:
- if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
- raise
-
- # Do not allow child to inherit open file descriptors from parent.
- max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
- os.closerange(3, max_fd)
-
- if self.ignore_sighup:
- signal.signal(signal.SIGHUP, signal.SIG_IGN)
-
- if self.cwd is not None:
- os.chdir(self.cwd)
- if self.env is None:
- os.execv(self.command, self.args)
- else:
- os.execvpe(self.command, self.args, self.env)
-
- # Parent
- try:
- self.setwinsize(24, 80)
- except IOError as err:
- if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
- raise
-
-
- self.terminated = False
- self.closed = False
-
- def __fork_pty(self):
- '''This implements a substitute for the forkpty system call. This
- should be more portable than the pty.fork() function. Specifically,
- this should work on Solaris.
-
- Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
- resolve the issue with Python's pty.fork() not supporting Solaris,
- particularly ssh. Based on patch to posixmodule.c authored by Noah
- Spurrier::
-
- http://mail.python.org/pipermail/python-dev/2003-May/035281.html
-
- '''
-
- parent_fd, child_fd = os.openpty()
- if parent_fd < 0 or child_fd < 0:
- raise ExceptionPexpect("Could not open with os.openpty().")
-
- pid = os.fork()
- if pid == pty.CHILD:
- # Child.
- os.close(parent_fd)
- self.__pty_make_controlling_tty(child_fd)
-
- os.dup2(child_fd, self.STDIN_FILENO)
- os.dup2(child_fd, self.STDOUT_FILENO)
- os.dup2(child_fd, self.STDERR_FILENO)
-
- else:
- # Parent.
- os.close(child_fd)
-
- return pid, parent_fd
-
- def __pty_make_controlling_tty(self, tty_fd):
- '''This makes the pseudo-terminal the controlling tty. This should be
- more portable than the pty.fork() function. Specifically, this should
- work on Solaris. '''
-
- child_name = os.ttyname(tty_fd)
-
- # Disconnect from controlling tty, if any. Raises OSError of ENXIO
- # if there was no controlling tty to begin with, such as when
- # executed by a cron(1) job.
- try:
- fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
- os.close(fd)
- except OSError as err:
- if err.errno != errno.ENXIO:
- raise
-
- os.setsid()
-
- # Verify we are disconnected from controlling tty by attempting to open
- # it again. We expect that OSError of ENXIO should always be raised.
- try:
- fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
- os.close(fd)
- raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
- except OSError as err:
- if err.errno != errno.ENXIO:
- raise
-
- # Verify we can open child pty.
- fd = os.open(child_name, os.O_RDWR)
- os.close(fd)
-
- # Verify we now have a controlling tty.
- fd = os.open("/dev/tty", os.O_WRONLY)
- os.close(fd)
-
-
- def fileno(self):
- '''This returns the file descriptor of the pty for the child.
- '''
- return self.child_fd
-
- def close(self, force=True):
- '''This closes the connection with the child application. Note that
- calling close() more than once is valid. This emulates standard Python
- behavior with files. Set force to True if you want to make sure that
- the child is terminated (SIGKILL is sent if the child ignores SIGHUP
- and SIGINT). '''
-
- if not self.closed:
- self.flush()
- os.close(self.child_fd)
- # Give kernel time to update process status.
- time.sleep(self.delayafterclose)
- if self.isalive():
- if not self.terminate(force):
- raise ExceptionPexpect('Could not terminate the child.')
- self.child_fd = -1
- self.closed = True
- #self.pid = None
-
- def flush(self):
- '''This does nothing. It is here to support the interface for a
- File-like object. '''
-
- pass
-
- def isatty(self):
- '''This returns True if the file descriptor is open and connected to a
- tty(-like) device, else False.
-
- On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
- the child pty may not appear as a terminal device. This means
- methods such as setecho(), setwinsize(), getwinsize() may raise an
- IOError. '''
-
- return os.isatty(self.child_fd)
-
- def waitnoecho(self, timeout=-1):
- '''This waits until the terminal ECHO flag is set False. This returns
- True if the echo mode is off. This returns False if the ECHO flag was
- not set False before the timeout. This can be used to detect when the
- child is waiting for a password. Usually a child application will turn
- off echo mode when it is waiting for the user to enter a password. For
- example, instead of expecting the "password:" prompt you can wait for
- the child to set ECHO off::
-
- p = pexpect.spawn('ssh user@example.com')
- p.waitnoecho()
- p.sendline(mypassword)
-
- If timeout==-1 then this method will use the value in self.timeout.
- If timeout==None then this method to block until ECHO flag is False.
- '''
-
- if timeout == -1:
- timeout = self.timeout
- if timeout is not None:
- end_time = time.time() + timeout
- while True:
- if not self.getecho():
- return True
- if timeout < 0 and timeout is not None:
- return False
- if timeout is not None:
- timeout = end_time - time.time()
- time.sleep(0.1)
-
- def getecho(self):
- '''This returns the terminal echo mode. This returns True if echo is
- on or False if echo is off. Child applications that are expecting you
- to enter a password often set ECHO False. See waitnoecho().
-
- Not supported on platforms where ``isatty()`` returns False. '''
-
- try:
- attr = termios.tcgetattr(self.child_fd)
- except termios.error as err:
- errmsg = 'getecho() may not be called on this platform'
- if err.args[0] == errno.EINVAL:
- raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
- raise
-
- self.echo = bool(attr[3] & termios.ECHO)
- return self.echo
-
- def setecho(self, state):
- '''This sets the terminal echo mode on or off. Note that anything the
- child sent before the echo will be lost, so you should be sure that
- your input buffer is empty before you call setecho(). For example, the
- following will work as expected::
-
- p = pexpect.spawn('cat') # Echo is on by default.
- p.sendline('1234') # We expect see this twice from the child...
- p.expect(['1234']) # ... once from the tty echo...
- p.expect(['1234']) # ... and again from cat itself.
- p.setecho(False) # Turn off tty echo
- p.sendline('abcd') # We will set this only once (echoed by cat).
- p.sendline('wxyz') # We will set this only once (echoed by cat)
- p.expect(['abcd'])
- p.expect(['wxyz'])
-
- The following WILL NOT WORK because the lines sent before the setecho
- will be lost::
-
- p = pexpect.spawn('cat')
- p.sendline('1234')
- p.setecho(False) # Turn off tty echo
- p.sendline('abcd') # We will set this only once (echoed by cat).
- p.sendline('wxyz') # We will set this only once (echoed by cat)
- p.expect(['1234'])
- p.expect(['1234'])
- p.expect(['abcd'])
- p.expect(['wxyz'])
-
-
- Not supported on platforms where ``isatty()`` returns False.
- '''
-
- errmsg = 'setecho() may not be called on this platform'
-
- try:
- attr = termios.tcgetattr(self.child_fd)
- except termios.error as err:
- if err.args[0] == errno.EINVAL:
- raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
- raise
-
- if state:
- attr[3] = attr[3] | termios.ECHO
- else:
- attr[3] = attr[3] & ~termios.ECHO
-
- try:
- # I tried TCSADRAIN and TCSAFLUSH, but these were inconsistent and
- # blocked on some platforms. TCSADRAIN would probably be ideal.
- termios.tcsetattr(self.child_fd, termios.TCSANOW, attr)
- except IOError as err:
- if err.args[0] == errno.EINVAL:
- raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
- raise
-
- self.echo = state
-
- def _log(self, s, direction):
- if self.logfile is not None:
- self.logfile.write(s)
- self.logfile.flush()
- second_log = self.logfile_send if (direction=='send') else self.logfile_read
- if second_log is not None:
- second_log.write(s)
- second_log.flush()
-
- def read_nonblocking(self, size=1, timeout=-1):
- '''This reads at most size characters from the child application. It
- includes a timeout. If the read does not complete within the timeout
- period then a TIMEOUT exception is raised. If the end of file is read
- then an EOF exception will be raised. If a log file was set using
- setlog() then all data will also be written to the log file.
-
- If timeout is None then the read may block indefinitely.
- If timeout is -1 then the self.timeout value is used. If timeout is 0
- then the child is polled and if there is no data immediately ready
- then this will raise a TIMEOUT exception.
-
- The timeout refers only to the amount of time to read at least one
- character. This is not effected by the 'size' parameter, so if you call
- read_nonblocking(size=100, timeout=30) and only one character is
- available right away then one character will be returned immediately.
- It will not wait for 30 seconds for another 99 characters to come in.
-
- This is a wrapper around os.read(). It uses select.select() to
- implement the timeout. '''
-
- if self.closed:
- raise ValueError('I/O operation on closed file.')
-
- if timeout == -1:
- timeout = self.timeout
-
- # Note that some systems such as Solaris do not give an EOF when
- # the child dies. In fact, you can still try to read
- # from the child_fd -- it will block forever or until TIMEOUT.
- # For this case, I test isalive() before doing any reading.
- # If isalive() is false, then I pretend that this is the same as EOF.
- if not self.isalive():
- # timeout of 0 means "poll"
- r, w, e = self.__select([self.child_fd], [], [], 0)
- if not r:
- self.flag_eof = True
- raise EOF('End Of File (EOF). Braindead platform.')
- elif self.__irix_hack:
- # Irix takes a long time before it realizes a child was terminated.
- # FIXME So does this mean Irix systems are forced to always have
- # FIXME a 2 second delay when calling read_nonblocking? That sucks.
- r, w, e = self.__select([self.child_fd], [], [], 2)
- if not r and not self.isalive():
- self.flag_eof = True
- raise EOF('End Of File (EOF). Slow platform.')
-
- r, w, e = self.__select([self.child_fd], [], [], timeout)
-
- if not r:
- if not self.isalive():
- # Some platforms, such as Irix, will claim that their
- # processes are alive; timeout on the select; and
- # then finally admit that they are not alive.
- self.flag_eof = True
- raise EOF('End of File (EOF). Very slow platform.')
- else:
- raise TIMEOUT('Timeout exceeded.')
-
- if self.child_fd in r:
- try:
- s = os.read(self.child_fd, size)
- except OSError as err:
- if err.args[0] == errno.EIO:
- # Linux-style EOF
- self.flag_eof = True
- raise EOF('End Of File (EOF). Exception style platform.')
- raise
- if s == b'':
- # BSD-style EOF
- self.flag_eof = True
- raise EOF('End Of File (EOF). Empty string style platform.')
-
- s = self._coerce_read_string(s)
- self._log(s, 'read')
- return s
-
- raise ExceptionPexpect('Reached an unexpected state.') # pragma: no cover
-
- def read(self, size=-1):
- '''This reads at most "size" bytes from the file (less if the read hits
- EOF before obtaining size bytes). If the size argument is negative or
- omitted, read all data until EOF is reached. The bytes are returned as
- a string object. An empty string is returned when EOF is encountered
- immediately. '''
-
- if size == 0:
- return self.string_type()
- if size < 0:
- # delimiter default is EOF
- self.expect(self.delimiter)
- return self.before
-
- # I could have done this more directly by not using expect(), but
- # I deliberately decided to couple read() to expect() so that
- # I would catch any bugs early and ensure consistant behavior.
- # It's a little less efficient, but there is less for me to
- # worry about if I have to later modify read() or expect().
- # Note, it's OK if size==-1 in the regex. That just means it
- # will never match anything in which case we stop only on EOF.
- cre = re.compile(self._coerce_expect_string('.{%d}' % size), re.DOTALL)
- # delimiter default is EOF
- index = self.expect([cre, self.delimiter])
- if index == 0:
- ### FIXME self.before should be ''. Should I assert this?
- return self.after
- return self.before
-
- def readline(self, size=-1):
- '''This reads and returns one entire line. The newline at the end of
- line is returned as part of the string, unless the file ends without a
- newline. An empty string is returned if EOF is encountered immediately.
- This looks for a newline as a CR/LF pair (\\r\\n) even on UNIX because
- this is what the pseudotty device returns. So contrary to what you may
- expect you will receive newlines as \\r\\n.
-
- If the size argument is 0 then an empty string is returned. In all
- other cases the size argument is ignored, which is not standard
- behavior for a file-like object. '''
-
- if size == 0:
- return self.string_type()
- # delimiter default is EOF
- index = self.expect([self.crlf, self.delimiter])
- if index == 0:
- return self.before + self.crlf
- else:
- return self.before
-
- def __iter__(self):
- '''This is to support iterators over a file-like object.
- '''
- return iter(self.readline, self.string_type())
-
- def readlines(self, sizehint=-1):
- '''This reads until EOF using readline() and returns a list containing
- the lines thus read. The optional 'sizehint' argument is ignored.
- Remember, because this reads until EOF that means the child
- process should have closed its stdout. If you run this method on
- a child that is still running with its stdout open then this
- method will block until it timesout.'''
-
- lines = []
- while True:
- line = self.readline()
- if not line:
- break
- lines.append(line)
- return lines
-
- def write(self, s):
- '''This is similar to send() except that there is no return value.
- '''
-
- self.send(s)
-
- def writelines(self, sequence):
- '''This calls write() for each element in the sequence. The sequence
- can be any iterable object producing strings, typically a list of
- strings. This does not add line separators. There is no return value.
- '''
-
- for s in sequence:
- self.write(s)
-
- def send(self, s):
- '''Sends string ``s`` to the child process, returning the number of
- bytes written. If a logfile is specified, a copy is written to that
- log. '''
-
- time.sleep(self.delaybeforesend)
-
- s = self._coerce_send_string(s)
- self._log(s, 'send')
-
- return self._send(s)
-
- def _send(self, s):
- return os.write(self.child_fd, s)
-
- def sendline(self, s=''):
- '''Wraps send(), sending string ``s`` to child process, with os.linesep
- automatically appended. Returns number of bytes written. '''
-
- n = self.send(s)
- n = n + self.send(self.linesep)
- return n
-
- def sendcontrol(self, char):
-
- '''Helper method that wraps send() with mnemonic access for sending control
- character to the child (such as Ctrl-C or Ctrl-D). For example, to send
- Ctrl-G (ASCII 7, bell, '\a')::
-
- child.sendcontrol('g')
-
- See also, sendintr() and sendeof().
- '''
-
- char = char.lower()
- a = ord(char)
- if a >= 97 and a <= 122:
- a = a - ord('a') + 1
- return self.send(self._chr(a))
- d = {'@': 0, '`': 0,
- '[': 27, '{': 27,
- '\\': 28, '|': 28,
- ']': 29, '}': 29,
- '^': 30, '~': 30,
- '_': 31,
- '?': 127}
- if char not in d:
- return 0
- return self.send(self._chr(d[char]))
-
- def sendeof(self):
-
- '''This sends an EOF to the child. This sends a character which causes
- the pending parent output buffer to be sent to the waiting child
- program without waiting for end-of-line. If it is the first character
- of the line, the read() in the user program returns 0, which signifies
- end-of-file. This means to work as expected a sendeof() has to be
- called at the beginning of a line. This method does not send a newline.
- It is the responsibility of the caller to ensure the eof is sent at the
- beginning of a line. '''
-
- self.send(self._chr(self._EOF))
-
- def sendintr(self):
-
- '''This sends a SIGINT to the child. It does not require
- the SIGINT to be the first character on a line. '''
-
- self.send(self._chr(self._INTR))
-
- def eof(self):
-
- '''This returns True if the EOF exception was ever raised.
- '''
-
- return self.flag_eof
-
- def terminate(self, force=False):
-
- '''This forces a child process to terminate. It starts nicely with
- SIGHUP and SIGINT. If "force" is True then moves onto SIGKILL. This
- returns True if the child was terminated. This returns False if the
- child could not be terminated. '''
-
- if not self.isalive():
- return True
- try:
- self.kill(signal.SIGHUP)
- time.sleep(self.delayafterterminate)
- if not self.isalive():
- return True
- self.kill(signal.SIGCONT)
- time.sleep(self.delayafterterminate)
- if not self.isalive():
- return True
- self.kill(signal.SIGINT)
- time.sleep(self.delayafterterminate)
- if not self.isalive():
- return True
- if force:
- self.kill(signal.SIGKILL)
- time.sleep(self.delayafterterminate)
- if not self.isalive():
- return True
- else:
- return False
- return False
- except OSError:
- # I think there are kernel timing issues that sometimes cause
- # this to happen. I think isalive() reports True, but the
- # process is dead to the kernel.
- # Make one last attempt to see if the kernel is up to date.
- time.sleep(self.delayafterterminate)
- if not self.isalive():
- return True
- else:
- return False
-
- def wait(self):
-
- '''This waits until the child exits. This is a blocking call. This will
- not read any data from the child, so this will block forever if the
- child has unread output and has terminated. In other words, the child
- may have printed output then called exit(), but, the child is
- technically still alive until its output is read by the parent. '''
-
- if self.isalive():
- pid, status = os.waitpid(self.pid, 0)
- else:
- raise ExceptionPexpect('Cannot wait for dead child process.')
- self.exitstatus = os.WEXITSTATUS(status)
- if os.WIFEXITED(status):
- self.status = status
- self.exitstatus = os.WEXITSTATUS(status)
- self.signalstatus = None
- self.terminated = True
- elif os.WIFSIGNALED(status):
- self.status = status
- self.exitstatus = None
- self.signalstatus = os.WTERMSIG(status)
- self.terminated = True
- elif os.WIFSTOPPED(status): # pragma: no cover
- # You can't call wait() on a child process in the stopped state.
- raise ExceptionPexpect('Called wait() on a stopped child ' +
- 'process. This is not supported. Is some other ' +
- 'process attempting job control with our child pid?')
- return self.exitstatus
-
- def isalive(self):
-
- '''This tests if the child process is running or not. This is
- non-blocking. If the child was terminated then this will read the
- exitstatus or signalstatus of the child. This returns True if the child
- process appears to be running or False if not. It can take literally
- SECONDS for Solaris to return the right status. '''
-
- if self.terminated:
- return False
-
- if self.flag_eof:
- # This is for Linux, which requires the blocking form
- # of waitpid to get the status of a defunct process.
- # This is super-lame. The flag_eof would have been set
- # in read_nonblocking(), so this should be safe.
- waitpid_options = 0
- else:
- waitpid_options = os.WNOHANG
-
- try:
- pid, status = os.waitpid(self.pid, waitpid_options)
- except OSError:
- err = sys.exc_info()[1]
- # No child processes
- if err.errno == errno.ECHILD:
- raise ExceptionPexpect('isalive() encountered condition ' +
- 'where "terminated" is 0, but there was no child ' +
- 'process. Did someone else call waitpid() ' +
- 'on our process?')
- else:
- raise err
-
- # I have to do this twice for Solaris.
- # I can't even believe that I figured this out...
- # If waitpid() returns 0 it means that no child process
- # wishes to report, and the value of status is undefined.
- if pid == 0:
- try:
- ### os.WNOHANG) # Solaris!
- pid, status = os.waitpid(self.pid, waitpid_options)
- except OSError as e: # pragma: no cover
- # This should never happen...
- if e.errno == errno.ECHILD:
- raise ExceptionPexpect('isalive() encountered condition ' +
- 'that should never happen. There was no child ' +
- 'process. Did someone else call waitpid() ' +
- 'on our process?')
- else:
- raise
-
- # If pid is still 0 after two calls to waitpid() then the process
- # really is alive. This seems to work on all platforms, except for
- # Irix which seems to require a blocking call on waitpid or select,
- # so I let read_nonblocking take care of this situation
- # (unfortunately, this requires waiting through the timeout).
- if pid == 0:
- return True
-
- if pid == 0:
- return True
-
- if os.WIFEXITED(status):
- self.status = status
- self.exitstatus = os.WEXITSTATUS(status)
- self.signalstatus = None
- self.terminated = True
- elif os.WIFSIGNALED(status):
- self.status = status
- self.exitstatus = None
- self.signalstatus = os.WTERMSIG(status)
- self.terminated = True
- elif os.WIFSTOPPED(status):
- raise ExceptionPexpect('isalive() encountered condition ' +
- 'where child process is stopped. This is not ' +
- 'supported. Is some other process attempting ' +
- 'job control with our child pid?')
- return False
-
- def kill(self, sig):
-
- '''This sends the given signal to the child application. In keeping
- with UNIX tradition it has a misleading name. It does not necessarily
- kill the child unless you send the right signal. '''
-
- # Same as os.kill, but the pid is given for you.
- if self.isalive():
- os.kill(self.pid, sig)
-
- def _pattern_type_err(self, pattern):
- raise TypeError('got {badtype} ({badobj!r}) as pattern, must be one'
- ' of: {goodtypes}, pexpect.EOF, pexpect.TIMEOUT'\
- .format(badtype=type(pattern),
- badobj=pattern,
- goodtypes=', '.join([str(ast)\
- for ast in self.allowed_string_types])
- )
- )
-
- def compile_pattern_list(self, patterns):
-
- '''This compiles a pattern-string or a list of pattern-strings.
- Patterns must be a StringType, EOF, TIMEOUT, SRE_Pattern, or a list of
- those. Patterns may also be None which results in an empty list (you
- might do this if waiting for an EOF or TIMEOUT condition without
- expecting any pattern).
-
- This is used by expect() when calling expect_list(). Thus expect() is
- nothing more than::
-
- cpl = self.compile_pattern_list(pl)
- return self.expect_list(cpl, timeout)
-
- If you are using expect() within a loop it may be more
- efficient to compile the patterns first and then call expect_list().
- This avoid calls in a loop to compile_pattern_list()::
-
- cpl = self.compile_pattern_list(my_pattern)
- while some_condition:
- ...
- i = self.expect_list(clp, timeout)
- ...
- '''
-
- if patterns is None:
- return []
- if not isinstance(patterns, list):
- patterns = [patterns]
-
- # Allow dot to match \n
- compile_flags = re.DOTALL
- if self.ignorecase:
- compile_flags = compile_flags | re.IGNORECASE
- compiled_pattern_list = []
- for idx, p in enumerate(patterns):
- if isinstance(p, self.allowed_string_types):
- p = self._coerce_expect_string(p)
- compiled_pattern_list.append(re.compile(p, compile_flags))
- elif p is EOF:
- compiled_pattern_list.append(EOF)
- elif p is TIMEOUT:
- compiled_pattern_list.append(TIMEOUT)
- elif isinstance(p, type(re.compile(''))):
- compiled_pattern_list.append(p)
- else:
- self._pattern_type_err(p)
- return compiled_pattern_list
-
- def expect(self, pattern, timeout=-1, searchwindowsize=-1, async=False):
-
- '''This seeks through the stream until a pattern is matched. The
- pattern is overloaded and may take several types. The pattern can be a
- StringType, EOF, a compiled re, or a list of any of those types.
- Strings will be compiled to re types. This returns the index into the
- pattern list. If the pattern was not a list this returns index 0 on a
- successful match. This may raise exceptions for EOF or TIMEOUT. To
- avoid the EOF or TIMEOUT exceptions add EOF or TIMEOUT to the pattern
- list. That will cause expect to match an EOF or TIMEOUT condition
- instead of raising an exception.
-
- If you pass a list of patterns and more than one matches, the first
- match in the stream is chosen. If more than one pattern matches at that
- point, the leftmost in the pattern list is chosen. For example::
-
- # the input is 'foobar'
- index = p.expect(['bar', 'foo', 'foobar'])
- # returns 1('foo') even though 'foobar' is a "better" match
-
- Please note, however, that buffering can affect this behavior, since
- input arrives in unpredictable chunks. For example::
-
- # the input is 'foobar'
- index = p.expect(['foobar', 'foo'])
- # returns 0('foobar') if all input is available at once,
- # but returs 1('foo') if parts of the final 'bar' arrive late
-
- After a match is found the instance attributes 'before', 'after' and
- 'match' will be set. You can see all the data read before the match in
- 'before'. You can see the data that was matched in 'after'. The
- re.MatchObject used in the re match will be in 'match'. If an error
- occurred then 'before' will be set to all the data read so far and
- 'after' and 'match' will be None.
-
- If timeout is -1 then timeout will be set to the self.timeout value.
-
- A list entry may be EOF or TIMEOUT instead of a string. This will
- catch these exceptions and return the index of the list entry instead
- of raising the exception. The attribute 'after' will be set to the
- exception type. The attribute 'match' will be None. This allows you to
- write code like this::
-
- index = p.expect(['good', 'bad', pexpect.EOF, pexpect.TIMEOUT])
- if index == 0:
- do_something()
- elif index == 1:
- do_something_else()
- elif index == 2:
- do_some_other_thing()
- elif index == 3:
- do_something_completely_different()
-
- instead of code like this::
-
- try:
- index = p.expect(['good', 'bad'])
- if index == 0:
- do_something()
- elif index == 1:
- do_something_else()
- except EOF:
- do_some_other_thing()
- except TIMEOUT:
- do_something_completely_different()
-
- These two forms are equivalent. It all depends on what you want. You
- can also just expect the EOF if you are waiting for all output of a
- child to finish. For example::
-
- p = pexpect.spawn('/bin/ls')
- p.expect(pexpect.EOF)
- print p.before
-
- If you are trying to optimize for speed then see expect_list().
-
- On Python 3.4, or Python 3.3 with asyncio installed, passing
- ``async=True`` will make this return an :mod:`asyncio` coroutine,
- which you can yield from to get the same result that this method would
- normally give directly. So, inside a coroutine, you can replace this code::
-
- index = p.expect(patterns)
-
- With this non-blocking form::
-
- index = yield from p.expect(patterns, async=True)
- '''
-
- compiled_pattern_list = self.compile_pattern_list(pattern)
- return self.expect_list(compiled_pattern_list,
- timeout, searchwindowsize, async)
-
- def expect_list(self, pattern_list, timeout=-1, searchwindowsize=-1,
- async=False):
- '''This takes a list of compiled regular expressions and returns the
- index into the pattern_list that matched the child output. The list may
- also contain EOF or TIMEOUT(which are not compiled regular
- expressions). This method is similar to the expect() method except that
- expect_list() does not recompile the pattern list on every call. This
- may help if you are trying to optimize for speed, otherwise just use
- the expect() method. This is called by expect(). If timeout==-1 then
- the self.timeout value is used. If searchwindowsize==-1 then the
- self.searchwindowsize value is used.
-
- Like :meth:`expect`, passing ``async=True`` will make this return an
- asyncio coroutine.
- '''
- if timeout == -1:
- timeout = self.timeout
-
- exp = Expecter(self, searcher_re(pattern_list), searchwindowsize)
- if async:
- from .async import expect_async
- return expect_async(exp, timeout)
- else:
- return exp.expect_loop(timeout)
-
- def expect_exact(self, pattern_list, timeout=-1, searchwindowsize=-1,
- async=False):
-
- '''This is similar to expect(), but uses plain string matching instead
- of compiled regular expressions in 'pattern_list'. The 'pattern_list'
- may be a string; a list or other sequence of strings; or TIMEOUT and
- EOF.
-
- This call might be faster than expect() for two reasons: string
- searching is faster than RE matching and it is possible to limit the
- search to just the end of the input buffer.
-
- This method is also useful when you don't want to have to worry about
- escaping regular expression characters that you want to match.
-
- Like :meth:`expect`, passing ``async=True`` will make this return an
- asyncio coroutine.
- '''
- if timeout == -1:
- timeout = self.timeout
-
- if (isinstance(pattern_list, self.allowed_string_types) or
- pattern_list in (TIMEOUT, EOF)):
- pattern_list = [pattern_list]
-
- def prepare_pattern(pattern):
- if pattern in (TIMEOUT, EOF):
- return pattern
- if isinstance(pattern, self.allowed_string_types):
- return self._coerce_expect_string(pattern)
- self._pattern_type_err(pattern)
-
- try:
- pattern_list = iter(pattern_list)
- except TypeError:
- self._pattern_type_err(pattern_list)
- pattern_list = [prepare_pattern(p) for p in pattern_list]
-
- exp = Expecter(self, searcher_string(pattern_list), searchwindowsize)
- if async:
- from .async import expect_async
- return expect_async(exp, timeout)
- else:
- return exp.expect_loop(timeout)
-
- def expect_loop(self, searcher, timeout=-1, searchwindowsize=-1):
- '''This is the common loop used inside expect. The 'searcher' should be
- an instance of searcher_re or searcher_string, which describes how and
- what to search for in the input.
-
- See expect() for other arguments, return value and exceptions. '''
-
- exp = Expecter(self, searcher, searchwindowsize)
- return exp.expect_loop(timeout)
-
- def getwinsize(self):
-
- '''This returns the terminal window size of the child tty. The return
- value is a tuple of (rows, cols). '''
-
- TIOCGWINSZ = getattr(termios, 'TIOCGWINSZ', 1074295912)
- s = struct.pack('HHHH', 0, 0, 0, 0)
- x = fcntl.ioctl(self.child_fd, TIOCGWINSZ, s)
- return struct.unpack('HHHH', x)[0:2]
-
- def setwinsize(self, rows, cols):
-
- '''This sets the terminal window size of the child tty. This will cause
- a SIGWINCH signal to be sent to the child. This does not change the
- physical window size. It changes the size reported to TTY-aware
- applications like vi or curses -- applications that respond to the
- SIGWINCH signal. '''
-
- # Some very old platforms have a bug that causes the value for
- # termios.TIOCSWINSZ to be truncated. There was a hack here to work
- # around this, but it caused problems with newer platforms so has been
- # removed. For details see https://github.com/pexpect/pexpect/issues/39
- TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
- # Note, assume ws_xpixel and ws_ypixel are zero.
- s = struct.pack('HHHH', rows, cols, 0, 0)
- fcntl.ioctl(self.fileno(), TIOCSWINSZ, s)
-
- def interact(self, escape_character=chr(29),
- input_filter=None, output_filter=None):
-
- '''This gives control of the child process to the interactive user (the
- human at the keyboard). Keystrokes are sent to the child process, and
- the stdout and stderr output of the child process is printed. This
- simply echos the child stdout and child stderr to the real stdout and
- it echos the real stdin to the child stdin. When the user types the
- escape_character this method will stop. The default for
- escape_character is ^]. This should not be confused with ASCII 27 --
- the ESC character. ASCII 29 was chosen for historical merit because
- this is the character used by 'telnet' as the escape character. The
- escape_character will not be sent to the child process.
-
- You may pass in optional input and output filter functions. These
- functions should take a string and return a string. The output_filter
- will be passed all the output from the child process. The input_filter
- will be passed all the keyboard input from the user. The input_filter
- is run BEFORE the check for the escape_character.
-
- Note that if you change the window size of the parent the SIGWINCH
- signal will not be passed through to the child. If you want the child
- window size to change when the parent's window size changes then do
- something like the following example::
-
- import pexpect, struct, fcntl, termios, signal, sys
- def sigwinch_passthrough (sig, data):
- s = struct.pack("HHHH", 0, 0, 0, 0)
- a = struct.unpack('hhhh', fcntl.ioctl(sys.stdout.fileno(),
- termios.TIOCGWINSZ , s))
- global p
- p.setwinsize(a[0],a[1])
- # Note this 'p' global and used in sigwinch_passthrough.
- p = pexpect.spawn('/bin/bash')
- signal.signal(signal.SIGWINCH, sigwinch_passthrough)
- p.interact()
- '''
-
- # Flush the buffer.
- self.write_to_stdout(self.buffer)
- self.stdout.flush()
- self.buffer = self.string_type()
- mode = tty.tcgetattr(self.STDIN_FILENO)
- tty.setraw(self.STDIN_FILENO)
- if PY3:
- escape_character = escape_character.encode('latin-1')
- try:
- self.__interact_copy(escape_character, input_filter, output_filter)
- finally:
- tty.tcsetattr(self.STDIN_FILENO, tty.TCSAFLUSH, mode)
-
- def __interact_writen(self, fd, data):
- '''This is used by the interact() method.
- '''
-
- while data != b'' and self.isalive():
- n = os.write(fd, data)
- data = data[n:]
-
- def __interact_read(self, fd):
- '''This is used by the interact() method.
- '''
-
- return os.read(fd, 1000)
-
- def __interact_copy(self, escape_character=None,
- input_filter=None, output_filter=None):
-
- '''This is used by the interact() method.
- '''
-
- while self.isalive():
- r, w, e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])
- if self.child_fd in r:
- try:
- data = self.__interact_read(self.child_fd)
- except OSError as err:
- if err.args[0] == errno.EIO:
- # Linux-style EOF
- break
- raise
- if data == b'':
- # BSD-style EOF
- break
- if output_filter:
- data = output_filter(data)
- if self.logfile is not None:
- self.logfile.write(data)
- self.logfile.flush()
- os.write(self.STDOUT_FILENO, data)
- if self.STDIN_FILENO in r:
- data = self.__interact_read(self.STDIN_FILENO)
- if input_filter:
- data = input_filter(data)
- i = data.rfind(escape_character)
- if i != -1:
- data = data[:i]
- self.__interact_writen(self.child_fd, data)
- break
- self.__interact_writen(self.child_fd, data)
-
- def __select(self, iwtd, owtd, ewtd, timeout=None):
-
- '''This is a wrapper around select.select() that ignores signals. If
- select.select raises a select.error exception and errno is an EINTR
- error then it is ignored. Mainly this is used to ignore sigwinch
- (terminal resize). '''
-
- # if select() is interrupted by a signal (errno==EINTR) then
- # we loop back and enter the select() again.
- if timeout is not None:
- end_time = time.time() + timeout
- while True:
- try:
- return select.select(iwtd, owtd, ewtd, timeout)
- except select.error:
- err = sys.exc_info()[1]
- if err.args[0] == errno.EINTR:
- # if we loop back we have to subtract the
- # amount of time we already waited.
- if timeout is not None:
- timeout = end_time - time.time()
- if timeout < 0:
- return([], [], [])
- else:
- # something else caused the select.error, so
- # this actually is an exception.
- raise
-
-##############################################################################
-# The following methods are no longer supported or allowed.
-
- def setmaxread(self, maxread): # pragma: no cover
-
- '''This method is no longer supported or allowed. I don't like getters
- and setters without a good reason. '''
-
- raise ExceptionPexpect('This method is no longer supported ' +
- 'or allowed. Just assign a value to the ' +
- 'maxread member variable.')
-
- def setlog(self, fileobject): # pragma: no cover
-
- '''This method is no longer supported or allowed.
- '''
-
- raise ExceptionPexpect('This method is no longer supported ' +
- 'or allowed. Just assign a value to the logfile ' +
- 'member variable.')
-
-##############################################################################
-# End of spawn class
-##############################################################################
-
-class spawnu(spawn):
- """Works like spawn, but accepts and returns unicode strings.
-
- Extra parameters:
-
- :param encoding: The encoding to use for communications (default: 'utf-8')
- :param errors: How to handle encoding/decoding errors; one of 'strict'
- (the default), 'ignore', or 'replace', as described
- for :meth:`~bytes.decode` and :meth:`~str.encode`.
- """
- if PY3:
- string_type = str
- allowed_string_types = (str, )
- _chr = staticmethod(chr)
- linesep = os.linesep
- crlf = '\r\n'
- else:
- string_type = unicode
- allowed_string_types = (unicode, )
- _chr = staticmethod(unichr)
- linesep = os.linesep.decode('ascii')
- crlf = '\r\n'.decode('ascii')
- # This can handle unicode in both Python 2 and 3
- write_to_stdout = sys.stdout.write
-
- def __init__(self, *args, **kwargs):
- self.encoding = kwargs.pop('encoding', 'utf-8')
- self.errors = kwargs.pop('errors', 'strict')
- self._decoder = codecs.getincrementaldecoder(self.encoding)(errors=self.errors)
- super(spawnu, self).__init__(*args, **kwargs)
-
- @staticmethod
- def _coerce_expect_string(s):
- return s
-
- @staticmethod
- def _coerce_send_string(s):
- return s
-
- def _coerce_read_string(self, s):
- return self._decoder.decode(s, final=False)
-
- def _send(self, s):
- return os.write(self.child_fd, s.encode(self.encoding, self.errors))
-
-
-class searcher_string(object):
-
- '''This is a plain string search helper for the spawn.expect_any() method.
- This helper class is for speed. For more powerful regex patterns
- see the helper class, searcher_re.
-
- Attributes:
-
- eof_index - index of EOF, or -1
- timeout_index - index of TIMEOUT, or -1
-
- After a successful match by the search() method the following attributes
- are available:
-
- start - index into the buffer, first byte of match
- end - index into the buffer, first byte after match
- match - the matching string itself
-
- '''
-
- def __init__(self, strings):
-
- '''This creates an instance of searcher_string. This argument 'strings'
- may be a list; a sequence of strings; or the EOF or TIMEOUT types. '''
-
- self.eof_index = -1
- self.timeout_index = -1
- self._strings = []
- for n, s in enumerate(strings):
- if s is EOF:
- self.eof_index = n
- continue
- if s is TIMEOUT:
- self.timeout_index = n
- continue
- self._strings.append((n, s))
-
- def __str__(self):
-
- '''This returns a human-readable string that represents the state of
- the object.'''
-
- ss = [(ns[0], ' %d: "%s"' % ns) for ns in self._strings]
- ss.append((-1, 'searcher_string:'))
- if self.eof_index >= 0:
- ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
- if self.timeout_index >= 0:
- ss.append((self.timeout_index,
- ' %d: TIMEOUT' % self.timeout_index))
- ss.sort()
- ss = list(zip(*ss))[1]
- return '\n'.join(ss)
-
- def search(self, buffer, freshlen, searchwindowsize=None):
-
- '''This searches 'buffer' for the first occurence of one of the search
- strings. 'freshlen' must indicate the number of bytes at the end of
- 'buffer' which have not been searched before. It helps to avoid
- searching the same, possibly big, buffer over and over again.
-
- See class spawn for the 'searchwindowsize' argument.
-
- If there is a match this returns the index of that string, and sets
- 'start', 'end' and 'match'. Otherwise, this returns -1. '''
-
- first_match = None
-
- # 'freshlen' helps a lot here. Further optimizations could
- # possibly include:
- #
- # using something like the Boyer-Moore Fast String Searching
- # Algorithm; pre-compiling the search through a list of
- # strings into something that can scan the input once to
- # search for all N strings; realize that if we search for
- # ['bar', 'baz'] and the input is '...foo' we need not bother
- # rescanning until we've read three more bytes.
- #
- # Sadly, I don't know enough about this interesting topic. /grahn
-
- for index, s in self._strings:
- if searchwindowsize is None:
- # the match, if any, can only be in the fresh data,
- # or at the very end of the old data
- offset = -(freshlen + len(s))
- else:
- # better obey searchwindowsize
- offset = -searchwindowsize
- n = buffer.find(s, offset)
- if n >= 0 and (first_match is None or n < first_match):
- first_match = n
- best_index, best_match = index, s
- if first_match is None:
- return -1
- self.match = best_match
- self.start = first_match
- self.end = self.start + len(self.match)
- return best_index
-
-
-class searcher_re(object):
-
- '''This is regular expression string search helper for the
- spawn.expect_any() method. This helper class is for powerful
- pattern matching. For speed, see the helper class, searcher_string.
-
- Attributes:
-
- eof_index - index of EOF, or -1
- timeout_index - index of TIMEOUT, or -1
-
- After a successful match by the search() method the following attributes
- are available:
-
- start - index into the buffer, first byte of match
- end - index into the buffer, first byte after match
- match - the re.match object returned by a succesful re.search
-
- '''
-
- def __init__(self, patterns):
-
- '''This creates an instance that searches for 'patterns' Where
- 'patterns' may be a list or other sequence of compiled regular
- expressions, or the EOF or TIMEOUT types.'''
-
- self.eof_index = -1
- self.timeout_index = -1
- self._searches = []
- for n, s in zip(list(range(len(patterns))), patterns):
- if s is EOF:
- self.eof_index = n
- continue
- if s is TIMEOUT:
- self.timeout_index = n
- continue
- self._searches.append((n, s))
-
- def __str__(self):
-
- '''This returns a human-readable string that represents the state of
- the object.'''
-
- #ss = [(n, ' %d: re.compile("%s")' %
- # (n, repr(s.pattern))) for n, s in self._searches]
- ss = list()
- for n, s in self._searches:
- try:
- ss.append((n, ' %d: re.compile("%s")' % (n, s.pattern)))
- except UnicodeEncodeError:
- # for test cases that display __str__ of searches, dont throw
- # another exception just because stdout is ascii-only, using
- # repr()
- ss.append((n, ' %d: re.compile(%r)' % (n, s.pattern)))
- ss.append((-1, 'searcher_re:'))
- if self.eof_index >= 0:
- ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
- if self.timeout_index >= 0:
- ss.append((self.timeout_index, ' %d: TIMEOUT' %
- self.timeout_index))
- ss.sort()
- ss = list(zip(*ss))[1]
- return '\n'.join(ss)
-
- def search(self, buffer, freshlen, searchwindowsize=None):
-
- '''This searches 'buffer' for the first occurence of one of the regular
- expressions. 'freshlen' must indicate the number of bytes at the end of
- 'buffer' which have not been searched before.
-
- See class spawn for the 'searchwindowsize' argument.
-
- If there is a match this returns the index of that string, and sets
- 'start', 'end' and 'match'. Otherwise, returns -1.'''
-
- first_match = None
- # 'freshlen' doesn't help here -- we cannot predict the
- # length of a match, and the re module provides no help.
- if searchwindowsize is None:
- searchstart = 0
- else:
- searchstart = max(0, len(buffer) - searchwindowsize)
- for index, s in self._searches:
- match = s.search(buffer, searchstart)
- if match is None:
- continue
- n = match.start()
- if first_match is None or n < first_match:
- first_match = n
- the_match = match
- best_index = index
- if first_match is None:
- return -1
- self.start = first_match
- self.match = the_match
- self.end = self.match.end()
- return best_index
-
-
-def is_executable_file(path):
- """Checks that path is an executable regular file (or a symlink to a file).
-
- This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``, but
- on some platforms :func:`os.access` gives us the wrong answer, so this
- checks permission bits directly.
- """
- # follow symlinks,
- fpath = os.path.realpath(path)
-
- # return False for non-files (directories, fifo, etc.)
- if not os.path.isfile(fpath):
- return False
-
- # On Solaris, etc., "If the process has appropriate privileges, an
- # implementation may indicate success for X_OK even if none of the
- # execute file permission bits are set."
- #
- # For this reason, it is necessary to explicitly check st_mode
-
- # get file mode using os.stat, and check if `other',
- # that is anybody, may read and execute.
- mode = os.stat(fpath).st_mode
- if mode & stat.S_IROTH and mode & stat.S_IXOTH:
- return True
-
- # get current user's group ids, and check if `group',
- # when matching ours, may read and execute.
- user_gids = os.getgroups() + [os.getgid()]
- if (os.stat(fpath).st_gid in user_gids and
- mode & stat.S_IRGRP and mode & stat.S_IXGRP):
- return True
-
- # finally, if file owner matches our effective userid,
- # check if `user', may read and execute.
- user_gids = os.getgroups() + [os.getgid()]
- if (os.stat(fpath).st_uid == os.geteuid() and
- mode & stat.S_IRUSR and mode & stat.S_IXUSR):
- return True
-
- return False
-
-def which(filename):
- '''This takes a given filename; tries to find it in the environment path;
- then checks if it is executable. This returns the full path to the filename
- if found and executable. Otherwise this returns None.'''
-
- # Special case where filename contains an explicit path.
- if os.path.dirname(filename) != '' and is_executable_file(filename):
- return filename
- if 'PATH' not in os.environ or os.environ['PATH'] == '':
- p = os.defpath
- else:
- p = os.environ['PATH']
- pathlist = p.split(os.pathsep)
- for path in pathlist:
- ff = os.path.join(path, filename)
- if is_executable_file(ff):
- return ff
- return None
-
-
-def split_command_line(command_line):
-
- '''This splits a command line into a list of arguments. It splits arguments
- on spaces, but handles embedded quotes, doublequotes, and escaped
- characters. It's impossible to do this with a regular expression, so I
- wrote a little state machine to parse the command line. '''
-
- arg_list = []
- arg = ''
-
- # Constants to name the states we can be in.
- state_basic = 0
- state_esc = 1
- state_singlequote = 2
- state_doublequote = 3
- # The state when consuming whitespace between commands.
- state_whitespace = 4
- state = state_basic
-
- for c in command_line:
- if state == state_basic or state == state_whitespace:
- if c == '\\':
- # Escape the next character
- state = state_esc
- elif c == r"'":
- # Handle single quote
- state = state_singlequote
- elif c == r'"':
- # Handle double quote
- state = state_doublequote
- elif c.isspace():
- # Add arg to arg_list if we aren't in the middle of whitespace.
- if state == state_whitespace:
- # Do nothing.
- None
- else:
- arg_list.append(arg)
- arg = ''
- state = state_whitespace
- else:
- arg = arg + c
- state = state_basic
- elif state == state_esc:
- arg = arg + c
- state = state_basic
- elif state == state_singlequote:
- if c == r"'":
- state = state_basic
- else:
- arg = arg + c
- elif state == state_doublequote:
- if c == r'"':
- state = state_basic
- else:
- arg = arg + c
-
- if arg != '':
- arg_list.append(arg)
- return arg_list
-
# vim: set shiftround expandtab tabstop=4 shiftwidth=4 ft=python autoindent :
diff --git a/pexpect/async.py b/pexpect/async.py
index 8ec9c3c..50eae3b 100644
--- a/pexpect/async.py
+++ b/pexpect/async.py
@@ -53,6 +53,8 @@ class PatternWaiter(asyncio.Protocol):
self.error(e)
def eof_received(self):
+ # N.B. If this gets called, async will close the pipe (the spawn object)
+ # for us
try:
index = self.expecter.eof()
except EOF as e:
diff --git a/pexpect/bashrc.sh b/pexpect/bashrc.sh
new file mode 100644
index 0000000..99a3ac2
--- /dev/null
+++ b/pexpect/bashrc.sh
@@ -0,0 +1,5 @@
+source /etc/bash.bashrc
+source ~/.bashrc
+
+# Reset PS1 so pexpect can find it
+PS1="$"
diff --git a/pexpect/exceptions.py b/pexpect/exceptions.py
new file mode 100644
index 0000000..cb360f0
--- /dev/null
+++ b/pexpect/exceptions.py
@@ -0,0 +1,35 @@
+"""Exception classes used by Pexpect"""
+
+import traceback
+import sys
+
+class ExceptionPexpect(Exception):
+ '''Base class for all exceptions raised by this module.
+ '''
+
+ def __init__(self, value):
+ super(ExceptionPexpect, self).__init__(value)
+ self.value = value
+
+ def __str__(self):
+ return str(self.value)
+
+ def get_trace(self):
+ '''This returns an abbreviated stack trace with lines that only concern
+ the caller. In other words, the stack trace inside the Pexpect module
+ is not included. '''
+
+ tblist = traceback.extract_tb(sys.exc_info()[2])
+ tblist = [item for item in tblist if ('pexpect/__init__' not in item[0])
+ and ('pexpect/expect' not in item[0])]
+ tblist = traceback.format_list(tblist)
+ return ''.join(tblist)
+
+
+class EOF(ExceptionPexpect):
+ '''Raised when EOF is read from a child.
+ This usually means the child has exited.'''
+
+
+class TIMEOUT(ExceptionPexpect):
+ '''Raised when a read time exceeds the timeout. '''
diff --git a/pexpect/expect.py b/pexpect/expect.py
index b8da406..6fde9e8 100644
--- a/pexpect/expect.py
+++ b/pexpect/expect.py
@@ -1,5 +1,7 @@
import time
+from .exceptions import EOF, TIMEOUT
+
class Expecter(object):
def __init__(self, spawn, searcher, searchwindowsize=-1):
self.spawn = spawn
@@ -102,4 +104,194 @@ class Expecter(object):
return self.timeout(e)
except:
self.errored()
- raise \ No newline at end of file
+ raise
+
+
+class searcher_string(object):
+ '''This is a plain string search helper for the spawn.expect_any() method.
+ This helper class is for speed. For more powerful regex patterns
+ see the helper class, searcher_re.
+
+ Attributes:
+
+ eof_index - index of EOF, or -1
+ timeout_index - index of TIMEOUT, or -1
+
+ After a successful match by the search() method the following attributes
+ are available:
+
+ start - index into the buffer, first byte of match
+ end - index into the buffer, first byte after match
+ match - the matching string itself
+
+ '''
+
+ def __init__(self, strings):
+ '''This creates an instance of searcher_string. This argument 'strings'
+ may be a list; a sequence of strings; or the EOF or TIMEOUT types. '''
+
+ self.eof_index = -1
+ self.timeout_index = -1
+ self._strings = []
+ for n, s in enumerate(strings):
+ if s is EOF:
+ self.eof_index = n
+ continue
+ if s is TIMEOUT:
+ self.timeout_index = n
+ continue
+ self._strings.append((n, s))
+
+ def __str__(self):
+ '''This returns a human-readable string that represents the state of
+ the object.'''
+
+ ss = [(ns[0], ' %d: "%s"' % ns) for ns in self._strings]
+ ss.append((-1, 'searcher_string:'))
+ if self.eof_index >= 0:
+ ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
+ if self.timeout_index >= 0:
+ ss.append((self.timeout_index,
+ ' %d: TIMEOUT' % self.timeout_index))
+ ss.sort()
+ ss = list(zip(*ss))[1]
+ return '\n'.join(ss)
+
+ def search(self, buffer, freshlen, searchwindowsize=None):
+ '''This searches 'buffer' for the first occurence of one of the search
+ strings. 'freshlen' must indicate the number of bytes at the end of
+ 'buffer' which have not been searched before. It helps to avoid
+ searching the same, possibly big, buffer over and over again.
+
+ See class spawn for the 'searchwindowsize' argument.
+
+ If there is a match this returns the index of that string, and sets
+ 'start', 'end' and 'match'. Otherwise, this returns -1. '''
+
+ first_match = None
+
+ # 'freshlen' helps a lot here. Further optimizations could
+ # possibly include:
+ #
+ # using something like the Boyer-Moore Fast String Searching
+ # Algorithm; pre-compiling the search through a list of
+ # strings into something that can scan the input once to
+ # search for all N strings; realize that if we search for
+ # ['bar', 'baz'] and the input is '...foo' we need not bother
+ # rescanning until we've read three more bytes.
+ #
+ # Sadly, I don't know enough about this interesting topic. /grahn
+
+ for index, s in self._strings:
+ if searchwindowsize is None:
+ # the match, if any, can only be in the fresh data,
+ # or at the very end of the old data
+ offset = -(freshlen + len(s))
+ else:
+ # better obey searchwindowsize
+ offset = -searchwindowsize
+ n = buffer.find(s, offset)
+ if n >= 0 and (first_match is None or n < first_match):
+ first_match = n
+ best_index, best_match = index, s
+ if first_match is None:
+ return -1
+ self.match = best_match
+ self.start = first_match
+ self.end = self.start + len(self.match)
+ return best_index
+
+
+class searcher_re(object):
+ '''This is regular expression string search helper for the
+ spawn.expect_any() method. This helper class is for powerful
+ pattern matching. For speed, see the helper class, searcher_string.
+
+ Attributes:
+
+ eof_index - index of EOF, or -1
+ timeout_index - index of TIMEOUT, or -1
+
+ After a successful match by the search() method the following attributes
+ are available:
+
+ start - index into the buffer, first byte of match
+ end - index into the buffer, first byte after match
+ match - the re.match object returned by a succesful re.search
+
+ '''
+
+ def __init__(self, patterns):
+ '''This creates an instance that searches for 'patterns' Where
+ 'patterns' may be a list or other sequence of compiled regular
+ expressions, or the EOF or TIMEOUT types.'''
+
+ self.eof_index = -1
+ self.timeout_index = -1
+ self._searches = []
+ for n, s in zip(list(range(len(patterns))), patterns):
+ if s is EOF:
+ self.eof_index = n
+ continue
+ if s is TIMEOUT:
+ self.timeout_index = n
+ continue
+ self._searches.append((n, s))
+
+ def __str__(self):
+ '''This returns a human-readable string that represents the state of
+ the object.'''
+
+ #ss = [(n, ' %d: re.compile("%s")' %
+ # (n, repr(s.pattern))) for n, s in self._searches]
+ ss = list()
+ for n, s in self._searches:
+ try:
+ ss.append((n, ' %d: re.compile("%s")' % (n, s.pattern)))
+ except UnicodeEncodeError:
+ # for test cases that display __str__ of searches, dont throw
+ # another exception just because stdout is ascii-only, using
+ # repr()
+ ss.append((n, ' %d: re.compile(%r)' % (n, s.pattern)))
+ ss.append((-1, 'searcher_re:'))
+ if self.eof_index >= 0:
+ ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
+ if self.timeout_index >= 0:
+ ss.append((self.timeout_index, ' %d: TIMEOUT' %
+ self.timeout_index))
+ ss.sort()
+ ss = list(zip(*ss))[1]
+ return '\n'.join(ss)
+
+ def search(self, buffer, freshlen, searchwindowsize=None):
+ '''This searches 'buffer' for the first occurence of one of the regular
+ expressions. 'freshlen' must indicate the number of bytes at the end of
+ 'buffer' which have not been searched before.
+
+ See class spawn for the 'searchwindowsize' argument.
+
+ If there is a match this returns the index of that string, and sets
+ 'start', 'end' and 'match'. Otherwise, returns -1.'''
+
+ first_match = None
+ # 'freshlen' doesn't help here -- we cannot predict the
+ # length of a match, and the re module provides no help.
+ if searchwindowsize is None:
+ searchstart = 0
+ else:
+ searchstart = max(0, len(buffer) - searchwindowsize)
+ for index, s in self._searches:
+ match = s.search(buffer, searchstart)
+ if match is None:
+ continue
+ n = match.start()
+ if first_match is None or n < first_match:
+ first_match = n
+ the_match = match
+ best_index = index
+ if first_match is None:
+ return -1
+ self.start = first_match
+ self.match = the_match
+ self.end = self.match.end()
+ return best_index \ No newline at end of file
diff --git a/pexpect/fdpexpect.py b/pexpect/fdpexpect.py
index fe4ad89..96ca2e1 100644
--- a/pexpect/fdpexpect.py
+++ b/pexpect/fdpexpect.py
@@ -21,26 +21,22 @@ PEXPECT LICENSE
'''
-from pexpect import spawn, ExceptionPexpect
+from .spawnbase import SpawnBase
+from .exceptions import ExceptionPexpect
import os
__all__ = ['fdspawn']
-class fdspawn (spawn):
-
+class fdspawn(SpawnBase):
'''This is like pexpect.spawn but allows you to supply your own open file
descriptor. For example, you could use it to read through a file looking
for patterns, or to control a modem or serial device. '''
- def __init__ (self, fd, args=[], timeout=30, maxread=2000, searchwindowsize=None, logfile=None):
-
+ def __init__ (self, fd, args=None, timeout=30, maxread=2000, searchwindowsize=None, logfile=None):
'''This takes a file descriptor (an int) or an object that support the
fileno() method (returning an int). All Python file-like objects
support fileno(). '''
- ### TODO: Add better handling of trying to use fdspawn in place of spawn
- ### TODO: (overload to allow fdspawn to also handle commands as spawn does.
-
if type(fd) != type(0) and hasattr(fd, 'fileno'):
fd = fd.fileno()
@@ -54,15 +50,12 @@ class fdspawn (spawn):
self.args = None
self.command = None
- spawn.__init__(self, None, args, timeout, maxread, searchwindowsize, logfile)
+ SpawnBase.__init__(self, timeout, maxread, searchwindowsize, logfile)
self.child_fd = fd
self.own_fd = False
self.closed = False
self.name = '<file descriptor %d>' % fd
- def __del__ (self):
- return
-
def close (self):
"""Close the file descriptor.
@@ -91,7 +84,3 @@ class fdspawn (spawn):
def terminate (self, force=False): # pragma: no cover
raise ExceptionPexpect('This method is not valid for file descriptors.')
-
- def kill (self, sig): # pragma: no cover
- """No-op - no process to kill."""
- return
diff --git a/pexpect/pty_spawn.py b/pexpect/pty_spawn.py
new file mode 100644
index 0000000..0663926
--- /dev/null
+++ b/pexpect/pty_spawn.py
@@ -0,0 +1,819 @@
+import os
+import sys
+import time
+import select
+import re
+import pty
+import tty
+import termios
+import errno
+import signal
+from contextlib import contextmanager
+
+import ptyprocess
+from ptyprocess.ptyprocess import use_native_pty_fork
+
+from .exceptions import ExceptionPexpect, EOF, TIMEOUT
+from .spawnbase import SpawnBase, SpawnBaseUnicode
+from .utils import which, split_command_line
+
+@contextmanager
+def _wrap_ptyprocess_err():
+ """Turn ptyprocess errors into our own ExceptionPexpect errors"""
+ try:
+ yield
+ except ptyprocess.PtyProcessError as e:
+ raise ExceptionPexpect(*e.args)
+
+PY3 = (sys.version_info[0] >= 3)
+
+class spawn(SpawnBase):
+ '''This is the main class interface for Pexpect. Use this class to start
+ and control child applications. '''
+ ptyprocess_class = ptyprocess.PtyProcess
+
+ # This is purely informational now - changing it has no effect
+ use_native_pty_fork = use_native_pty_fork
+
+ def __init__(self, command, args=[], timeout=30, maxread=2000,
+ searchwindowsize=None, logfile=None, cwd=None, env=None,
+ ignore_sighup=True, echo=True, preexec_fn=None):
+ '''This is the constructor. The command parameter may be a string that
+ includes a command and any arguments to the command. For example::
+
+ child = pexpect.spawn('/usr/bin/ftp')
+ child = pexpect.spawn('/usr/bin/ssh user@example.com')
+ child = pexpect.spawn('ls -latr /tmp')
+
+ You may also construct it with a list of arguments like so::
+
+ child = pexpect.spawn('/usr/bin/ftp', [])
+ child = pexpect.spawn('/usr/bin/ssh', ['user@example.com'])
+ child = pexpect.spawn('ls', ['-latr', '/tmp'])
+
+ After this the child application will be created and will be ready to
+ talk to. For normal use, see expect() and send() and sendline().
+
+ Remember that Pexpect does NOT interpret shell meta characters such as
+ redirect, pipe, or wild cards (``>``, ``|``, or ``*``). This is a
+ common mistake. If you want to run a command and pipe it through
+ another command then you must also start a shell. For example::
+
+ child = pexpect.spawn('/bin/bash -c "ls -l | grep LOG > logs.txt"')
+ child.expect(pexpect.EOF)
+
+ The second form of spawn (where you pass a list of arguments) is useful
+ in situations where you wish to spawn a command and pass it its own
+ argument list. This can make syntax more clear. For example, the
+ following is equivalent to the previous example::
+
+ shell_cmd = 'ls -l | grep LOG > logs.txt'
+ child = pexpect.spawn('/bin/bash', ['-c', shell_cmd])
+ child.expect(pexpect.EOF)
+
+ The maxread attribute sets the read buffer size. This is maximum number
+ of bytes that Pexpect will try to read from a TTY at one time. Setting
+ the maxread size to 1 will turn off buffering. Setting the maxread
+ value higher may help performance in cases where large amounts of
+ output are read back from the child. This feature is useful in
+ conjunction with searchwindowsize.
+
+ The searchwindowsize attribute sets the how far back in the incoming
+ seach buffer Pexpect will search for pattern matches. Every time
+ Pexpect reads some data from the child it will append the data to the
+ incoming buffer. The default is to search from the beginning of the
+ incoming buffer each time new data is read from the child. But this is
+ very inefficient if you are running a command that generates a large
+ amount of data where you want to match. The searchwindowsize does not
+ affect the size of the incoming data buffer. You will still have
+ access to the full buffer after expect() returns.
+
+ The logfile member turns on or off logging. All input and output will
+ be copied to the given file object. Set logfile to None to stop
+ logging. This is the default. Set logfile to sys.stdout to echo
+ everything to standard output. The logfile is flushed after each write.
+
+ Example log input and output to a file::
+
+ child = pexpect.spawn('some_command')
+ fout = open('mylog.txt','wb')
+ child.logfile = fout
+
+ Example log to stdout::
+
+ # In Python 2:
+ child = pexpect.spawn('some_command')
+ child.logfile = sys.stdout
+
+ # In Python 3, spawnu should be used to give str to stdout:
+ child = pexpect.spawnu('some_command')
+ child.logfile = sys.stdout
+
+ The logfile_read and logfile_send members can be used to separately log
+ the input from the child and output sent to the child. Sometimes you
+ don't want to see everything you write to the child. You only want to
+ log what the child sends back. For example::
+
+ child = pexpect.spawn('some_command')
+ child.logfile_read = sys.stdout
+
+ Remember to use spawnu instead of spawn for the above code if you are
+ using Python 3.
+
+ To separately log output sent to the child use logfile_send::
+
+ child.logfile_send = fout
+
+ If ``ignore_sighup`` is True, the child process will ignore SIGHUP
+ signals. For now, the default is True, to preserve the behaviour of
+ earlier versions of Pexpect, but you should pass this explicitly if you
+ want to rely on it.
+
+ The delaybeforesend helps overcome a weird behavior that many users
+ were experiencing. The typical problem was that a user would expect() a
+ "Password:" prompt and then immediately call sendline() to send the
+ password. The user would then see that their password was echoed back
+ to them. Passwords don't normally echo. The problem is caused by the
+ fact that most applications print out the "Password" prompt and then
+ turn off stdin echo, but if you send your password before the
+ application turned off echo, then you get your password echoed.
+ Normally this wouldn't be a problem when interacting with a human at a
+ real keyboard. If you introduce a slight delay just before writing then
+ this seems to clear up the problem. This was such a common problem for
+ many users that I decided that the default pexpect behavior should be
+ to sleep just before writing to the child application. 1/20th of a
+ second (50 ms) seems to be enough to clear up the problem. You can set
+ delaybeforesend to 0 to return to the old behavior. Most Linux machines
+ don't like this to be below 0.03. I don't know why.
+
+ Note that spawn is clever about finding commands on your path.
+ It uses the same logic that "which" uses to find executables.
+
+ If you wish to get the exit status of the child you must call the
+ close() method. The exit or signal status of the child will be stored
+ in self.exitstatus or self.signalstatus. If the child exited normally
+ then exitstatus will store the exit return code and signalstatus will
+ be None. If the child was terminated abnormally with a signal then
+ signalstatus will store the signal value and exitstatus will be None.
+ If you need more detail you can also read the self.status member which
+ stores the status returned by os.waitpid. You can interpret this using
+ os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG.
+
+ The echo attribute may be set to False to disable echoing of input.
+ As a pseudo-terminal, all input echoed by the "keyboard" (send()
+ or sendline()) will be repeated to output. For many cases, it is
+ not desirable to have echo enabled, and it may be later disabled
+ using setecho(False) followed by waitnoecho(). However, for some
+ platforms such as Solaris, this is not possible, and should be
+ disabled immediately on spawn.
+
+ If preexec_fn is given, it will be called in the child process before
+ launching the given command. This is useful to e.g. reset inherited
+ signal handlers.
+ '''
+ super(spawn, self).__init__(timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize,
+ logfile=logfile)
+ self.STDIN_FILENO = pty.STDIN_FILENO
+ self.STDOUT_FILENO = pty.STDOUT_FILENO
+ self.STDERR_FILENO = pty.STDERR_FILENO
+ self.cwd = cwd
+ self.env = env
+ self.echo = echo
+ self.ignore_sighup = ignore_sighup
+ self.__irix_hack = sys.platform.lower().startswith('irix')
+ if command is None:
+ self.command = None
+ self.args = None
+ self.name = '<pexpect factory incomplete>'
+ else:
+ self._spawn(command, args, preexec_fn)
+
+ def __str__(self):
+ '''This returns a human-readable string that represents the state of
+ the object. '''
+
+ s = []
+ s.append(repr(self))
+ s.append('command: ' + str(self.command))
+ s.append('args: %r' % (self.args,))
+ s.append('searcher: %r' % (self.searcher,))
+ s.append('buffer (last 100 chars): %r' % (
+ self.buffer[-100:] if self.buffer else self.buffer,))
+ s.append('before (last 100 chars): %r' % (
+ self.before[-100:] if self.before else self.before,))
+ s.append('after: %r' % (self.after,))
+ s.append('match: %r' % (self.match,))
+ s.append('match_index: ' + str(self.match_index))
+ s.append('exitstatus: ' + str(self.exitstatus))
+ s.append('flag_eof: ' + str(self.flag_eof))
+ s.append('pid: ' + str(self.pid))
+ s.append('child_fd: ' + str(self.child_fd))
+ s.append('closed: ' + str(self.closed))
+ s.append('timeout: ' + str(self.timeout))
+ s.append('delimiter: ' + str(self.delimiter))
+ s.append('logfile: ' + str(self.logfile))
+ s.append('logfile_read: ' + str(self.logfile_read))
+ s.append('logfile_send: ' + str(self.logfile_send))
+ s.append('maxread: ' + str(self.maxread))
+ s.append('ignorecase: ' + str(self.ignorecase))
+ s.append('searchwindowsize: ' + str(self.searchwindowsize))
+ s.append('delaybeforesend: ' + str(self.delaybeforesend))
+ s.append('delayafterclose: ' + str(self.delayafterclose))
+ s.append('delayafterterminate: ' + str(self.delayafterterminate))
+ return '\n'.join(s)
+
+ def _spawn(self, command, args=[], preexec_fn=None):
+ '''This starts the given command in a child process. This does all the
+ fork/exec type of stuff for a pty. This is called by __init__. If args
+ is empty then command will be parsed (split on spaces) and args will be
+ set to parsed arguments. '''
+
+ # The pid and child_fd of this object get set by this method.
+ # Note that it is difficult for this method to fail.
+ # You cannot detect if the child process cannot start.
+ # So the only way you can tell if the child process started
+ # or not is to try to read from the file descriptor. If you get
+ # EOF immediately then it means that the child is already dead.
+ # That may not necessarily be bad because you may have spawned a child
+ # that performs some task; creates no stdout output; and then dies.
+
+ # If command is an int type then it may represent a file descriptor.
+ if isinstance(command, type(0)):
+ raise ExceptionPexpect('Command is an int type. ' +
+ 'If this is a file descriptor then maybe you want to ' +
+ 'use fdpexpect.fdspawn which takes an existing ' +
+ 'file descriptor instead of a command string.')
+
+ if not isinstance(args, type([])):
+ raise TypeError('The argument, args, must be a list.')
+
+ if args == []:
+ self.args = split_command_line(command)
+ self.command = self.args[0]
+ else:
+ # Make a shallow copy of the args list.
+ self.args = args[:]
+ self.args.insert(0, command)
+ self.command = command
+
+ command_with_path = which(self.command)
+ if command_with_path is None:
+ raise ExceptionPexpect('The command was not found or was not ' +
+ 'executable: %s.' % self.command)
+ self.command = command_with_path
+ self.args[0] = self.command
+
+ self.name = '<' + ' '.join(self.args) + '>'
+
+ assert self.pid is None, 'The pid member must be None.'
+ assert self.command is not None, 'The command member must not be None.'
+
+ kwargs = {'echo': self.echo, 'preexec_fn': preexec_fn}
+ if self.ignore_sighup:
+ def preexec_wrapper():
+ "Set SIGHUP to be ignored, then call the real preexec_fn"
+ signal.signal(signal.SIGHUP, signal.SIG_IGN)
+ if preexec_fn is not None:
+ preexec_fn()
+ kwargs['preexec_fn'] = preexec_wrapper
+
+ self.ptyproc = self.ptyprocess_class.spawn(self.args, env=self.env,
+ cwd=self.cwd, **kwargs)
+
+ self.pid = self.ptyproc.pid
+ self.child_fd = self.ptyproc.fd
+
+
+ self.terminated = False
+ self.closed = False
+
+ def close(self, force=True):
+ '''This closes the connection with the child application. Note that
+ calling close() more than once is valid. This emulates standard Python
+ behavior with files. Set force to True if you want to make sure that
+ the child is terminated (SIGKILL is sent if the child ignores SIGHUP
+ and SIGINT). '''
+
+ self.flush()
+ self.ptyproc.close()
+ self.isalive() # Update exit status from ptyproc
+ self.child_fd = -1
+
+ def isatty(self):
+ '''This returns True if the file descriptor is open and connected to a
+ tty(-like) device, else False.
+
+ On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
+ the child pty may not appear as a terminal device. This means
+ methods such as setecho(), setwinsize(), getwinsize() may raise an
+ IOError. '''
+
+ return os.isatty(self.child_fd)
+
+ def waitnoecho(self, timeout=-1):
+ '''This waits until the terminal ECHO flag is set False. This returns
+ True if the echo mode is off. This returns False if the ECHO flag was
+ not set False before the timeout. This can be used to detect when the
+ child is waiting for a password. Usually a child application will turn
+ off echo mode when it is waiting for the user to enter a password. For
+ example, instead of expecting the "password:" prompt you can wait for
+ the child to set ECHO off::
+
+ p = pexpect.spawn('ssh user@example.com')
+ p.waitnoecho()
+ p.sendline(mypassword)
+
+ If timeout==-1 then this method will use the value in self.timeout.
+ If timeout==None then this method to block until ECHO flag is False.
+ '''
+
+ if timeout == -1:
+ timeout = self.timeout
+ if timeout is not None:
+ end_time = time.time() + timeout
+ while True:
+ if not self.getecho():
+ return True
+ if timeout < 0 and timeout is not None:
+ return False
+ if timeout is not None:
+ timeout = end_time - time.time()
+ time.sleep(0.1)
+
+ def getecho(self):
+ '''This returns the terminal echo mode. This returns True if echo is
+ on or False if echo is off. Child applications that are expecting you
+ to enter a password often set ECHO False. See waitnoecho().
+
+ Not supported on platforms where ``isatty()`` returns False. '''
+ return self.ptyproc.getecho()
+
+ def setecho(self, state):
+ '''This sets the terminal echo mode on or off. Note that anything the
+ child sent before the echo will be lost, so you should be sure that
+ your input buffer is empty before you call setecho(). For example, the
+ following will work as expected::
+
+ p = pexpect.spawn('cat') # Echo is on by default.
+ p.sendline('1234') # We expect see this twice from the child...
+ p.expect(['1234']) # ... once from the tty echo...
+ p.expect(['1234']) # ... and again from cat itself.
+ p.setecho(False) # Turn off tty echo
+ p.sendline('abcd') # We will set this only once (echoed by cat).
+ p.sendline('wxyz') # We will set this only once (echoed by cat)
+ p.expect(['abcd'])
+ p.expect(['wxyz'])
+
+ The following WILL NOT WORK because the lines sent before the setecho
+ will be lost::
+
+ p = pexpect.spawn('cat')
+ p.sendline('1234')
+ p.setecho(False) # Turn off tty echo
+ p.sendline('abcd') # We will set this only once (echoed by cat).
+ p.sendline('wxyz') # We will set this only once (echoed by cat)
+ p.expect(['1234'])
+ p.expect(['1234'])
+ p.expect(['abcd'])
+ p.expect(['wxyz'])
+
+
+ Not supported on platforms where ``isatty()`` returns False.
+ '''
+ return self.ptyproc.setecho(state)
+
+ self.echo = state
+
+ def read_nonblocking(self, size=1, timeout=-1):
+ '''This reads at most size characters from the child application. It
+ includes a timeout. If the read does not complete within the timeout
+ period then a TIMEOUT exception is raised. If the end of file is read
+ then an EOF exception will be raised. If a log file was set using
+ setlog() then all data will also be written to the log file.
+
+ If timeout is None then the read may block indefinitely.
+ If timeout is -1 then the self.timeout value is used. If timeout is 0
+ then the child is polled and if there is no data immediately ready
+ then this will raise a TIMEOUT exception.
+
+ The timeout refers only to the amount of time to read at least one
+ character. This is not effected by the 'size' parameter, so if you call
+ read_nonblocking(size=100, timeout=30) and only one character is
+ available right away then one character will be returned immediately.
+ It will not wait for 30 seconds for another 99 characters to come in.
+
+ This is a wrapper around os.read(). It uses select.select() to
+ implement the timeout. '''
+
+ if self.closed:
+ raise ValueError('I/O operation on closed file.')
+
+ if timeout == -1:
+ timeout = self.timeout
+
+ # Note that some systems such as Solaris do not give an EOF when
+ # the child dies. In fact, you can still try to read
+ # from the child_fd -- it will block forever or until TIMEOUT.
+ # For this case, I test isalive() before doing any reading.
+ # If isalive() is false, then I pretend that this is the same as EOF.
+ if not self.isalive():
+ # timeout of 0 means "poll"
+ r, w, e = self.__select([self.child_fd], [], [], 0)
+ if not r:
+ self.flag_eof = True
+ raise EOF('End Of File (EOF). Braindead platform.')
+ elif self.__irix_hack:
+ # Irix takes a long time before it realizes a child was terminated.
+ # FIXME So does this mean Irix systems are forced to always have
+ # FIXME a 2 second delay when calling read_nonblocking? That sucks.
+ r, w, e = self.__select([self.child_fd], [], [], 2)
+ if not r and not self.isalive():
+ self.flag_eof = True
+ raise EOF('End Of File (EOF). Slow platform.')
+
+ r, w, e = self.__select([self.child_fd], [], [], timeout)
+
+ if not r:
+ if not self.isalive():
+ # Some platforms, such as Irix, will claim that their
+ # processes are alive; timeout on the select; and
+ # then finally admit that they are not alive.
+ self.flag_eof = True
+ raise EOF('End of File (EOF). Very slow platform.')
+ else:
+ raise TIMEOUT('Timeout exceeded.')
+
+ if self.child_fd in r:
+ return super(spawn, self).read_nonblocking(size)
+
+ raise ExceptionPexpect('Reached an unexpected state.') # pragma: no cover
+
+ def write(self, s):
+ '''This is similar to send() except that there is no return value.
+ '''
+
+ self.send(s)
+
+ def writelines(self, sequence):
+ '''This calls write() for each element in the sequence. The sequence
+ can be any iterable object producing strings, typically a list of
+ strings. This does not add line separators. There is no return value.
+ '''
+
+ for s in sequence:
+ self.write(s)
+
+ def send(self, s):
+ '''Sends string ``s`` to the child process, returning the number of
+ bytes written. If a logfile is specified, a copy is written to that
+ log.
+
+ The default terminal input mode is canonical processing unless set
+ otherwise by the child process. This allows backspace and other line
+ processing to be performed prior to transmitting to the receiving
+ program. As this is buffered, there is a limited size of such buffer.
+
+ On Linux systems, this is 4096 (defined by N_TTY_BUF_SIZE). All
+ other systems honor the POSIX.1 definition PC_MAX_CANON -- 1024
+ on OSX, 256 on OpenSolaris, 255 on FreeBSD.
+
+ This value may be discovered using fpathconf(3)::
+
+ >>> from os import fpathconf
+ >>> print(fpathconf(0, 'PC_MAX_CANON'))
+ 256
+
+ On such a system, only 256 bytes may be received per line. Any
+ subsequent bytes received will be discarded. BEL (``'\a'``) is then
+ sent to output if IMAXBEL (termios.h) is set by the tty driver.
+ This is usually enabled by default. Linux does not honor this as
+ an option -- it behaves as though it is always set on.
+
+ Canonical input processing may be disabled altogether by executing
+ a shell, then stty(1), before executing the final program::
+
+ >>> bash = pexpect.spawn('/bin/bash', echo=False)
+ >>> bash.sendline('stty -icanon')
+ >>> bash.sendline('base64')
+ >>> bash.sendline('x' * 5000)
+ '''
+
+ time.sleep(self.delaybeforesend)
+
+ s = self._coerce_send_string(s)
+ self._log(s, 'send')
+
+ return self._send(s)
+
+ def _send(self, s):
+ return os.write(self.child_fd, s)
+
+ def sendline(self, s=''):
+ '''Wraps send(), sending string ``s`` to child process, with
+ ``os.linesep`` automatically appended. Returns number of bytes
+ written. Only a limited number of bytes may be sent for each
+ line in the default terminal mode, see docstring of :meth:`send`.
+ '''
+
+ n = self.send(s)
+ n = n + self.send(self.linesep)
+ return n
+
+ def _log_control(self, byte):
+ """Write control characters to the appropriate log files"""
+ self._log(byte, 'send')
+
+ def sendcontrol(self, char):
+ '''Helper method that wraps send() with mnemonic access for sending control
+ character to the child (such as Ctrl-C or Ctrl-D). For example, to send
+ Ctrl-G (ASCII 7, bell, '\a')::
+
+ child.sendcontrol('g')
+
+ See also, sendintr() and sendeof().
+ '''
+ n, byte = self.ptyproc.sendcontrol(char)
+ self._log_control(byte)
+ return n
+
+ def sendeof(self):
+ '''This sends an EOF to the child. This sends a character which causes
+ the pending parent output buffer to be sent to the waiting child
+ program without waiting for end-of-line. If it is the first character
+ of the line, the read() in the user program returns 0, which signifies
+ end-of-file. This means to work as expected a sendeof() has to be
+ called at the beginning of a line. This method does not send a newline.
+ It is the responsibility of the caller to ensure the eof is sent at the
+ beginning of a line. '''
+
+ n, byte = self.ptyproc.sendeof()
+ self._log_control(byte)
+
+ def sendintr(self):
+ '''This sends a SIGINT to the child. It does not require
+ the SIGINT to be the first character on a line. '''
+
+ n, byte = self.ptyproc.sendintr()
+ self._log_control(byte)
+
+ @property
+ def flag_eof(self):
+ return self.ptyproc.flag_eof
+
+ @flag_eof.setter
+ def flag_eof(self, value):
+ self.ptyproc.flag_eof = value
+
+ def eof(self):
+ '''This returns True if the EOF exception was ever raised.
+ '''
+ return self.flag_eof
+
+ def terminate(self, force=False):
+ '''This forces a child process to terminate. It starts nicely with
+ SIGHUP and SIGINT. If "force" is True then moves onto SIGKILL. This
+ returns True if the child was terminated. This returns False if the
+ child could not be terminated. '''
+
+ if not self.isalive():
+ return True
+ try:
+ self.kill(signal.SIGHUP)
+ time.sleep(self.delayafterterminate)
+ if not self.isalive():
+ return True
+ self.kill(signal.SIGCONT)
+ time.sleep(self.delayafterterminate)
+ if not self.isalive():
+ return True
+ self.kill(signal.SIGINT)
+ time.sleep(self.delayafterterminate)
+ if not self.isalive():
+ return True
+ if force:
+ self.kill(signal.SIGKILL)
+ time.sleep(self.delayafterterminate)
+ if not self.isalive():
+ return True
+ else:
+ return False
+ return False
+ except OSError:
+ # I think there are kernel timing issues that sometimes cause
+ # this to happen. I think isalive() reports True, but the
+ # process is dead to the kernel.
+ # Make one last attempt to see if the kernel is up to date.
+ time.sleep(self.delayafterterminate)
+ if not self.isalive():
+ return True
+ else:
+ return False
+
+ def wait(self):
+ '''This waits until the child exits. This is a blocking call. This will
+ not read any data from the child, so this will block forever if the
+ child has unread output and has terminated. In other words, the child
+ may have printed output then called exit(), but, the child is
+ technically still alive until its output is read by the parent. '''
+
+ ptyproc = self.ptyproc
+ with _wrap_ptyprocess_err():
+ exitstatus = ptyproc.wait()
+ self.status = ptyproc.status
+ self.exitstatus = ptyproc.exitstatus
+ self.signalstatus = ptyproc.signalstatus
+ self.terminated = True
+
+ return exitstatus
+
+ def isalive(self):
+ '''This tests if the child process is running or not. This is
+ non-blocking. If the child was terminated then this will read the
+ exitstatus or signalstatus of the child. This returns True if the child
+ process appears to be running or False if not. It can take literally
+ SECONDS for Solaris to return the right status. '''
+
+ ptyproc = self.ptyproc
+ with _wrap_ptyprocess_err():
+ alive = ptyproc.isalive()
+
+ if not alive:
+ self.status = ptyproc.status
+ self.exitstatus = ptyproc.exitstatus
+ self.signalstatus = ptyproc.signalstatus
+ self.terminated = True
+
+ return alive
+
+ def kill(self, sig):
+
+ '''This sends the given signal to the child application. In keeping
+ with UNIX tradition it has a misleading name. It does not necessarily
+ kill the child unless you send the right signal. '''
+
+ # Same as os.kill, but the pid is given for you.
+ if self.isalive():
+ os.kill(self.pid, sig)
+
+ def getwinsize(self):
+ '''This returns the terminal window size of the child tty. The return
+ value is a tuple of (rows, cols). '''
+ return self.ptyproc.getwinsize()
+
+ def setwinsize(self, rows, cols):
+ '''This sets the terminal window size of the child tty. This will cause
+ a SIGWINCH signal to be sent to the child. This does not change the
+ physical window size. It changes the size reported to TTY-aware
+ applications like vi or curses -- applications that respond to the
+ SIGWINCH signal. '''
+ return self.ptyproc.setwinsize(rows, cols)
+
+
+ def interact(self, escape_character=chr(29),
+ input_filter=None, output_filter=None):
+
+ '''This gives control of the child process to the interactive user (the
+ human at the keyboard). Keystrokes are sent to the child process, and
+ the stdout and stderr output of the child process is printed. This
+ simply echos the child stdout and child stderr to the real stdout and
+ it echos the real stdin to the child stdin. When the user types the
+ escape_character this method will stop. The default for
+ escape_character is ^]. This should not be confused with ASCII 27 --
+ the ESC character. ASCII 29 was chosen for historical merit because
+ this is the character used by 'telnet' as the escape character. The
+ escape_character will not be sent to the child process.
+
+ You may pass in optional input and output filter functions. These
+ functions should take a string and return a string. The output_filter
+ will be passed all the output from the child process. The input_filter
+ will be passed all the keyboard input from the user. The input_filter
+ is run BEFORE the check for the escape_character.
+
+ Note that if you change the window size of the parent the SIGWINCH
+ signal will not be passed through to the child. If you want the child
+ window size to change when the parent's window size changes then do
+ something like the following example::
+
+ import pexpect, struct, fcntl, termios, signal, sys
+ def sigwinch_passthrough (sig, data):
+ s = struct.pack("HHHH", 0, 0, 0, 0)
+ a = struct.unpack('hhhh', fcntl.ioctl(sys.stdout.fileno(),
+ termios.TIOCGWINSZ , s))
+ global p
+ p.setwinsize(a[0],a[1])
+ # Note this 'p' global and used in sigwinch_passthrough.
+ p = pexpect.spawn('/bin/bash')
+ signal.signal(signal.SIGWINCH, sigwinch_passthrough)
+ p.interact()
+ '''
+
+ # Flush the buffer.
+ self.write_to_stdout(self.buffer)
+ self.stdout.flush()
+ self.buffer = self.string_type()
+ mode = tty.tcgetattr(self.STDIN_FILENO)
+ tty.setraw(self.STDIN_FILENO)
+ if PY3:
+ escape_character = escape_character.encode('latin-1')
+ try:
+ self.__interact_copy(escape_character, input_filter, output_filter)
+ finally:
+ tty.tcsetattr(self.STDIN_FILENO, tty.TCSAFLUSH, mode)
+
+ def __interact_writen(self, fd, data):
+ '''This is used by the interact() method.
+ '''
+
+ while data != b'' and self.isalive():
+ n = os.write(fd, data)
+ data = data[n:]
+
+ def __interact_read(self, fd):
+ '''This is used by the interact() method.
+ '''
+
+ return os.read(fd, 1000)
+
+ def __interact_copy(self, escape_character=None,
+ input_filter=None, output_filter=None):
+
+ '''This is used by the interact() method.
+ '''
+
+ while self.isalive():
+ r, w, e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])
+ if self.child_fd in r:
+ try:
+ data = self.__interact_read(self.child_fd)
+ except OSError as err:
+ if err.args[0] == errno.EIO:
+ # Linux-style EOF
+ break
+ raise
+ if data == b'':
+ # BSD-style EOF
+ break
+ if output_filter:
+ data = output_filter(data)
+ if self.logfile is not None:
+ self.logfile.write(data)
+ self.logfile.flush()
+ os.write(self.STDOUT_FILENO, data)
+ if self.STDIN_FILENO in r:
+ data = self.__interact_read(self.STDIN_FILENO)
+ if input_filter:
+ data = input_filter(data)
+ i = data.rfind(escape_character)
+ if i != -1:
+ data = data[:i]
+ self.__interact_writen(self.child_fd, data)
+ break
+ self.__interact_writen(self.child_fd, data)
+
+ def __select(self, iwtd, owtd, ewtd, timeout=None):
+
+ '''This is a wrapper around select.select() that ignores signals. If
+ select.select raises a select.error exception and errno is an EINTR
+ error then it is ignored. Mainly this is used to ignore sigwinch
+ (terminal resize). '''
+
+ # if select() is interrupted by a signal (errno==EINTR) then
+ # we loop back and enter the select() again.
+ if timeout is not None:
+ end_time = time.time() + timeout
+ while True:
+ try:
+ return select.select(iwtd, owtd, ewtd, timeout)
+ except select.error:
+ err = sys.exc_info()[1]
+ if err.args[0] == errno.EINTR:
+ # if we loop back we have to subtract the
+ # amount of time we already waited.
+ if timeout is not None:
+ timeout = end_time - time.time()
+ if timeout < 0:
+ return([], [], [])
+ else:
+ # something else caused the select.error, so
+ # this actually is an exception.
+ raise
+
+
+class spawnu(SpawnBaseUnicode, spawn):
+ """Works like spawn, but accepts and returns unicode strings.
+
+ Extra parameters:
+
+ :param encoding: The encoding to use for communications (default: 'utf-8')
+ :param errors: How to handle encoding/decoding errors; one of 'strict'
+ (the default), 'ignore', or 'replace', as described
+ for :meth:`~bytes.decode` and :meth:`~str.encode`.
+ """
+ ptyprocess_class = ptyprocess.PtyProcessUnicode
+
+ def _send(self, s):
+ return os.write(self.child_fd, s.encode(self.encoding, self.errors))
+
+ def _log_control(self, byte):
+ s = byte.decode(self.encoding, 'replace')
+ self._log(s, 'send')
diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py
index 9a6edc7..71f56a0 100644
--- a/pexpect/pxssh.py
+++ b/pexpect/pxssh.py
@@ -68,6 +68,14 @@ class pxssh (spawn):
print("pxssh failed on login.")
print(e)
+ Example showing how to specify SSH options::
+
+ import pxssh
+ s = pxssh.pxssh(options={
+ "StrictHostKeyChecking": "no",
+ "UserKnownHostsFile": "/dev/null"})
+ ...
+
Note that if you have ssh-agent running while doing development with pxssh
then this can lead to a lot of confusion. Many X display managers (xdm,
gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
@@ -86,9 +94,10 @@ class pxssh (spawn):
'''
def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None,
- logfile=None, cwd=None, env=None, echo=True):
+ logfile=None, cwd=None, env=None, ignore_sighup=True, echo=True,
+ options={}):
- spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env, echo=echo)
+ spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env, ignore_sighup=ignore_sighup, echo=echo)
self.name = '<pxssh>'
@@ -120,6 +129,10 @@ class pxssh (spawn):
#self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
self.force_password = False
+ # User defined SSH options, eg,
+ # ssh.otions = dict(StrictHostKeyChecking="no",UserKnownHostsFile="/dev/null")
+ self.options = options
+
def levenshtein_distance(self, a, b):
'''This calculates the Levenshtein distance between a and b.
'''
@@ -165,7 +178,7 @@ class pxssh (spawn):
try:
prompt += self.read_nonblocking(size=1, timeout=timeout)
expired = time.time() - begin # updated total time expired
- timeout = inter_char_timeout
+ timeout = inter_char_timeout
except TIMEOUT:
break
@@ -241,7 +254,7 @@ class pxssh (spawn):
manually set the :attr:`PROMPT` attribute.
'''
- ssh_options = ''
+ ssh_options = ''.join([" -o '%s=%s'" % (o, v) for (o, v) in self.options.items()])
if quiet:
ssh_options = ssh_options + ' -q'
if not check_local_ip:
diff --git a/pexpect/replwrap.py b/pexpect/replwrap.py
index 2e50286..7b0e823 100644
--- a/pexpect/replwrap.py
+++ b/pexpect/replwrap.py
@@ -1,5 +1,6 @@
"""Generic wrapper for read-eval-print-loops, a.k.a. interactive shells
"""
+import os.path
import signal
import sys
import re
@@ -104,7 +105,9 @@ def python(command="python"):
"""Start a Python shell and return a :class:`REPLWrapper` object."""
return REPLWrapper(command, u(">>> "), u("import sys; sys.ps1={0!r}; sys.ps2={1!r}"))
-def bash(command="bash", orig_prompt=re.compile('[$#]')):
+def bash(command="bash"):
"""Start a bash shell and return a :class:`REPLWrapper` object."""
- return REPLWrapper(command, orig_prompt, u("PS1='{0}' PS2='{1}' PROMPT_COMMAND=''"),
+ bashrc = os.path.join(os.path.dirname(__file__), 'bashrc.sh')
+ child = pexpect.spawnu(command, ['--rcfile', bashrc], echo=False)
+ return REPLWrapper(child, u'\$', u("PS1='{0}' PS2='{1}' PROMPT_COMMAND=''"),
extra_init_cmd="export PAGER=cat")
diff --git a/pexpect/spawnbase.py b/pexpect/spawnbase.py
new file mode 100644
index 0000000..d79c5c0
--- /dev/null
+++ b/pexpect/spawnbase.py
@@ -0,0 +1,484 @@
+import codecs
+import os
+import sys
+import re
+import errno
+from .exceptions import ExceptionPexpect, EOF, TIMEOUT
+from .expect import Expecter, searcher_string, searcher_re
+
+PY3 = (sys.version_info[0] >= 3)
+
+class SpawnBase(object):
+ """A base class providing the backwards-compatible spawn API for Pexpect.
+
+ This should not be instantiated directly: use :class:`pexpect.spawn` or :class:`pexpect.fdpexpect.fdspawn`."""
+ string_type = bytes
+ if PY3:
+ allowed_string_types = (bytes, str)
+ linesep = os.linesep.encode('ascii')
+ crlf = '\r\n'.encode('ascii')
+
+ @staticmethod
+ def write_to_stdout(b):
+ try:
+ return sys.stdout.buffer.write(b)
+ except AttributeError:
+ # If stdout has been replaced, it may not have .buffer
+ return sys.stdout.write(b.decode('ascii', 'replace'))
+ else:
+ allowed_string_types = (basestring,) # analysis:ignore
+ linesep = os.linesep
+ crlf = '\r\n'
+ write_to_stdout = sys.stdout.write
+
+ encoding = None
+ pid = None
+ flag_eof = False
+
+ def __init__(self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None):
+ self.stdin = sys.stdin
+ self.stdout = sys.stdout
+ self.stderr = sys.stderr
+
+ self.searcher = None
+ self.ignorecase = False
+ self.before = None
+ self.after = None
+ self.match = None
+ self.match_index = None
+ self.terminated = True
+ self.exitstatus = None
+ self.signalstatus = None
+ # status returned by os.waitpid
+ self.status = None
+ # the child file descriptor is initially closed
+ self.child_fd = -1
+ self.timeout = timeout
+ self.delimiter = EOF
+ self.logfile = logfile
+ # input from child (read_nonblocking)
+ self.logfile_read = None
+ # output to send (send, sendline)
+ self.logfile_send = None
+ # max bytes to read at one time into buffer
+ self.maxread = maxread
+ # This is the read buffer. See maxread.
+ self.buffer = self.string_type()
+ # Data before searchwindowsize point is preserved, but not searched.
+ self.searchwindowsize = searchwindowsize
+ # Delay used before sending data to child. Time in seconds.
+ # Most Linux machines don't like this to be below 0.03 (30 ms).
+ self.delaybeforesend = 0.05
+ # Used by close() to give kernel time to update process status.
+ # Time in seconds.
+ self.delayafterclose = 0.1
+ # Used by terminate() to give kernel time to update process status.
+ # Time in seconds.
+ self.delayafterterminate = 0.1
+ self.softspace = False
+ self.name = '<' + repr(self) + '>'
+ self.closed = True
+
+ def _log(self, s, direction):
+ if self.logfile is not None:
+ self.logfile.write(s)
+ self.logfile.flush()
+ second_log = self.logfile_send if (direction=='send') else self.logfile_read
+ if second_log is not None:
+ second_log.write(s)
+ second_log.flush()
+
+ @staticmethod
+ def _coerce_expect_string(s):
+ if not isinstance(s, bytes):
+ return s.encode('ascii')
+ return s
+
+ @staticmethod
+ def _coerce_send_string(s):
+ if not isinstance(s, bytes):
+ return s.encode('utf-8')
+ return s
+
+ @staticmethod
+ def _coerce_read_string(s):
+ return s
+
+ def read_nonblocking(self, size=1, timeout=None):
+ """This reads data from the file descriptor.
+
+ This is a simple implementation suitable for a regular file. Subclasses using ptys or pipes should override it.
+
+ The timeout parameter is ignored.
+ """
+
+ try:
+ s = os.read(self.child_fd, size)
+ except OSError as err:
+ if err.args[0] == errno.EIO:
+ # Linux-style EOF
+ self.flag_eof = True
+ raise EOF('End Of File (EOF). Exception style platform.')
+ raise
+ if s == b'':
+ # BSD-style EOF
+ self.flag_eof = True
+ raise EOF('End Of File (EOF). Empty string style platform.')
+
+ s = self._coerce_read_string(s)
+ self._log(s, 'read')
+ return s
+
+ def _pattern_type_err(self, pattern):
+ raise TypeError('got {badtype} ({badobj!r}) as pattern, must be one'
+ ' of: {goodtypes}, pexpect.EOF, pexpect.TIMEOUT'\
+ .format(badtype=type(pattern),
+ badobj=pattern,
+ goodtypes=', '.join([str(ast)\
+ for ast in self.allowed_string_types])
+ )
+ )
+
+ def compile_pattern_list(self, patterns):
+ '''This compiles a pattern-string or a list of pattern-strings.
+ Patterns must be a StringType, EOF, TIMEOUT, SRE_Pattern, or a list of
+ those. Patterns may also be None which results in an empty list (you
+ might do this if waiting for an EOF or TIMEOUT condition without
+ expecting any pattern).
+
+ This is used by expect() when calling expect_list(). Thus expect() is
+ nothing more than::
+
+ cpl = self.compile_pattern_list(pl)
+ return self.expect_list(cpl, timeout)
+
+ If you are using expect() within a loop it may be more
+ efficient to compile the patterns first and then call expect_list().
+ This avoid calls in a loop to compile_pattern_list()::
+
+ cpl = self.compile_pattern_list(my_pattern)
+ while some_condition:
+ ...
+ i = self.expect_list(cpl, timeout)
+ ...
+ '''
+
+ if patterns is None:
+ return []
+ if not isinstance(patterns, list):
+ patterns = [patterns]
+
+ # Allow dot to match \n
+ compile_flags = re.DOTALL
+ if self.ignorecase:
+ compile_flags = compile_flags | re.IGNORECASE
+ compiled_pattern_list = []
+ for idx, p in enumerate(patterns):
+ if isinstance(p, self.allowed_string_types):
+ p = self._coerce_expect_string(p)
+ compiled_pattern_list.append(re.compile(p, compile_flags))
+ elif p is EOF:
+ compiled_pattern_list.append(EOF)
+ elif p is TIMEOUT:
+ compiled_pattern_list.append(TIMEOUT)
+ elif isinstance(p, type(re.compile(''))):
+ compiled_pattern_list.append(p)
+ else:
+ self._pattern_type_err(p)
+ return compiled_pattern_list
+
+ def expect(self, pattern, timeout=-1, searchwindowsize=-1, async=False):
+ '''This seeks through the stream until a pattern is matched. The
+ pattern is overloaded and may take several types. The pattern can be a
+ StringType, EOF, a compiled re, or a list of any of those types.
+ Strings will be compiled to re types. This returns the index into the
+ pattern list. If the pattern was not a list this returns index 0 on a
+ successful match. This may raise exceptions for EOF or TIMEOUT. To
+ avoid the EOF or TIMEOUT exceptions add EOF or TIMEOUT to the pattern
+ list. That will cause expect to match an EOF or TIMEOUT condition
+ instead of raising an exception.
+
+ If you pass a list of patterns and more than one matches, the first
+ match in the stream is chosen. If more than one pattern matches at that
+ point, the leftmost in the pattern list is chosen. For example::
+
+ # the input is 'foobar'
+ index = p.expect(['bar', 'foo', 'foobar'])
+ # returns 1('foo') even though 'foobar' is a "better" match
+
+ Please note, however, that buffering can affect this behavior, since
+ input arrives in unpredictable chunks. For example::
+
+ # the input is 'foobar'
+ index = p.expect(['foobar', 'foo'])
+ # returns 0('foobar') if all input is available at once,
+ # but returs 1('foo') if parts of the final 'bar' arrive late
+
+ After a match is found the instance attributes 'before', 'after' and
+ 'match' will be set. You can see all the data read before the match in
+ 'before'. You can see the data that was matched in 'after'. The
+ re.MatchObject used in the re match will be in 'match'. If an error
+ occurred then 'before' will be set to all the data read so far and
+ 'after' and 'match' will be None.
+
+ If timeout is -1 then timeout will be set to the self.timeout value.
+
+ A list entry may be EOF or TIMEOUT instead of a string. This will
+ catch these exceptions and return the index of the list entry instead
+ of raising the exception. The attribute 'after' will be set to the
+ exception type. The attribute 'match' will be None. This allows you to
+ write code like this::
+
+ index = p.expect(['good', 'bad', pexpect.EOF, pexpect.TIMEOUT])
+ if index == 0:
+ do_something()
+ elif index == 1:
+ do_something_else()
+ elif index == 2:
+ do_some_other_thing()
+ elif index == 3:
+ do_something_completely_different()
+
+ instead of code like this::
+
+ try:
+ index = p.expect(['good', 'bad'])
+ if index == 0:
+ do_something()
+ elif index == 1:
+ do_something_else()
+ except EOF:
+ do_some_other_thing()
+ except TIMEOUT:
+ do_something_completely_different()
+
+ These two forms are equivalent. It all depends on what you want. You
+ can also just expect the EOF if you are waiting for all output of a
+ child to finish. For example::
+
+ p = pexpect.spawn('/bin/ls')
+ p.expect(pexpect.EOF)
+ print p.before
+
+ If you are trying to optimize for speed then see expect_list().
+
+ On Python 3.4, or Python 3.3 with asyncio installed, passing
+ ``async=True`` will make this return an :mod:`asyncio` coroutine,
+ which you can yield from to get the same result that this method would
+ normally give directly. So, inside a coroutine, you can replace this code::
+
+ index = p.expect(patterns)
+
+ With this non-blocking form::
+
+ index = yield from p.expect(patterns, async=True)
+ '''
+
+ compiled_pattern_list = self.compile_pattern_list(pattern)
+ return self.expect_list(compiled_pattern_list,
+ timeout, searchwindowsize, async)
+
+ def expect_list(self, pattern_list, timeout=-1, searchwindowsize=-1,
+ async=False):
+ '''This takes a list of compiled regular expressions and returns the
+ index into the pattern_list that matched the child output. The list may
+ also contain EOF or TIMEOUT(which are not compiled regular
+ expressions). This method is similar to the expect() method except that
+ expect_list() does not recompile the pattern list on every call. This
+ may help if you are trying to optimize for speed, otherwise just use
+ the expect() method. This is called by expect(). If timeout==-1 then
+ the self.timeout value is used. If searchwindowsize==-1 then the
+ self.searchwindowsize value is used.
+
+ Like :meth:`expect`, passing ``async=True`` will make this return an
+ asyncio coroutine.
+ '''
+ if timeout == -1:
+ timeout = self.timeout
+
+ exp = Expecter(self, searcher_re(pattern_list), searchwindowsize)
+ if async:
+ from .async import expect_async
+ return expect_async(exp, timeout)
+ else:
+ return exp.expect_loop(timeout)
+
+ def expect_exact(self, pattern_list, timeout=-1, searchwindowsize=-1,
+ async=False):
+
+ '''This is similar to expect(), but uses plain string matching instead
+ of compiled regular expressions in 'pattern_list'. The 'pattern_list'
+ may be a string; a list or other sequence of strings; or TIMEOUT and
+ EOF.
+
+ This call might be faster than expect() for two reasons: string
+ searching is faster than RE matching and it is possible to limit the
+ search to just the end of the input buffer.
+
+ This method is also useful when you don't want to have to worry about
+ escaping regular expression characters that you want to match.
+
+ Like :meth:`expect`, passing ``async=True`` will make this return an
+ asyncio coroutine.
+ '''
+ if timeout == -1:
+ timeout = self.timeout
+
+ if (isinstance(pattern_list, self.allowed_string_types) or
+ pattern_list in (TIMEOUT, EOF)):
+ pattern_list = [pattern_list]
+
+ def prepare_pattern(pattern):
+ if pattern in (TIMEOUT, EOF):
+ return pattern
+ if isinstance(pattern, self.allowed_string_types):
+ return self._coerce_expect_string(pattern)
+ self._pattern_type_err(pattern)
+
+ try:
+ pattern_list = iter(pattern_list)
+ except TypeError:
+ self._pattern_type_err(pattern_list)
+ pattern_list = [prepare_pattern(p) for p in pattern_list]
+
+ exp = Expecter(self, searcher_string(pattern_list), searchwindowsize)
+ if async:
+ from .async import expect_async
+ return expect_async(exp, timeout)
+ else:
+ return exp.expect_loop(timeout)
+
+ def expect_loop(self, searcher, timeout=-1, searchwindowsize=-1):
+ '''This is the common loop used inside expect. The 'searcher' should be
+ an instance of searcher_re or searcher_string, which describes how and
+ what to search for in the input.
+
+ See expect() for other arguments, return value and exceptions. '''
+
+ exp = Expecter(self, searcher, searchwindowsize)
+ return exp.expect_loop(timeout)
+
+ def read(self, size=-1):
+ '''This reads at most "size" bytes from the file (less if the read hits
+ EOF before obtaining size bytes). If the size argument is negative or
+ omitted, read all data until EOF is reached. The bytes are returned as
+ a string object. An empty string is returned when EOF is encountered
+ immediately. '''
+
+ if size == 0:
+ return self.string_type()
+ if size < 0:
+ # delimiter default is EOF
+ self.expect(self.delimiter)
+ return self.before
+
+ # I could have done this more directly by not using expect(), but
+ # I deliberately decided to couple read() to expect() so that
+ # I would catch any bugs early and ensure consistant behavior.
+ # It's a little less efficient, but there is less for me to
+ # worry about if I have to later modify read() or expect().
+ # Note, it's OK if size==-1 in the regex. That just means it
+ # will never match anything in which case we stop only on EOF.
+ cre = re.compile(self._coerce_expect_string('.{%d}' % size), re.DOTALL)
+ # delimiter default is EOF
+ index = self.expect([cre, self.delimiter])
+ if index == 0:
+ ### FIXME self.before should be ''. Should I assert this?
+ return self.after
+ return self.before
+
+ def readline(self, size=-1):
+ '''This reads and returns one entire line. The newline at the end of
+ line is returned as part of the string, unless the file ends without a
+ newline. An empty string is returned if EOF is encountered immediately.
+ This looks for a newline as a CR/LF pair (\\r\\n) even on UNIX because
+ this is what the pseudotty device returns. So contrary to what you may
+ expect you will receive newlines as \\r\\n.
+
+ If the size argument is 0 then an empty string is returned. In all
+ other cases the size argument is ignored, which is not standard
+ behavior for a file-like object. '''
+
+ if size == 0:
+ return self.string_type()
+ # delimiter default is EOF
+ index = self.expect([self.crlf, self.delimiter])
+ if index == 0:
+ return self.before + self.crlf
+ else:
+ return self.before
+
+ def __iter__(self):
+ '''This is to support iterators over a file-like object.
+ '''
+ return iter(self.readline, self.string_type())
+
+ def readlines(self, sizehint=-1):
+ '''This reads until EOF using readline() and returns a list containing
+ the lines thus read. The optional 'sizehint' argument is ignored.
+ Remember, because this reads until EOF that means the child
+ process should have closed its stdout. If you run this method on
+ a child that is still running with its stdout open then this
+ method will block until it timesout.'''
+
+ lines = []
+ while True:
+ line = self.readline()
+ if not line:
+ break
+ lines.append(line)
+ return lines
+
+ def fileno(self):
+ '''Expose file descriptor for a file-like interface
+ '''
+ return self.child_fd
+
+ def flush(self):
+ '''This does nothing. It is here to support the interface for a
+ File-like object. '''
+ pass
+
+ def isatty(self):
+ """Overridden in subclass using tty"""
+ return False
+
+ # For 'with spawn(...) as child:'
+ def __enter__(self):
+ return self
+
+ def __exit__(self, etype, evalue, tb):
+ # We rely on subclasses to implement close(). If they don't, it's not
+ # clear what a context manager should do.
+ self.close()
+
+class SpawnBaseUnicode(SpawnBase):
+ if PY3:
+ string_type = str
+ allowed_string_types = (str, )
+ linesep = os.linesep
+ crlf = '\r\n'
+ else:
+ string_type = unicode
+ allowed_string_types = (unicode, )
+ linesep = os.linesep.decode('ascii')
+ crlf = '\r\n'.decode('ascii')
+ # This can handle unicode in both Python 2 and 3
+ write_to_stdout = sys.stdout.write
+
+ def __init__(self, *args, **kwargs):
+ self.encoding = kwargs.pop('encoding', 'utf-8')
+ self.errors = kwargs.pop('errors', 'strict')
+ self._decoder = codecs.getincrementaldecoder(self.encoding)(errors=self.errors)
+ super(SpawnBaseUnicode, self).__init__(*args, **kwargs)
+
+ @staticmethod
+ def _coerce_expect_string(s):
+ return s
+
+ @staticmethod
+ def _coerce_send_string(s):
+ return s
+
+ def _coerce_read_string(self, s):
+ return self._decoder.decode(s, final=False) \ No newline at end of file
diff --git a/pexpect/utils.py b/pexpect/utils.py
new file mode 100644
index 0000000..737f0ed
--- /dev/null
+++ b/pexpect/utils.py
@@ -0,0 +1,112 @@
+import os
+import sys
+import stat
+
+
+def is_executable_file(path):
+ """Checks that path is an executable regular file, or a symlink towards one.
+
+ This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``.
+ """
+ # follow symlinks,
+ fpath = os.path.realpath(path)
+
+ if not os.path.isfile(fpath):
+ # non-files (directories, fifo, etc.)
+ return False
+
+ mode = os.stat(fpath).st_mode
+
+ if (sys.platform.startswith('sunos')
+ and os.getuid() == 0):
+ # When root on Solaris, os.X_OK is True for *all* files, irregardless
+ # of their executability -- instead, any permission bit of any user,
+ # group, or other is fine enough.
+ #
+ # (This may be true for other "Unix98" OS's such as HP-UX and AIX)
+ return bool(mode & (stat.S_IXUSR |
+ stat.S_IXGRP |
+ stat.S_IXOTH))
+
+ return os.access(fpath, os.X_OK)
+
+
+def which(filename):
+ '''This takes a given filename; tries to find it in the environment path;
+ then checks if it is executable. This returns the full path to the filename
+ if found and executable. Otherwise this returns None.'''
+
+ # Special case where filename contains an explicit path.
+ if os.path.dirname(filename) != '' and is_executable_file(filename):
+ return filename
+ if 'PATH' not in os.environ or os.environ['PATH'] == '':
+ p = os.defpath
+ else:
+ p = os.environ['PATH']
+ pathlist = p.split(os.pathsep)
+ for path in pathlist:
+ ff = os.path.join(path, filename)
+ if is_executable_file(ff):
+ return ff
+ return None
+
+
+def split_command_line(command_line):
+
+ '''This splits a command line into a list of arguments. It splits arguments
+ on spaces, but handles embedded quotes, doublequotes, and escaped
+ characters. It's impossible to do this with a regular expression, so I
+ wrote a little state machine to parse the command line. '''
+
+ arg_list = []
+ arg = ''
+
+ # Constants to name the states we can be in.
+ state_basic = 0
+ state_esc = 1
+ state_singlequote = 2
+ state_doublequote = 3
+ # The state when consuming whitespace between commands.
+ state_whitespace = 4
+ state = state_basic
+
+ for c in command_line:
+ if state == state_basic or state == state_whitespace:
+ if c == '\\':
+ # Escape the next character
+ state = state_esc
+ elif c == r"'":
+ # Handle single quote
+ state = state_singlequote
+ elif c == r'"':
+ # Handle double quote
+ state = state_doublequote
+ elif c.isspace():
+ # Add arg to arg_list if we aren't in the middle of whitespace.
+ if state == state_whitespace:
+ # Do nothing.
+ None
+ else:
+ arg_list.append(arg)
+ arg = ''
+ state = state_whitespace
+ else:
+ arg = arg + c
+ state = state_basic
+ elif state == state_esc:
+ arg = arg + c
+ state = state_basic
+ elif state == state_singlequote:
+ if c == r"'":
+ state = state_basic
+ else:
+ arg = arg + c
+ elif state == state_doublequote:
+ if c == r'"':
+ state = state_basic
+ else:
+ arg = arg + c
+
+ if arg != '':
+ arg_list.append(arg)
+ return arg_list
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..ae62686
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[pytest]
+norecursedirs = .git
diff --git a/setup.py b/setup.py
index d4136af..126749a 100644
--- a/setup.py
+++ b/setup.py
@@ -1,6 +1,15 @@
from distutils.core import setup
+import os
+import re
-from pexpect import __version__
+with open(os.path.join(os.path.dirname(__file__), 'pexpect', '__init__.py'), 'r') as f:
+ for line in f:
+ version_match = re.search(r"__version__ = ['\"]([^'\"]*)['\"]", line)
+ if version_match:
+ version = version_match.group(1)
+ break
+ else:
+ raise Exception("couldn't find version number")
long_description = """
Pexpect is a pure Python module for spawning child applications; controlling
@@ -19,7 +28,7 @@ The Pexpect interface was designed to be easy to use.
"""
setup (name='pexpect',
- version=__version__,
+ version=version,
py_modules=['pxssh', 'fdpexpect', 'FSM', 'screen', 'ANSI'],
packages=['pexpect'],
description='Pexpect allows easy control of interactive console applications.',
@@ -52,4 +61,5 @@ setup (name='pexpect',
'Topic :: System :: Software Distribution',
'Topic :: Terminals',
],
+ install_requires=['ptyprocess'],
)
diff --git a/tests/PexpectTestCase.py b/tests/PexpectTestCase.py
index 7a9574e..307437e 100644
--- a/tests/PexpectTestCase.py
+++ b/tests/PexpectTestCase.py
@@ -22,26 +22,68 @@ from __future__ import print_function
import contextlib
import unittest
+import signal
import sys
import os
+
class PexpectTestCase(unittest.TestCase):
def setUp(self):
self.PYTHONBIN = sys.executable
self.original_path = os.getcwd()
tests_dir = os.path.dirname(__file__)
self.project_dir = project_dir = os.path.dirname(tests_dir)
+
+ # all tests are executed in this folder; there are many auxiliary
+ # programs in this folder executed by spawn().
os.chdir(tests_dir)
- os.environ['COVERAGE_PROCESS_START'] = os.path.join(project_dir, '.coveragerc')
+
+ # If the pexpect raises an exception after fork(), but before
+ # exec(), our test runner *also* forks. We prevent this by
+ # storing our pid and asserting equality on tearDown.
+ self.pid = os.getpid()
+
+ coverage_rc = os.path.join(project_dir, '.coveragerc')
+ os.environ['COVERAGE_PROCESS_START'] = coverage_rc
os.environ['COVERAGE_FILE'] = os.path.join(project_dir, '.coverage')
print('\n', self.id(), end=' ')
sys.stdout.flush()
+
+ # some build agents will ignore SIGHUP and SIGINT, which python
+ # inherits. This causes some of the tests related to terminate()
+ # to fail. We set them to the default handlers that they should
+ # be, and restore them back to their SIG_IGN value on tearDown.
+ #
+ # I'm not entirely convinced they need to be restored, only our
+ # test runner is affected.
+ self.restore_ignored_signals = [
+ value for value in (signal.SIGHUP, signal.SIGINT,)
+ if signal.getsignal(value) == signal.SIG_IGN]
+ if signal.SIGHUP in self.restore_ignored_signals:
+ # sighup should be set to default handler
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ if signal.SIGINT in self.restore_ignored_signals:
+ # SIGINT should be set to signal.default_int_handler
+ signal.signal(signal.SIGINT, signal.default_int_handler)
unittest.TestCase.setUp(self)
def tearDown(self):
- os.chdir (self.original_path)
+ # restore original working folder
+ os.chdir(self.original_path)
+
+ if self.pid != os.getpid():
+ # The build server pattern-matches phrase 'Test runner has forked!'
+ print("Test runner has forked! This means a child process raised "
+ "an exception before exec() in a test case, the error is "
+ "more than likely found above this line in stderr.",
+ file=sys.stderr)
+ exit(1)
+
+ # restore signal handlers
+ for signal_value in self.restore_ignored_signals:
+ signal.signal(signal_value, signal.SIG_IGN)
- if sys.version_info < (2,7):
+ if sys.version_info < (2, 7):
# We want to use these methods, which are new/improved in 2.7, but
# we are still supporting 2.6 for the moment. This section can be
# removed when we drop Python 2.6 support.
diff --git a/tests/README b/tests/README
index 295632b..ef5b613 100644
--- a/tests/README
+++ b/tests/README
@@ -1,18 +1,8 @@
-The best way to run these tests is from the directory above this one. Source
-the test.env environment file. This will make sure that you are using the
-correct pexpect.py file otherwise Python might try to import a different
-version if it is already installed in this environment. Then run the testall.py
-script in the tools/ directory. This script will automatically build a test
-suite from all the test scripts in the tests/ directory. This allows you to add
-new test scripts simply by dropping them in the tests/ directory. You don't
-have to register the test or do anything else to integrate it into the test
-suite.
+The best way to run these tests is from the directory above this one. Run:
-For example, this is the normal set of commands you would use to run all tests
-in the tests/ directory:
+ py.test
- $ cd /home/user/pexpect_dev/
- $ . test.env
- $ ./tools/testall.py
+To run a specific test file:
+ py.test tests/test_constructor.py
diff --git a/tests/test_constructor.py b/tests/test_constructor.py
index 60525a0..98c473a 100755
--- a/tests/test_constructor.py
+++ b/tests/test_constructor.py
@@ -28,11 +28,11 @@ class TestCaseConstructor(PexpectTestCase.PexpectTestCase):
the same results for different styles of invoking __init__().
This assumes that the root directory / is static during the test.
'''
- p1 = pexpect.spawn('/bin/ls -l /bin')
- p2 = pexpect.spawn('/bin/ls' ,['-l', '/bin'])
- p1.expect (pexpect.EOF)
- p2.expect (pexpect.EOF)
- assert (p1.before == p2.before)
+ p1 = pexpect.spawn('uname -m -n -p -r -s -v')
+ p2 = pexpect.spawn('uname', ['-m', '-n', '-p', '-r', '-s', '-v'])
+ p1.expect(pexpect.EOF)
+ p2.expect(pexpect.EOF)
+ assert p1.before == p2.before
def test_named_parameters (self):
'''This tests that named parameters work.
diff --git a/tests/test_ctrl_chars.py b/tests/test_ctrl_chars.py
index 9c7b869..10d03db 100755
--- a/tests/test_ctrl_chars.py
+++ b/tests/test_ctrl_chars.py
@@ -26,6 +26,9 @@ from . import PexpectTestCase
import time
import sys
+from ptyprocess import ptyprocess
+ptyprocess._make_eof_intr()
+
if sys.version_info[0] >= 3:
def byte(i):
return bytes([i])
@@ -54,7 +57,7 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase):
child = pexpect.spawn('python getch.py', echo=False, timeout=5)
child.expect('READY')
child.sendintr()
- child.expect(str(child._INTR) + '<STOP>')
+ child.expect(str(ord(ptyprocess._INTR)) + '<STOP>')
child.send(byte(0))
child.expect('0<STOP>')
@@ -66,7 +69,7 @@ class TestCtrlChars(PexpectTestCase.PexpectTestCase):
child = pexpect.spawn('python getch.py', echo=False, timeout=5)
child.expect('READY')
child.sendeof()
- child.expect(str(child._EOF) + '<STOP>')
+ child.expect(str(ord(ptyprocess._EOF)) + '<STOP>')
child.send(byte(0))
child.expect('0<STOP>')
diff --git a/tests/test_maxcanon.py b/tests/test_maxcanon.py
new file mode 100644
index 0000000..bbd08f3
--- /dev/null
+++ b/tests/test_maxcanon.py
@@ -0,0 +1,152 @@
+""" Module for canonical-mode tests. """
+import sys
+import os
+
+
+import pexpect
+from . import PexpectTestCase
+
+
+class TestCaseCanon(PexpectTestCase.PexpectTestCase):
+ """
+ Test expected Canonical mode behavior (limited input line length).
+
+ All systems use the value of MAX_CANON which can be found using
+ fpathconf(3) value PC_MAX_CANON -- with the exception of Linux.
+
+ Linux, though defining a value of 255, actually honors the value
+ of 4096 from linux kernel include file tty.h definition
+ N_TTY_BUF_SIZE.
+
+ Linux also does not honor IMAXBEL. termios(3) states, "Linux does not
+ implement this bit, and acts as if it is always set." Although these
+ tests ensure it is enabled, this is a non-op for Linux.
+
+ These tests only ensure the correctness of the behavior described by
+ the sendline() docstring. pexpect is not particularly involved in
+ these scenarios, though if we wish to expose some kind of interface
+ to tty.setraw, for example, these tests may be re-purposed as such.
+
+ Lastly, portions of these tests are skipped on Travis-CI. It produces
+ unexpected behavior not reproduced on Debian/GNU Linux.
+ """
+
+ def setUp(self):
+ super(TestCaseCanon, self).setUp()
+
+ self.echo = False
+ if sys.platform.lower().startswith('linux'):
+ # linux is 4096, N_TTY_BUF_SIZE.
+ self.max_input = 4096
+ self.echo = True
+ elif sys.platform.lower().startswith('sunos'):
+ # SunOS allows PC_MAX_CANON + 1; see
+ # https://bitbucket.org/illumos/illumos-gate/src/d07a59219ab7fd2a7f39eb47c46cf083c88e932f/usr/src/uts/common/io/ldterm.c?at=default#cl-1888
+ self.max_input = os.fpathconf(0, 'PC_MAX_CANON') + 1
+ else:
+ # All others (probably) limit exactly at PC_MAX_CANON
+ self.max_input = os.fpathconf(0, 'PC_MAX_CANON')
+
+ def test_under_max_canon(self):
+ " BEL is not sent by terminal driver at maximum bytes - 1. "
+ # given,
+ child = pexpect.spawn('bash', echo=self.echo, timeout=5)
+ child.sendline('echo READY')
+ child.sendline('stty icanon imaxbel')
+ child.sendline('echo BEGIN; cat')
+
+ # some systems BEL on (maximum - 1), not able to receive CR,
+ # even though all characters up until then were received, they
+ # simply cannot be transmitted, as CR is part of the transmission.
+ send_bytes = self.max_input - 1
+
+ # exercise,
+ child.sendline('_' * send_bytes)
+
+ # fast forward beyond 'cat' command, as ^G can be found as part of
+ # set-xterm-title sequence of $PROMPT_COMMAND or $PS1.
+ child.expect_exact('BEGIN')
+
+ # verify, all input is found in echo output,
+ child.expect_exact('_' * send_bytes)
+
+ # BEL is not found,
+ with self.assertRaises(pexpect.TIMEOUT, timeout=5):
+ child.expect_exact('\a')
+
+ # cleanup,
+ child.sendeof() # exit cat(1)
+ child.sendline('exit 0') # exit bash(1)
+ child.expect(pexpect.EOF)
+ assert not child.isalive()
+ assert child.exitstatus == 0
+
+ def test_beyond_max_icanon(self):
+ " a single BEL is sent when maximum bytes is reached. "
+ # given,
+ child = pexpect.spawn('bash', echo=self.echo, timeout=5)
+ child.sendline('stty icanon imaxbel erase ^H')
+ child.sendline('cat')
+ send_bytes = self.max_input
+
+ # exercise,
+ child.sendline('_' * send_bytes)
+ child.expect_exact('\a')
+
+ # exercise, we must now backspace to send CR.
+ child.sendcontrol('h')
+ child.sendline()
+
+ if os.environ.get('TRAVIS', None) == 'true':
+ # Travis-CI has intermittent behavior here, possibly
+ # because the master process is itself, a PTY?
+ return
+
+ # verify the length of (maximum - 1) received by cat(1),
+ # which has written it back out,
+ child.expect_exact('_' * (send_bytes - 1))
+ # and not a byte more.
+ with self.assertRaises(pexpect.TIMEOUT):
+ child.expect_exact('_', timeout=1)
+
+ # cleanup,
+ child.sendeof() # exit cat(1)
+ child.sendline('exit 0') # exit bash(1)
+ child.expect_exact(pexpect.EOF)
+ assert not child.isalive()
+ assert child.exitstatus == 0
+
+ def test_max_no_icanon(self):
+ " may exceed maximum input bytes if canonical mode is disabled. "
+ # given,
+ child = pexpect.spawn('bash', echo=self.echo, timeout=5)
+ child.sendline('stty -icanon imaxbel')
+ child.sendline('echo BEGIN; cat')
+ send_bytes = self.max_input + 11
+
+ # exercise,
+ child.sendline('_' * send_bytes)
+
+ # fast forward beyond 'cat' command, as ^G can be found as part of
+ # set-xterm-title sequence of $PROMPT_COMMAND or $PS1.
+ child.expect_exact('BEGIN')
+
+ if os.environ.get('TRAVIS', None) == 'true':
+ # Travis-CI has intermittent behavior here, possibly
+ # because the master process is itself, a PTY?
+ return
+
+ # BEL is *not* found,
+ with self.assertRaises(pexpect.TIMEOUT):
+ child.expect_exact('\a', timeout=1)
+
+ # verify, all input is found in output,
+ child.expect_exact('_' * send_bytes)
+
+ # cleanup,
+ child.sendcontrol('c') # exit cat(1) (eof wont work in -icanon)
+ child.sendcontrol('c')
+ child.sendline('exit 0') # exit bash(1)
+ child.expect(pexpect.EOF)
+ assert not child.isalive()
+ assert child.exitstatus == 0
diff --git a/tests/test_misc.py b/tests/test_misc.py
index a2245aa..d5a707c 100755
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -141,6 +141,16 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
with self.assertRaises(pexpect.EOF):
child.expect('the unexpected')
+ def test_with(self):
+ "spawn can be used as a context manager"
+ with pexpect.spawn(sys.executable + ' echo_w_prompt.py') as p:
+ p.expect('<in >')
+ p.sendline(b'alpha')
+ p.expect(b'<out>alpha')
+ assert p.isalive()
+
+ assert not p.isalive()
+
def test_terminate(self):
" test force terminate always succeeds (SIGKILL). "
child = pexpect.spawn('cat')
@@ -149,41 +159,24 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
def test_sighup(self):
" validate argument `ignore_sighup=True` and `ignore_sighup=False`. "
- # If a parent process sets an Ignore handler for SIGHUP (as on Fedora's
- # build machines), this test breaks. We temporarily restore the default
- # handler, so the child process will quit. However, we can't simply
- # replace any installed handler, because getsignal returns None for
- # handlers not set in Python code, so we wouldn't be able to restore
- # them.
- if signal.getsignal(signal.SIGHUP) == signal.SIG_IGN:
- signal.signal(signal.SIGHUP, signal.SIG_DFL)
- restore_sig_ign = True
- else:
- restore_sig_ign = False
-
getch = sys.executable + ' getch.py'
- try:
- child = pexpect.spawn(getch, ignore_sighup=True)
- child.expect('READY')
- child.kill(signal.SIGHUP)
- for _ in range(10):
- if not child.isalive():
- self.fail('Child process should not have exited.')
- time.sleep(0.1)
-
- child = pexpect.spawn(getch, ignore_sighup=False)
- child.expect('READY')
- child.kill(signal.SIGHUP)
- for _ in range(10):
- if not child.isalive():
- break
- time.sleep(0.1)
- else:
- self.fail('Child process should have exited.')
-
- finally:
- if restore_sig_ign:
- signal.signal(signal.SIGHUP, signal.SIG_IGN)
+ child = pexpect.spawn(getch, ignore_sighup=True)
+ child.expect('READY')
+ child.kill(signal.SIGHUP)
+ for _ in range(10):
+ if not child.isalive():
+ self.fail('Child process should not have exited.')
+ time.sleep(0.1)
+
+ child = pexpect.spawn(getch, ignore_sighup=False)
+ child.expect('READY')
+ child.kill(signal.SIGHUP)
+ for _ in range(10):
+ if not child.isalive():
+ break
+ time.sleep(0.1)
+ else:
+ self.fail('Child process should have exited.')
def test_bad_child_pid(self):
" assert bad condition error in isalive(). "
@@ -191,7 +184,7 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
child = pexpect.spawn('cat')
child.terminate(force=1)
# Force an invalid state to test isalive
- child.terminated = 0
+ child.ptyproc.terminated = 0
try:
with self.assertRaisesRegexp(pexpect.ExceptionPexpect,
".*" + expect_errmsg):
@@ -324,9 +317,9 @@ class TestCaseMisc(PexpectTestCase.PexpectTestCase):
" test forced self.__fork_pty() and __pty_make_controlling_tty "
# given,
class spawn_ourptyfork(pexpect.spawn):
- def _spawn(self, command, args=[]):
+ def _spawn(self, command, args=[], preexec_fn=None):
self.use_native_pty_fork = False
- pexpect.spawn._spawn(self, command, args)
+ pexpect.spawn._spawn(self, command, args, preexec_fn)
# exercise,
p = spawn_ourptyfork('cat', echo=False)
diff --git a/tests/test_replwrap.py b/tests/test_replwrap.py
index 14f7c39..28c7599 100644
--- a/tests/test_replwrap.py
+++ b/tests/test_replwrap.py
@@ -26,7 +26,7 @@ class REPLWrapTestCase(unittest.TestCase):
assert 'real' in res, res
# PAGER should be set to cat, otherwise man hangs
- res = bash.run_command('man sleep', timeout=2)
+ res = bash.run_command('man sleep', timeout=5)
assert 'SLEEP' in res, res
def test_multiline(self):
diff --git a/tests/test_repr.py b/tests/test_repr.py
new file mode 100644
index 0000000..ce618d4
--- /dev/null
+++ b/tests/test_repr.py
@@ -0,0 +1,26 @@
+""" Test __str__ methods. """
+import pexpect
+
+from . import PexpectTestCase
+
+
+class TestCaseMisc(PexpectTestCase.PexpectTestCase):
+
+ def test_str_spawnu(self):
+ """ Exercise spawnu.__str__() """
+ # given,
+ p = pexpect.spawnu('cat')
+ # exercise,
+ value = str(p)
+ # verify
+ assert isinstance(value, str)
+
+ def test_str_spawn(self):
+ """ Exercise spawn.__str__() """
+ # given,
+ p = pexpect.spawn('cat')
+ # exercise,
+ value = str(p)
+ # verify
+ assert isinstance(value, str)
+
diff --git a/tests/test_run.py b/tests/test_run.py
index 814b70a..c018b4d 100755
--- a/tests/test_run.py
+++ b/tests/test_run.py
@@ -22,14 +22,11 @@ PEXPECT LICENSE
import pexpect
import unittest
import subprocess
+import tempfile
import sys
+import os
from . import PexpectTestCase
-# TODO Many of these test cases blindly assume that sequential
-# TODO listing of the /bin directory will yield the same results.
-# TODO This may not always be true, but seems adequate for testing for now.
-# TODO I should fix this at some point.
-
unicode_type = str if pexpect.PY3 else unicode
def timeout_callback (d):
@@ -44,14 +41,24 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
empty = b''
prep_subprocess_out = staticmethod(lambda x: x)
+ def setUp(self):
+ fd, self.rcfile = tempfile.mkstemp()
+ os.write(fd, b'PS1=GO: \n')
+ os.close(fd)
+ super(RunFuncTestCase, self).setUp()
+
+ def tearDown(self):
+ os.unlink(self.rcfile)
+ super(RunFuncTestCase, self).tearDown()
+
def test_run_exit (self):
(data, exitstatus) = self.runfunc('python exit1.py', withexitstatus=1)
assert exitstatus == 1, "Exit status of 'python exit1.py' should be 1."
def test_run (self):
- the_old_way = subprocess.Popen(args=['ls', '-l', '/bin'],
+ the_old_way = subprocess.Popen(args=['uname', '-m', '-n'],
stdout=subprocess.PIPE).communicate()[0].rstrip()
- (the_new_way, exitstatus) = self.runfunc('ls -l /bin', withexitstatus=1)
+ (the_new_way, exitstatus) = self.runfunc('uname -m -n', withexitstatus=1)
the_new_way = the_new_way.replace(self.cr, self.empty).rstrip()
self.assertEqual(self.prep_subprocess_out(the_old_way), the_new_way)
self.assertEqual(exitstatus, 0)
@@ -64,6 +71,23 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
withexitstatus=1)
assert exitstatus != 0
+ def test_run_tuple_list (self):
+ events = [
+ # second match on 'abc', echo 'def'
+ ('abc\r\n.*GO:', 'echo "def"\n'),
+ # final match on 'def': exit
+ ('def\r\n.*GO:', 'exit\n'),
+ # first match on 'GO:' prompt, echo 'abc'
+ ('GO:', 'echo "abc"\n')
+ ]
+
+ (data, exitstatus) = pexpect.run(
+ 'bash --rcfile {0}'.format(self.rcfile),
+ withexitstatus=True,
+ events=events,
+ timeout=10)
+ assert exitstatus == 0
+
class RunUnicodeFuncTestCase(RunFuncTestCase):
runfunc = staticmethod(pexpect.runu)
cr = b'\r'.decode('ascii')
diff --git a/tests/test_which.py b/tests/test_which.py
index 83575fb..bda3333 100644
--- a/tests/test_which.py
+++ b/tests/test_which.py
@@ -1,9 +1,14 @@
+import subprocess
import tempfile
+import shutil
+import errno
import os
import pexpect
from . import PexpectTestCase
+import pytest
+
class TestCaseWhich(PexpectTestCase.PexpectTestCase):
" Tests for pexpect.which(). "
@@ -162,27 +167,101 @@ class TestCaseWhich(PexpectTestCase.PexpectTestCase):
try:
# setup
os.environ['PATH'] = bin_dir
- with open(bin_path, 'w') as fp:
- fp.write('#!/bin/sh\necho hello, world\n')
- for should_match, mode in ((False, 0o000),
- (True, 0o005),
- (True, 0o050),
- (True, 0o500),
- (False, 0o004),
- (False, 0o040),
- (False, 0o400)):
+
+ # an interpreted script requires the ability to read,
+ # whereas a binary program requires only to be executable.
+ #
+ # to gain access to a binary program, we make a copy of
+ # the existing system program echo(1).
+ bin_echo = None
+ for pth in ('/bin/echo', '/usr/bin/echo'):
+ if os.path.exists(pth):
+ bin_echo = pth
+ break
+ bin_which = None
+ for pth in ('/bin/which', '/usr/bin/which'):
+ if os.path.exists(pth):
+ bin_which = pth
+ break
+ if not bin_echo or not bin_which:
+ pytest.skip('needs `echo` and `which` binaries')
+ shutil.copy(bin_echo, bin_path)
+ isroot = os.getuid() == 0
+ for should_match, mode in (
+ # note that although the file may have matching 'group' or
+ # 'other' executable permissions, it is *not* executable
+ # because the current uid is the owner of the file -- which
+ # takes precedence
+ (False, 0o000), # ----------, no
+ (isroot, 0o001), # ---------x, no
+ (isroot, 0o010), # ------x---, no
+ (True, 0o100), # ---x------, yes
+ (False, 0o002), # --------w-, no
+ (False, 0o020), # -----w----, no
+ (False, 0o200), # --w-------, no
+ (isroot, 0o003), # --------wx, no
+ (isroot, 0o030), # -----wx---, no
+ (True, 0o300), # --wx------, yes
+ (False, 0o004), # -------r--, no
+ (False, 0o040), # ----r-----, no
+ (False, 0o400), # -r--------, no
+ (isroot, 0o005), # -------r-x, no
+ (isroot, 0o050), # ----r-x---, no
+ (True, 0o500), # -r-x------, yes
+ (False, 0o006), # -------rw-, no
+ (False, 0o060), # ----rw----, no
+ (False, 0o600), # -rw-------, no
+ (isroot, 0o007), # -------rwx, no
+ (isroot, 0o070), # ----rwx---, no
+ (True, 0o700), # -rwx------, yes
+ (isroot, 0o4001), # ---S-----x, no
+ (isroot, 0o4010), # ---S--x---, no
+ (True, 0o4100), # ---s------, yes
+ (isroot, 0o4003), # ---S----wx, no
+ (isroot, 0o4030), # ---S-wx---, no
+ (True, 0o4300), # --ws------, yes
+ (isroot, 0o2001), # ------S--x, no
+ (isroot, 0o2010), # ------s---, no
+ (True, 0o2100), # ---x--S---, yes
+
+ ):
+ mode_str = '{0:0>4o}'.format(mode)
+
+ # given file mode,
os.chmod(bin_path, mode)
- if not should_match:
- # should not be found because it is not executable
- assert pexpect.which(fname) is None
- else:
- # should match full path
- assert pexpect.which(fname) == bin_path
+ # exercise whether we may execute
+ can_execute = True
+ try:
+ subprocess.Popen(fname).wait() == 0
+ except OSError as err:
+ if err.errno != errno.EACCES:
+ raise
+ # permission denied
+ can_execute = False
+
+ assert should_match == can_execute, (
+ should_match, can_execute, mode_str)
+
+ # exercise whether which(1) would match
+ proc = subprocess.Popen((bin_which, fname),
+ env={'PATH': bin_dir},
+ stdout=subprocess.PIPE)
+ bin_which_match = bool(not proc.wait())
+ assert should_match == bin_which_match, (
+ should_match, bin_which_match, mode_str)
+
+ # finally, exercise pexpect's which(1) matches
+ # the same.
+ pexpect_match = bool(pexpect.which(fname))
+
+ assert should_match == pexpect_match == bin_which_match, (
+ should_match, pexpect_match, bin_which_match, mode_str)
finally:
# restore,
os.environ['PATH'] = save_path
+
# destroy scratch files and folders,
if os.path.exists(bin_path):
os.unlink(bin_path)
diff --git a/tools/display-sighandlers.py b/tools/display-sighandlers.py
new file mode 100755
index 0000000..98445e9
--- /dev/null
+++ b/tools/display-sighandlers.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+# Displays all signals, their values, and their handlers.
+from __future__ import print_function
+import signal
+FMT = '{name:<10} {value:<5} {description}'
+
+# header
+print(FMT.format(name='name', value='value', description='description'))
+print('-' * (33))
+
+for name, value in [(signal_name, getattr(signal, signal_name))
+ for signal_name in dir(signal)
+ if signal_name.startswith('SIG')
+ and not signal_name.startswith('SIG_')]:
+ handler = signal.getsignal(value)
+ description = {
+ signal.SIG_IGN: "ignored(SIG_IGN)",
+ signal.SIG_DFL: "default(SIG_DFL)"
+ }.get(handler, handler)
+ print(FMT.format(name=name, value=value, description=description))
diff --git a/tools/display-terminalinfo.py b/tools/display-terminalinfo.py
new file mode 100755
index 0000000..15911d4
--- /dev/null
+++ b/tools/display-terminalinfo.py
@@ -0,0 +1,209 @@
+#!/usr/bin/env python
+""" Display known information about our terminal. """
+from __future__ import print_function
+import termios
+import locale
+import sys
+import os
+
+BITMAP_IFLAG = {
+ 'IGNBRK': 'ignore BREAK condition',
+ 'BRKINT': 'map BREAK to SIGINTR',
+ 'IGNPAR': 'ignore (discard) parity errors',
+ 'PARMRK': 'mark parity and framing errors',
+ 'INPCK': 'enable checking of parity errors',
+ 'ISTRIP': 'strip 8th bit off chars',
+ 'INLCR': 'map NL into CR',
+ 'IGNCR': 'ignore CR',
+ 'ICRNL': 'map CR to NL (ala CRMOD)',
+ 'IXON': 'enable output flow control',
+ 'IXOFF': 'enable input flow control',
+ 'IXANY': 'any char will restart after stop',
+ 'IMAXBEL': 'ring bell on input queue full',
+ 'IUCLC': 'translate upper case to lower case',
+}
+
+BITMAP_OFLAG = {
+ 'OPOST': 'enable following output processing',
+ 'ONLCR': 'map NL to CR-NL (ala CRMOD)',
+ 'OXTABS': 'expand tabs to spaces',
+ 'ONOEOT': 'discard EOT\'s `^D\' on output)',
+ 'OCRNL': 'map CR to NL',
+ 'OLCUC': 'translate lower case to upper case',
+ 'ONOCR': 'No CR output at column 0',
+ 'ONLRET': 'NL performs CR function',
+}
+
+BITMAP_CFLAG = {
+ 'CSIZE': 'character size mask',
+ 'CS5': '5 bits (pseudo)',
+ 'CS6': '6 bits',
+ 'CS7': '7 bits',
+ 'CS8': '8 bits',
+ 'CSTOPB': 'send 2 stop bits',
+ 'CREAD': 'enable receiver',
+ 'PARENB': 'parity enable',
+ 'PARODD': 'odd parity, else even',
+ 'HUPCL': 'hang up on last close',
+ 'CLOCAL': 'ignore modem status lines',
+ 'CCTS_OFLOW': 'CTS flow control of output',
+ 'CRTSCTS': 'same as CCTS_OFLOW',
+ 'CRTS_IFLOW': 'RTS flow control of input',
+ 'MDMBUF': 'flow control output via Carrier',
+}
+
+BITMAP_LFLAG = {
+ 'ECHOKE': 'visual erase for line kill',
+ 'ECHOE': 'visually erase chars',
+ 'ECHO': 'enable echoing',
+ 'ECHONL': 'echo NL even if ECHO is off',
+ 'ECHOPRT': 'visual erase mode for hardcopy',
+ 'ECHOCTL': 'echo control chars as ^(Char)',
+ 'ISIG': 'enable signals INTR, QUIT, [D]SUSP',
+ 'ICANON': 'canonicalize input lines',
+ 'ALTWERASE': 'use alternate WERASE algorithm',
+ 'IEXTEN': 'enable DISCARD and LNEXT',
+ 'EXTPROC': 'external processing',
+ 'TOSTOP': 'stop background jobs from output',
+ 'FLUSHO': 'output being flushed (state)',
+ 'NOKERNINFO': 'no kernel output from VSTATUS',
+ 'PENDIN': 'XXX retype pending input (state)',
+ 'NOFLSH': 'don\'t flush after interrupt',
+}
+
+CTLCHAR_INDEX = {
+ 'VEOF': 'EOF',
+ 'VEOL': 'EOL',
+ 'VEOL2': 'EOL2',
+ 'VERASE': 'ERASE',
+ 'VWERASE': 'WERASE',
+ 'VKILL': 'KILL',
+ 'VREPRINT': 'REPRINT',
+ 'VINTR': 'INTR',
+ 'VQUIT': 'QUIT',
+ 'VSUSP': 'SUSP',
+ 'VDSUSP': 'DSUSP',
+ 'VSTART': 'START',
+ 'VSTOP': 'STOP',
+ 'VLNEXT': 'LNEXT',
+ 'VDISCARD': 'DISCARD',
+ 'VMIN': '---',
+ 'VTIME': '---',
+ 'VSTATUS': 'STATUS',
+}
+
+
+def display_bitmask(kind, bitmap, value):
+ """ Display all matching bitmask values for ``value`` given ``bitmap``. """
+ col1_width = max(map(len, list(bitmap.keys()) + [kind]))
+ col2_width = 7
+ FMT = '{name:>{col1_width}} {value:>{col2_width}} {description}'
+ print(FMT.format(name=kind,
+ value='Value',
+ description='Description',
+ col1_width=col1_width,
+ col2_width=col2_width))
+ print('{0} {1} {2}'.format('-' * col1_width,
+ '-' * col2_width,
+ '-' * max(map(len, bitmap.values()))))
+ for flag_name, description in bitmap.items():
+ try:
+ bitmask = getattr(termios, flag_name)
+ bit_val = 'on' if bool(value & bitmask) else 'off'
+ except AttributeError:
+ bit_val = 'undef'
+ print(FMT.format(name=flag_name,
+ value=bit_val,
+ description=description,
+ col1_width=col1_width,
+ col2_width=col2_width))
+ print()
+
+
+def display_ctl_chars(index, cc):
+ """ Display all control character indicies, names, and values. """
+ title = 'Special Character'
+ col1_width = len(title)
+ col2_width = max(map(len, index.values()))
+ FMT = '{idx:<{col1_width}} {name:<{col2_width}} {value}'
+ print('Special line Characters'.center(40).rstrip())
+ print(FMT.format(idx='Index',
+ name='Name',
+ value='Value',
+ col1_width=col1_width,
+ col2_width=col2_width))
+ print('{0} {1} {2}'.format('-' * col1_width,
+ '-' * col2_width,
+ '-' * 10))
+ for index_name, name in index.items():
+ try:
+ index = getattr(termios, index_name)
+ value = cc[index]
+ if value == b'\xff':
+ value = '_POSIX_VDISABLE'
+ else:
+ value = repr(value)
+ except AttributeError:
+ value = 'undef'
+ print(FMT.format(idx=index_name,
+ name=name,
+ value=value,
+ col1_width=col1_width,
+ col2_width=col2_width))
+ print()
+
+
+def display_conf(kind, names, getter):
+ col1_width = max(map(len, names))
+ FMT = '{name:>{col1_width}} {value}'
+ print(FMT.format(name=kind,
+ value='value',
+ col1_width=col1_width))
+ print('{0} {1}'.format('-' * col1_width, '-' * 27))
+ for name in names:
+ try:
+ value = getter(name)
+ except OSError as err:
+ value = err
+ print(FMT.format(name=name, value=value, col1_width=col1_width))
+ print()
+
+
+def main():
+ fd = sys.stdin.fileno()
+ locale.setlocale(locale.LC_ALL, '')
+ encoding = locale.getpreferredencoding()
+
+ print('os.isatty({0}) => {1}'.format(fd, os.isatty(fd)))
+ print('locale.getpreferredencoding() => {0}'.format(encoding))
+
+ display_conf(kind='pathconf',
+ names=os.pathconf_names,
+ getter=lambda name: os.fpathconf(fd, name))
+
+ try:
+ (iflag, oflag, cflag, lflag, ispeed, ospeed, cc
+ ) = termios.tcgetattr(fd)
+ except termios.error as err:
+ print('stdin is not a typewriter: {0}'.format(err))
+ else:
+ display_bitmask(kind='Input Mode',
+ bitmap=BITMAP_IFLAG,
+ value=iflag)
+ display_bitmask(kind='Output Mode',
+ bitmap=BITMAP_OFLAG,
+ value=oflag)
+ display_bitmask(kind='Control Mode',
+ bitmap=BITMAP_CFLAG,
+ value=cflag)
+ display_bitmask(kind='Local Mode',
+ bitmap=BITMAP_LFLAG,
+ value=lflag)
+ display_ctl_chars(index=CTLCHAR_INDEX,
+ cc=cc)
+ print('os.ttyname({0}) => {1}'.format(fd, os.ttyname(fd)))
+ print('os.ctermid() => {0}'.format(os.ttyname(fd)))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/tools/dotfiles.tar.gz b/tools/dotfiles.tar.gz
deleted file mode 100644
index 0636410..0000000
--- a/tools/dotfiles.tar.gz
+++ /dev/null
Binary files differ
diff --git a/tools/getkey.py b/tools/getkey.py
deleted file mode 100755
index 76c07de..0000000
--- a/tools/getkey.py
+++ /dev/null
@@ -1,46 +0,0 @@
-'''
-This currently just holds some notes.
-This is not expected to be working code.
-
-$Revision: 120 $
-$Date: 2002-11-27 11:13:04 -0800 (Wed, 27 Nov 2002) $
-
-PEXPECT LICENSE
-
- This license is approved by the OSI and FSF as GPL-compatible.
- http://opensource.org/licenses/isc-license.txt
-
- Copyright (c) 2012, Noah Spurrier <noah@noah.org>
- PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
- PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
- COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-'''
-
-import tty, termios, sys
-
-def getkey():
- file = sys.stdin.fileno()
- mode = termios.tcgetattr(file)
- try:
- tty.setraw(file, termios.TCSANOW)
- ch = sys.stdin.read(1)
- finally:
- termios.tcsetattr(file, termios.TCSANOW, mode)
- return ch
-
-def test_typing ():
- s = screen (10,10)
- while 1:
- ch = getkey()
- s.type(ch)
- print str(s)
- print
-
diff --git a/tools/merge_templates.py b/tools/merge_templates.py
deleted file mode 100755
index b4fab18..0000000
--- a/tools/merge_templates.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python
-
-'''
-I used to use this to keep the sourceforge pages up to date with the
-latest documentation and I like to keep a copy of the distribution
-on the web site so that it will be compatible with
-The Vaults of Parnasus which requires a direct URL link to a
-tar ball distribution. I don't advertise the package this way.
-
-PEXPECT LICENSE
-
- This license is approved by the OSI and FSF as GPL-compatible.
- http://opensource.org/licenses/isc-license.txt
-
- Copyright (c) 2012, Noah Spurrier <noah@noah.org>
- PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
- PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
- COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-'''
-import os
-import re
-import pyed
-
-# extract the version number from the pexpect.py source.
-d = pyed.pyed()
-d.read ("pexpect.py")
-d.first('^__version__')
-r = re.search("'([0-9]\.[0-9])'", d.cur_line)
-version = r.group(1)
-
-# Edit the index.html to update current VERSION.
-d = pyed.pyed()
-d.read ("doc/index.template.html")
-for cl in d.match_lines('.*VERSION.*'):
- d.cur_line = d.cur_line.replace('VERSION', version)
-d.write("doc/index.html")
-
-# Edit the setup.py to update current VERSION.
-d = pyed.pyed()
-d.read ("setup.py.template")
-for cl in d.match_lines('.*VERSION.*'):
- d.cur_line = d.cur_line.replace('VERSION', version)
-d.write("setup.py")
-os.chmod("setup.py", 0755)
-
diff --git a/tools/pyed.py b/tools/pyed.py
deleted file mode 100755
index 14c562a..0000000
--- a/tools/pyed.py
+++ /dev/null
@@ -1,180 +0,0 @@
-"""This represents a document with methods to allow easy editing.
-Think 'sed', only more fun to use.
-Example 1: Convert all python-style comments in a file to UPPERCASE.
-This operates as a filter on stdin, so this needs a shell pipe.
-cat myscript.py | upper_filter.py
- import sys, pyed
- pe = pyed()
- pe.read(sys.stdin)
- for pe in pe.match_lines('^\\s*#'):
- pe.cur_line = pe.cur_line.upper()
- print pe
-
-Example 2: Edit an Apache2 httpd.conf file to turn on supplemental SSL configuration.
- import pyed
- pe = pyed()
- pe.read("httpd.conf")
- pe.first('#Include conf/extra/httpd-ssl.conf')
- pe.cur_line = 'Include conf/extra/httpd-ssl.conf'
- pe.write("httpd.conf")
-
-PEXPECT LICENSE
-
- This license is approved by the OSI and FSF as GPL-compatible.
- http://opensource.org/licenses/isc-license.txt
-
- Copyright (c) 2012, Noah Spurrier <noah@noah.org>
- PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
- PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
- COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-"""
-
-import re
-class pyed (object):
- def __init__ (self, new_str=None):
- if new_str is not None:
- self.lines = new_str.splitlines()
- self.cur_line_num = 0
- else:
- self.lines = None
- # force invalid line number
- self.cur_line_num = None
- def match_lines (self, pattern, beg=0, end=None):
- """This returns a generator that iterates this object
- over the lines and yielding when a line matches the pattern.
- Note that this generator mutates this object so that
- the cur_line is changed to the line matching the pattern.
- """
- p = re.compile (pattern)
- if end is None:
- end = len(self.lines)
- for i in xrange (beg,end):
- m = p.match(self.lines[i])
- if m is not None:
- self.cur_line_num = i
- yield self
- else:
- # force invalid line number
- cur_line_num = None
- def match_lines_rev (self, pattern, beg=0, end=None):
- """This is similar to match_lines, but the order is reversed.
- """
- p = re.compile (pattern)
- if end is None:
- end = len(self.lines)
- for i in xrange (end-1,beg-1,-1):
- m = p.match(self.lines[i])
- if m is not None:
- self.cur_line_num = i
- yield self
- else:
- # force invalid line number
- cur_line_num = None
- def next (self):
- self.cur_line_num = self.cur_line_num + 1
- if self.cur_line_num >= len(self.lines):
- self.cur_line_num = len(self.lines) - 1
- return self.cur_line
- def prev (self):
- self.cur_line_num = self.cur_line_num - 1
- if self.cur_line_num < 0:
- self.cur_line_num = 0
- return self.cur_line
- def first (self, pattern=None):
- if pattern is not None:
- try:
- return self.match_lines(pattern).next()
- except StopIteration, e:
- # force invalid line number
- self.cur_line_num = None
- return None
- self.cur_line_num = 0
- return self.cur_line
- def last (self, pattern=None):
- if pattern is not None:
- try:
- return self.match_lines_rev(pattern).next()
- except StopIteration, e:
- # force invalid line number
- self.cur_line_num = None
- return None
- self.cur_line_num = len(self.lines) - 1
- return self.cur_line
- def insert (self, s=''):
- """This inserts the string as a new line before the current line number.
- """
- self.lines.insert(self.cur_line_num, s)
- def append (self, s=''):
- """Unlike list append, this appends after the current line number,
- not at the end of the entire list.
- """
- self.cur_line_num = self.cur_line_num + 1
- self.lines.insert(self.cur_line_num, s)
- def delete (self):
- del self.cur_line
- def read (self, file_holder):
- """This reads all the lines from a file. The file_holder may be
- either a string filename or any object that supports "read()".
- All previous lines are lost.
- """
- if hasattr(file_holder, 'read') and callable(file_holder.read):
- fin = file_holder
- else:
- fin = open (file_holder, 'rb')
- data = fin.read()
- self.lines = data.splitlines()
- self.cur_line_num = 0
- def write (self, file_holder):
- """This writes all the lines to a file. The file_holder may be
- either a string filename or any object that supports "read()".
- TODO: Make write be atomic using file move instead of overwrite.
- """
- if hasattr(file_holder, 'write') and callable(file_holder.write):
- fout = file_holder
- else:
- fout = open (file_holder, 'wb')
- for l in self.lines:
- fout.write(l)
- fout.write('\n')
- # the following are for smart properties.
- def __str__ (self):
- return '\n'.join(self.lines)
- def __get_cur_line (self):
- self.__cur_line = self.lines[self.cur_line_num]
- return self.__cur_line
- def __set_cur_line (self, value):
- self.__cur_line = value
- self.lines[self.cur_line_num] = self.__cur_line
- def __del_cur_line (self):
- del self.lines[self.cur_line_num]
- if self.cur_line_num >= len(self.lines):
- self.cur_line_num = len(self.lines) - 1
- cur_line = property (__get_cur_line, __set_cur_line, __del_cur_line)
- # lines = property (get_lines, set_lines, del_lines)
-
-__NOT_USED ="""
-import sys
-pe = pyed()
-pe.read(sys.stdin)
-#print "---"
-#print list(x.cur_line for x in pe.match_lines_rev('^#'))
-#print pe.first('^#')
-#print pe.last('^#')
-#print "---"
-for pe in pe.match_lines('^\\s*#'):
- pe.cur_line = pe.cur_line.lower()
-pe.last('# comment.*')
-pe.cur_line = '# Comment 1'
-print pe
-if pe.last('asdfasdf') is None:
- print "can't find 'asdfasdf'"
-"""
-
diff --git a/tools/sfupload.py b/tools/sfupload.py
deleted file mode 100755
index 8a3b078..0000000
--- a/tools/sfupload.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#!/usr/bin/env python
-'''This uploads the latest pexpect package to sourceforge.
-
-PEXPECT LICENSE
-
- This license is approved by the OSI and FSF as GPL-compatible.
- http://opensource.org/licenses/isc-license.txt
-
- Copyright (c) 2012, Noah Spurrier <noah@noah.org>
- PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
- PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
- COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-'''
-import pexpect
-import sys
-
-child = pexpect.spawn('ftp upload.sourceforge.net')
-child.logfile = sys.stdout
-child.expect('Name .*: ')
-child.sendline('anonymous')
-child.expect('Password:')
-child.sendline('noah@noah.org')
-child.expect('ftp> ')
-child.sendline('cd /incoming')
-child.expect('ftp> ')
-child.sendline('lcd dist')
-child.expect('ftp> ')
-child.sendline('bin')
-child.expect('ftp> ')
-child.sendline('prompt')
-child.expect('ftp> ')
-child.sendline('mput pexpect-*.tar.gz')
-child.expect('ftp> ')
-child.sendline('ls pexpect*')
-child.expect('ftp> ')
-print child.before
-child.sendline('bye')
-
diff --git a/tools/step.py b/tools/step.py
deleted file mode 100755
index cc0062e..0000000
--- a/tools/step.py
+++ /dev/null
@@ -1,47 +0,0 @@
-#!/usr/bin/env python
-'''
-# This single steps through a log file.
-
-PEXPECT LICENSE
-
- This license is approved by the OSI and FSF as GPL-compatible.
- http://opensource.org/licenses/isc-license.txt
-
- Copyright (c) 2012, Noah Spurrier <noah@noah.org>
- PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
- PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
- COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-'''
-
-import tty, termios, sys
-
-def getkey():
- file = sys.stdin.fileno()
- mode = termios.tcgetattr(file)
- try:
- tty.setraw(file, termios.TCSANOW)
- ch = sys.stdin.read(1)
- finally:
- termios.tcsetattr(file, termios.TCSANOW, mode)
- return ch
-
-fin = open ('log', 'rb')
-fout = open ('log2', 'wb')
-
-while 1:
- foo = fin.read(1)
- if foo == '':
- sys.exit(0)
- sys.stdout.write(foo)
- getkey()
- fout.write (foo)
- fout.flush()
-
diff --git a/tools/teamcity-coverage-report.sh b/tools/teamcity-coverage-report.sh
new file mode 100755
index 0000000..2e32241
--- /dev/null
+++ b/tools/teamcity-coverage-report.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+# This is to be executed by each individual OS test. It only
+# combines coverage files and reports locally to the given
+# TeamCity build configuration.
+set -e
+set -o pipefail
+[ -z ${TEMP} ] && TEMP=/tmp
+
+# combine all .coverage* files,
+coverage combine
+
+# create ascii report,
+report_file=$(mktemp $TEMP/coverage.XXXXX)
+coverage report --rcfile=`dirname $0`/../.coveragerc > "${report_file}" 2>/dev/null
+
+# Report Code Coverage for TeamCity, using 'Service Messages',
+# https://confluence.jetbrains.com/display/TCD8/How+To...#HowTo...-ImportcoverageresultsinTeamCity
+# https://confluence.jetbrains.com/display/TCD8/Custom+Chart#CustomChart-DefaultStatisticsValuesProvidedbyTeamCity
+total_no_lines=$(awk '/TOTAL/{printf("%s",$2)}' < "${report_file}")
+total_no_misses=$(awk '/TOTAL/{printf("%s",$3)}' < "${report_file}")
+total_no_covered=$((${total_no_lines} - ${total_no_misses}))
+echo "##teamcity[buildStatisticValue key='CodeCoverageAbsLTotal' value='""${total_no_lines}""']"
+echo "##teamcity[buildStatisticValue key='CodeCoverageAbsLCovered' value='""${total_no_covered}""']"
+
+# Display for human consumption and remove ascii file.
+cat "${report_file}"
+rm "${report_file}"
diff --git a/tools/teamcity-runtests.sh b/tools/teamcity-runtests.sh
new file mode 100755
index 0000000..b74f179
--- /dev/null
+++ b/tools/teamcity-runtests.sh
@@ -0,0 +1,61 @@
+#!/bin/bash
+#
+# This script assumes that the project 'ptyprocess' is
+# available in the parent of the project's folder.
+set -e
+set -o pipefail
+
+if [ -z $1 ]; then
+ echo "$0 (2.6|2.7|3.3|3.4)"
+ exit 1
+fi
+
+export PYTHONIOENCODING=UTF8
+export LANG=en_US.UTF-8
+
+pyversion=$1
+here=$(cd `dirname $0`; pwd)
+osrel=$(uname -s)
+venv=teamcity-pexpect
+venv_wrapper=$(which virtualenvwrapper.sh)
+
+if [ -z $venv_wrapper ]; then
+ echo "virtualenvwrapper.sh not found in PATH." >&2
+ exit 1
+fi
+
+. ${venv_wrapper}
+workon ${venv} || mkvirtualenv -p `which python${pyversion}` ${venv} || true
+
+# install ptyprocess
+cd $here/../../ptyprocess
+pip uninstall --yes ptyprocess || true
+python setup.py install
+
+# install all test requirements
+pip install --upgrade pytest-cov coverage coveralls pytest-capturelog
+
+# run tests
+cd $here/..
+ret=0
+py.test \
+ --cov pexpect \
+ --cov-config .coveragerc \
+ --junit-xml=results.${osrel}.py${pyversion}.xml \
+ --verbose \
+ --verbose \
+ || ret=$?
+
+if [ $ret -ne 0 ]; then
+ # we always exit 0, preferring instead the jUnit XML
+ # results to be the dominate cause of a failed build.
+ echo "py.test returned exit code ${ret}." >&2
+ echo "the build should detect and report these failing tests." >&2
+fi
+
+# combine all coverage to single file, report for this build,
+# then move into ./build-output/ as a unique artifact to allow
+# the final "Full build" step to combine and report to coveralls.io
+`dirname $0`/teamcity-coverage-report.sh
+mkdir -p build-output
+mv .coverage build-output/.coverage.${osrel}.py{$pyversion}.$RANDOM.$$
diff --git a/tools/tweak_files.py b/tools/tweak_files.py
deleted file mode 100755
index 08481a2..0000000
--- a/tools/tweak_files.py
+++ /dev/null
@@ -1,46 +0,0 @@
-'''
-
-PEXPECT LICENSE
-
- This license is approved by the OSI and FSF as GPL-compatible.
- http://opensource.org/licenses/isc-license.txt
-
- Copyright (c) 2012, Noah Spurrier <noah@noah.org>
- PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
- PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
- COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-'''
-
-import pyed
-import os
-import re
-
-# extract the version number from the pexpect.py source.
-d = pyed.pyed()
-d.read ("pexpect.py")
-d.first('^__version__')
-r = re.search("'([0-9]\.[0-9])'", d.cur_line)
-version = r.group(1)
-
-# Edit the index.html to update current VERSION.
-d = pyed.pyed()
-d.read ("doc/index.html.template")
-for cl in d.match_lines('.*VERSION.*'):
- d.cur_line = d.cur_line.replace('VERSION', version)
-d.write("doc/index.html")
-
-# Edit the setup.py to update current VERSION.
-d = pyed.pyed()
-d.read ("setup.py.template")
-for cl in d.match_lines('.*VERSION.*'):
- d.cur_line = d.cur_line.replace('VERSION', version)
-d.write("setup.py")
-os.chmod("setup.py", 0755)
diff --git a/tools/websync.py b/tools/websync.py
deleted file mode 100755
index b7723e5..0000000
--- a/tools/websync.py
+++ /dev/null
@@ -1,63 +0,0 @@
-#!/usr/bin/env python
-
-'''
-I used to use this to keep the sourceforge pages up to date with the
-latest documentation and I like to keep a copy of the distribution
-on the web site so that it will be compatible with
-The Vaults of Parnasus which requires a direct URL link to a
-tar ball distribution. I don't advertise the package this way.
-
-PEXPECT LICENSE
-
- This license is approved by the OSI and FSF as GPL-compatible.
- http://opensource.org/licenses/isc-license.txt
-
- Copyright (c) 2012, Noah Spurrier <noah@noah.org>
- PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
- PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
- COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-'''
-
-import pexpect
-import getpass
-import sys
-
-X = getpass.getpass('Password: ')
-pp_pattern=["(?i)password:", "(?i)enter passphrase for key '.*?':"]
-
-p = pexpect.spawn ('scp -r doc/. noah@shell.sourceforge.net:/home/groups/p/pe/pexpect/htdocs/.')
-p.logfile_read = sys.stdout
-p.expect (pp_pattern)
-p.sendline (X)
-p.expect (pexpect.EOF)
-print p.before
-
-p = pexpect.spawn ('scp doc/clean.css doc/email.png noah@shell.sourceforge.net:/home/groups/p/pe/pexpect/htdocs/clean.css')
-p.logfile_read = sys.stdout
-p.expect (pp_pattern)
-p.sendline (X)
-p.expect (pexpect.EOF)
-print p.before
-
-#p = pexpect.spawn ('ssh noah@use-pr-shell1.sourceforge.net "cd htdocs;tar zxvf pexpect-doc.tgz"')
-#p.logfile_read = sys.stdout
-#p.expect ('password:')
-#p.sendline (X)
-#p.expect (pexpect.EOF)
-#print p.before
-
-p = pexpect.spawn ('scp dist/pexpect-*.tar.gz noah@shell.sourceforge.net:/home/groups/p/pe/pexpect/htdocs/.')
-p.logfile_read = sys.stdout
-p.expect (pp_pattern)
-p.sendline (X)
-p.expect (pexpect.EOF)
-print p.before
-