summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Quast <contact@jeffquast.com>2015-04-24 21:10:03 -0700
committerJeff Quast <contact@jeffquast.com>2015-04-24 21:10:03 -0700
commit8dafe9ad04bf3237b1c8a7c00e9c007b446ffc33 (patch)
tree83f6eccf7020a47bb8c30332e25ef96ee7e78b3c
parentfba2c6c4d903c2d1e4eca5fbd0db2336d7656ef0 (diff)
parentdf3e476c71c8a10d84788b03c79b4167ce6b778b (diff)
downloadpexpect-git-8dafe9ad04bf3237b1c8a7c00e9c007b446ffc33.tar.gz
Merge pull request #178 from pexpect/support-method-as-run-event-callback
Support MethodType as callback for pexpect.run(event=...)
-rw-r--r--doc/history.rst2
-rw-r--r--pexpect/__init__.py13
-rwxr-xr-xtests/test_run.py118
3 files changed, 106 insertions, 27 deletions
diff --git a/doc/history.rst b/doc/history.rst
index c9d5640..95bf371 100644
--- a/doc/history.rst
+++ b/doc/history.rst
@@ -12,6 +12,8 @@ Version 4.0
coroutine. You can get the result using ``yield from``, or wrap it in an
:class:`asyncio.Task`. This allows the event loop to do other things while
waiting for output that matches a pattern.
+* Enhancement: allow method as callbacks of argument ``events`` for
+ :func:`pexpect.run` (:ghissue:`176`).
Version 3.4
```````````
diff --git a/pexpect/__init__.py b/pexpect/__init__.py
index c906e89..4b153f4 100644
--- a/pexpect/__init__.py
+++ b/pexpect/__init__.py
@@ -71,7 +71,7 @@ from .utils import split_command_line, which, is_executable_file
from .pty_spawn import spawn, spawnu, PY3
from .expect import Expecter, searcher_re, searcher_string
-__version__ = '3.3'
+__version__ = '4.0.dev'
__revision__ = ''
__all__ = ['ExceptionPexpect', 'EOF', 'TIMEOUT', 'spawn', 'spawnu', 'run', 'runu',
'which', 'split_command_line', '__version__', '__revision__']
@@ -149,8 +149,8 @@ def run(command, timeout=30, withexitstatus=False, events=None,
Note that you should put newlines in your string if Enter is necessary.
- Like the example above, the responses may also contain callback functions.
- Any callback is a function that takes a dictionary as an argument.
+ Like the example above, the responses may also contain a callback, either
+ a function or method. It should accept a dictionary value as an argument.
The dictionary contains all the locals from the run() function, so you can
access the child spawn object or any other variable defined in run()
(event_count, child, and extra_args are the most useful). A callback may
@@ -206,7 +206,8 @@ def _run(command, timeout, withexitstatus, events, extra_args, logfile, cwd,
child_result_list.append(child.before)
if isinstance(responses[index], child.allowed_string_types):
child.send(responses[index])
- elif isinstance(responses[index], types.FunctionType):
+ elif (isinstance(responses[index], types.FunctionType) or
+ isinstance(responses[index], types.MethodType)):
callback_result = responses[index](locals())
sys.stdout.flush()
if isinstance(callback_result, child.allowed_string_types):
@@ -214,7 +215,9 @@ def _run(command, timeout, withexitstatus, events, extra_args, logfile, cwd,
elif callback_result:
break
else:
- raise TypeError('The callback must be a string or function.')
+ raise TypeError("parameter `event' at index {index} must be "
+ "a string, method, or function: {value!r}"
+ .format(index=index, value=responses[index]))
event_count = event_count + 1
except TIMEOUT:
child_result_list.append(child.before)
diff --git a/tests/test_run.py b/tests/test_run.py
index c018b4d..1b3c92f 100755
--- a/tests/test_run.py
+++ b/tests/test_run.py
@@ -29,12 +29,29 @@ from . import PexpectTestCase
unicode_type = str if pexpect.PY3 else unicode
-def timeout_callback (d):
-# print d["event_count"],
- if d["event_count"]>3:
+
+def timeout_callback(values):
+ if values["event_count"] > 3:
return 1
return 0
+
+def function_events_callback(values):
+ try:
+ previous_echoed = (values["child_result_list"][-1]
+ .decode().split("\n")[-2].strip())
+ if previous_echoed.endswith("stage-1"):
+ return "echo stage-2\n"
+ elif previous_echoed.endswith("stage-2"):
+ return "echo stage-3\n"
+ elif previous_echoed.endswith("stage-3"):
+ return "exit\n"
+ else:
+ raise Exception("Unexpected output {0}".format(previous_echoed))
+ except IndexError:
+ return "echo stage-1\n"
+
+
class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
runfunc = staticmethod(pexpect.run)
cr = b'\r'
@@ -51,27 +68,34 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
os.unlink(self.rcfile)
super(RunFuncTestCase, self).tearDown()
- def test_run_exit (self):
+ def test_run_exit(self):
(data, exitstatus) = self.runfunc('python exit1.py', withexitstatus=1)
assert exitstatus == 1, "Exit status of 'python exit1.py' should be 1."
- def test_run (self):
- the_old_way = subprocess.Popen(args=['uname', '-m', '-n'],
- stdout=subprocess.PIPE).communicate()[0].rstrip()
- (the_new_way, exitstatus) = self.runfunc('uname -m -n', withexitstatus=1)
+ def test_run(self):
+ the_old_way = subprocess.Popen(
+ args=['uname', '-m', '-n'],
+ stdout=subprocess.PIPE
+ ).communicate()[0].rstrip()
+
+ (the_new_way, exitstatus) = self.runfunc(
+ 'uname -m -n', withexitstatus=1)
the_new_way = the_new_way.replace(self.cr, self.empty).rstrip()
+
self.assertEqual(self.prep_subprocess_out(the_old_way), the_new_way)
self.assertEqual(exitstatus, 0)
- def test_run_callback (self): # TODO it seems like this test could block forever if run fails...
- self.runfunc("cat", timeout=1, events={pexpect.TIMEOUT:timeout_callback})
+ def test_run_callback(self):
+ # TODO it seems like this test could block forever if run fails...
+ events = {pexpect.TIMEOUT: timeout_callback}
+ self.runfunc("cat", timeout=1, events=events)
- def test_run_bad_exitstatus (self):
- (the_new_way, exitstatus) = self.runfunc('ls -l /najoeufhdnzkxjd',
- withexitstatus=1)
+ def test_run_bad_exitstatus(self):
+ (the_new_way, exitstatus) = self.runfunc(
+ 'ls -l /najoeufhdnzkxjd', withexitstatus=1)
assert exitstatus != 0
- def test_run_tuple_list (self):
+ def test_run_event_as_string(self):
events = [
# second match on 'abc', echo 'def'
('abc\r\n.*GO:', 'echo "def"\n'),
@@ -88,30 +112,80 @@ class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
timeout=10)
assert exitstatus == 0
+ def test_run_event_as_function(self):
+ events = [
+ ('GO:', function_events_callback)
+ ]
+
+ (data, exitstatus) = pexpect.run(
+ 'bash --rcfile {0}'.format(self.rcfile),
+ withexitstatus=True,
+ events=events,
+ timeout=10)
+ assert exitstatus == 0
+
+ def test_run_event_as_method(self):
+ events = [
+ ('GO:', self._method_events_callback)
+ ]
+
+ (data, exitstatus) = pexpect.run(
+ 'bash --rcfile {0}'.format(self.rcfile),
+ withexitstatus=True,
+ events=events,
+ timeout=10)
+ assert exitstatus == 0
+
+ def test_run_event_typeerror(self):
+ events = [('GO:', -1)]
+ with self.assertRaises(TypeError):
+ pexpect.run('bash --rcfile {0}'.format(self.rcfile),
+ withexitstatus=True,
+ events=events,
+ timeout=10)
+
+ def _method_events_callback(self, values):
+ try:
+ previous_echoed = (values["child_result_list"][-1].decode()
+ .split("\n")[-2].strip())
+ if previous_echoed.endswith("foo1"):
+ return "echo foo2\n"
+ elif previous_echoed.endswith("foo2"):
+ return "echo foo3\n"
+ elif previous_echoed.endswith("foo3"):
+ return "exit\n"
+ else:
+ raise Exception("Unexpected output {0!r}"
+ .format(previous_echoed))
+ except IndexError:
+ return "echo foo1\n"
+
+
class RunUnicodeFuncTestCase(RunFuncTestCase):
runfunc = staticmethod(pexpect.runu)
cr = b'\r'.decode('ascii')
empty = b''.decode('ascii')
prep_subprocess_out = staticmethod(lambda x: x.decode('utf-8', 'replace'))
+
def test_run_unicode(self):
if pexpect.PY3:
- c = chr(254) # þ
+ char = chr(254) # þ
pattern = '<in >'
else:
- c = unichr(254) # analysis:ignore
+ char = unichr(254) # analysis:ignore
pattern = '<in >'.decode('ascii')
- def callback(d):
- if d['event_count'] == 0:
- return c + '\n'
+ def callback(values):
+ if values['event_count'] == 0:
+ return char + '\n'
else:
return True # Stop the child process
output = pexpect.runu(sys.executable + ' echo_w_prompt.py',
- env={'PYTHONIOENCODING':'utf-8'},
- events={pattern:callback})
+ env={'PYTHONIOENCODING': 'utf-8'},
+ events={pattern: callback})
assert isinstance(output, unicode_type), type(output)
- assert '<out>'+c in output, output
+ assert ('<out>' + char) in output, output
if __name__ == '__main__':
unittest.main()