diff options
author | noah <noah@656d521f-e311-0410-88e0-e7920216d269> | 2002-08-13 12:11:39 +0000 |
---|---|---|
committer | noah <noah@656d521f-e311-0410-88e0-e7920216d269> | 2002-08-13 12:11:39 +0000 |
commit | b5fb29a11e8cbdff20ec1d0a7a940ed065055c08 (patch) | |
tree | 7d0d9050cce04f2efa9b57be8b7e6d1b369addba | |
parent | f5a4fdf25a59020794583dc401e5c56f0543648e (diff) | |
download | pexpect-b5fb29a11e8cbdff20ec1d0a7a940ed065055c08.tar.gz |
Initial revision
git-svn-id: http://pexpect.svn.sourceforge.net/svnroot/pexpect/trunk@3 656d521f-e311-0410-88e0-e7920216d269
30 files changed, 2198 insertions, 0 deletions
diff --git a/pexpect/MANIFEST b/pexpect/MANIFEST new file mode 100644 index 0000000..4809b4b --- /dev/null +++ b/pexpect/MANIFEST @@ -0,0 +1,5 @@ +README.txt +setup.py +pexpect.py +ansi.py +fsm.py diff --git a/pexpect/Makefile b/pexpect/Makefile new file mode 100644 index 0000000..7d5e7f1 --- /dev/null +++ b/pexpect/Makefile @@ -0,0 +1,50 @@ +SHELL = /bin/sh + +VERSION= 0.8 +#DOCGENERATOR= happydoc +DOCGENERATOR= pydoc -w +MANIFEST_LINES != cat MANIFEST + +all: dist examples doc + +# *.py README.txt MANIFEST + +dist/pexpect-$(VERSION).tar.gz: $(MANIFEST_LINES) + rm -f *.pyc + rm -f pexpect-*.tgz + rm -f dist/pexpect-$(VERSION).tar.gz + /usr/bin/env python setup.py sdist + +install: + make distutil + /usr/bin/env python setup.py install + +dist: pexpect-current.tgz + +pexpect-current.tgz: dist/pexpect-$(VERSION).tar.gz + rm -f pexpect-current.tgz + cp dist/pexpect-$(VERSION).tar.gz ./pexpect-current.tgz + +doc: doc.tgz + +doc.tgz: doc/* + rm -f doc.tgz + $(DOCGENERATOR) `echo "$(MANIFEST_LINES)" | sed -e "s/\.py//g"` + mv *.html doc/ + tar zcf doc.tgz doc/ + +examples: examples.tgz + +examples.tgz: examples/* + rm -f examples.tgz + tar zcf examples.tgz examples/ + +clean: + rm -f *.pyc + rm -f pexpect-*.tgz + rm -f dist/pexpect-$(VERSION).tar.gz + rm -f examples.tgz + rm -f doc.tgz + rm -f python.core + rm -f core + diff --git a/pexpect/README.txt b/pexpect/README.txt new file mode 100644 index 0000000..8d71f8d --- /dev/null +++ b/pexpect/README.txt @@ -0,0 +1,28 @@ +Pexpect +-- a Pure Python "Expect like" module + +The purpose of the Pexpect module is to make Python be a better glue. + +Pexpect works like Don Libes' Expect. Use Pexpect when you want to +control another application. It allows you to start a child +application and have your script control it as if a human were +typing commands. + +Pexpect is a Python module for spawning child applications; +controlling them; and responding to expected patterns in their +output. Pexpect can be used for automating interactive applications +such as ssh, ftp, passwd, telnet, etc. It can be used to a automate +setup scripts for duplicating software package installations on +different servers. It can be used for automated software testing. +Pexpect is in the spirit of Don Libes' Expect, but Pexpect is pure +Python. Other Expect-like modules for Python require TCL and Expect +or require C extensions to be compiled. Pexpect does not use C, +Expect, or TCL extensions. It should work on any platform that +supports the standard Python pty module. The Pexpect interface was +designed to be easy to use so that simple tasks are easy. + +License: Python Software Foundation License + +Noah Spurrier +http://pexpect.sourceforge.net/ + diff --git a/pexpect/ansi.py b/pexpect/ansi.py new file mode 100755 index 0000000..ed0d94e --- /dev/null +++ b/pexpect/ansi.py @@ -0,0 +1,347 @@ +#!/usr/bin/env python + +# references: +# http://www.retards.org/terminals/vt102.html +# http://vt100.net/docs/vt102-ug/contents.html + +import copy + +NUL = 0 # Fill character; ignored on input. +ENQ = 5 # Transmit answerback message. +BEL = 7 # Ring the bell. +BS = 8 # Move cursor left. +HT = 9 # Move cursor to next tab stop. +LF = 10 # Line feed. +VT = 11 # Same as LF. +FF = 12 # Same as LF. +CR = 13 # Move cursor to left margin or newline. +SO = 14 # Invoke G1 character set. +SI = 15 # Invoke G0 character set. +XON = 17 # Resume transmission. +XOFF = 19 # Halt transmission. +CAN = 24 # Cancel escape sequence. +SUB = 26 # Same as CAN. +ESC = 27 # Introduce a control sequence. +DEL = 127 # Fill character; ignored on input. +SPACE = chr(32) # Space or blank character. + +import fsm + +def constrain (n, min, max): + if n < min: + return min + if n > max: + return max + return n + +def push_digit (input_symbol, state, stack): + stack.append(input_symbol) +def build_digit (input_symbol, state, stack): + s = stack.pop() + s = s + input_symbol + stack.append(s) +def accept (input_symbol, state, stack): + if input_symbol=='H': + c = stack.pop() + r = stack.pop() + print 'HOME (r,c) -> (%s, %s)' % (r,c) + if input_symbol == 'D': + n = stack.pop() + print 'BACK (n) -> %s' % n + if input_symbol == 'B': + n = stack.pop() + print 'DOWN (n) -> %s' % n +def default (input_symbol, state, stack): + print 'UNDEFINED: %s, %s -- RESETTING' % (input_symbol, state) + stack=[] + + +class term: + '''This class encapsulates a generic terminal. + It filters a stream and maintains the state of + a screen object. + ''' + def __init__ (self): + s = fsm ('INIT') + f.add_default_transition ('INIT', default) + f.add_transition('INIT','\x1b', 'ESC', None) + f.add_transition('ESC','[', 'ELB', None) + f.add_transition_list('ELB',['0','1','2','3','4','5','6','7','8','9'], 'ELB_DIGIT', push_digit) + f.add_transition_list('ELB_DIGIT',['0','1','2','3','4','5','6','7','8','9'], 'ELB_DIGIT', build_digit) + f.add_transition('ELB_DIGIT',';', 'SEMICOLON', None) + f.add_transition_list('SEMICOLON',['0','1','2','3','4','5','6','7','8','9'], 'ELB_DIGIT2', push_digit) + f.add_transition_list('ELB_DIGIT2',['0','1','2','3','4','5','6','7','8','9'], 'ELB_DIGIT2', build_digit) + f.add_transition_list('ELB_DIGIT2',['H','f'], 'INIT', accept) + f.add_transition_list('ELB_DIGIT',['D','B','C','A'], 'INIT', accept) + + + +class screen: + def __init__ (self, r=24,c=80): + self.rows = r + self.cols = c + self.cur_r = 1 + self.cur_c = 1 + self.scroll_row_start = 1 + self.scroll_row_end = self.rows + self.mode_scape = 0 + self.w = [ [SPACE] * self.cols for c in range(self.rows)] + + def __str__ (self): + s = '' + for r in range (1, self.rows + 1): + for c in range (1, self.cols + 1): + s = s + self.get(r,c) + s = s + '\n' + return s + + def fill (self, ch=SPACE): + self.fill_region (1,1,self.rows,self.cols, ch) + + def fill_region (self, rs,cs, re,ce, ch=SPACE): + rs = constrain (rs, 1, self.rows) + re = constrain (re, 1, self.rows) + cs = constrain (cs, 1, self.cols) + ce = constrain (ce, 1, self.cols) + temp=0 + if rs > re: + temp = re + re = rs + rs = temp + if cs > ce: + temp = ce + ce = cs + cs = temp + for r in range (rs, re+1): + for c in range (cs, ce + 1): + self.put (r,c,ch) + + def type (self, ch): + '''Puts a character at the current cursor position. + cursor position if moved forward with wrap-around, but + no scrolling is done if the cursor hits the lower-right corner + of the screen. + ''' + if ch == '\r': + self.crlf() + return + + self.put(self.cur_r, self.cur_c, ch) + old_r = self.cur_r + old_c = self.cur_c + self.cursor_forward() + if old_c == self.cur_c: + self.cursor_down() + if old_r != self.cur_r: + self.cursor_home (self.cur_r, 1) + else: + self.scroll_up () + self.cursor_home (self.cur_r, 1) + self.erase_line() + + def crlf (self): + '''This advances the cursor with CRLF properties. + The cursor will line wrap and the screen may scroll. + Under UNIX this is what happens when a chr(13) '\r' is read. + ''' + self.cursor_home (self.cur_r, 1) + old_r = self.cur_r + self.cursor_down() + if old_r == self.cur_r: + self.scroll_up () + self.erase_line() + + def put (self, r, c, ch): + '''Screen array starts at 1 index.''' +# if r < 1 or r > self.rows or c < 1 or c > self.cols: +# raise IndexError ('Screen array index out of range') + ch = str(ch)[0] + self.w[r-1][c-1] = ch + + def get (self, r, c): + '''Screen array starts at 1 index.''' +# if r < 1 or r > self.rows or c < 1 or c > self.cols: +# raise IndexError ('Screen array index out of range') + return self.w[r-1][c-1] + + def cursor_constrain (self): + self.cur_r = constrain (self.cur_r, 1, self.rows) + self.cur_c = constrain (self.cur_c, 1, self.cols) + + def cursor_home (self, r=1, c=1): # <ESC>[{ROW};{COLUMN}H + self.cur_r = r + self.cur_c = c + self.cursor_constrain () + def cursor_back (self,count=1): # <ESC>[{COUNT}D (not confused with down) + self.cur_r = self.cur_r - count + self.cursor_constrain () + def cursor_down (self,count=1): # <ESC>[{COUNT}B (not confused with back) + self.cur_r = self.cur_r + count + self.cursor_constrain () + def cursor_forward (self,count=1): # <ESC>[{COUNT}C + self.cur_c = self.cur_c + count + self.cursor_constrain () + def cursor_up (self,count=1): # <ESC>[{COUNT}A + self.cur_r = self.cur_r - count + self.cursor_constrain () + def cursor_force_position (self, r, c): # <ESC>[{ROW};{COLUMN}f + '''Identical to Cursor Home.''' + self.cursor_home (r, c) + def cursor_save (self): # <ESC>[s + '''Save current cursor position.''' + pass + def cursor_unsave (self): # <ESC>[u + '''Restores cursor position after a Save Cursor.''' + pass + def cursor_save_attrs (self): # <ESC>7 + '''Save current cursor position.''' + pass + def cursor_restore_attrs (self): # <ESC>8 + '''Restores cursor position after a Save Cursor.''' + pass + def scroll_constrain (self): + '''This keeps the scroll region within the screen region.''' + if self.scroll_row_start <= 0: + self.scroll_row_start = 1 + if self.scroll_row_end > self.rows: + self.scroll_row_end = self.rows + def scroll_screen (self): # <ESC>[r + '''Enable scrolling for entire display.''' + self.scroll_row_start = 1 + self.scroll_row_end = self.rows + def scroll_screen_rows (self, rs, re): # <ESC>[{start};{end}r + '''Enable scrolling from row {start} to row {end}.''' + self.scroll_row_start = rs + self.scroll_row_end = re + self.scroll_constrain() + def scroll_down (self): # <ESC>D + '''Scroll display down one line.''' + # Screen is indexed from 1, but arrays are indexed from 0. + s = self.scroll_row_start - 1 + e = self.scroll_row_end - 1 + self.w[s+1:e+1] = copy.deepcopy(self.w[s:e]) + def scroll_up (self): # <ESC>M + '''Scroll display up one line.''' + # Screen is indexed from 1, but arrays are indexed from 0. + s = self.scroll_row_start - 1 + e = self.scroll_row_end - 1 + self.w[s:e] = copy.deepcopy(self.w[s+1:e+1]) + def erase_end_of_line (self): # <ESC>[K + '''Erases from the current cursor position to + the end of the current line.''' + self.fill_region (self.cur_r, self.cur_c, self.cur_r, self.cols) + def erase_start_of_line (self): # <ESC>[1K + '''Erases from the current cursor position to + the start of the current line.''' + self.fill_region (self.cur_r, 1, self.cur_r, self.cur_c) + def erase_line (self): # <ESC>[2K + '''Erases the entire current line.''' + self.fill_region (self.cur_r, 1, self.cur_r, self.cols) + def erase_down (self): # <ESC>[J + '''Erases the screen from the current line down to + the bottom of the screen.''' + self.erase_end_of_line () + self.fill_region (self.cur_r + 1, 1, self.rows, self.cols) + def erase_up (self): # <ESC>[1J + '''Erases the screen from the current line up to + the top of the screen.''' + self.erase_start_of_line () + self.fill_region (self.cur_r-1, 1, 1, self.cols) + def erase_screen (self): # <ESC>[2J + '''Erases the screen with the background color.''' + self.fill () + + def set_tab (self): # <ESC>H + '''Sets a tab at the current position.''' + pass + def clear_tab (self): # <ESC>[g + '''Clears tab at the current position.''' + pass + def clear_all_tabs (self): # <ESC>[3g + '''Clears all tabs.''' + pass + +# Insert line Esc [ Pn L +# Delete line Esc [ Pn M +# Delete character Esc [ Pn P +# Scrolling region Esc [ Pn(top);Pn(bot) r + + + + + +import tty, termios, sys + +def getkey(): + file = sys.stdin.fileno() + mode = termios.tcgetattr(file) + try: + tty.setraw(file, termios.TCSANOW) + ch = sys.stdin.read(1) + finally: + termios.tcsetattr(file, termios.TCSANOW, mode) + return ch + + +def test_typing (): + s = screen (10,10) + while 1: + ch = getkey() + s.type(ch) + print str(s) + print + +#s = screen () +#s.fill ('X') +#print s.w +import sys + +e = chr(0x1b) +#sys.stdout.write (e+'[6n') +#sys.stdout.write (e+'[c') +#sys.stdout.write (e+'[0c') +#sys.stdout.write (e+'[5;10r') +#sys.stdout.write (e+'[r') +#sys.stdout.write (e+'[5;10H') +#sys.stdout.write (e+'[K') +#sys.stdout.write (e+'[6n') +#for i in range (0,9): +# sys.stdout.write (e + 'D') + +s = screen (10,10) +s.fill ('X') +s.fill_region (2,2,9,9,'O') +s.fill_region (5,5,s.rows-5,s.cols-5, 'X') +for r in range (1,s.rows + 1): + s.put (r, 1, str(r)) +s.put(1,1, '1') +s.put(1,s.cols, 'C') +s.put(s.rows, 1, 'R') +s.put(s.rows, s.cols, '*') +from pprint import * +print +pprint (s.w) +print str(s) +print +s.scroll_screen_rows (4,6) +s.scroll_down() +s.scroll_down() +s.scroll_down() +print str(s) +print +s.scroll_screen_rows (3,7) +s.scroll_up() +s.scroll_up() +s.scroll_up() +s.scroll_up() +s.scroll_up() +print str(s) +s.fill('.') +s.cursor_home() +for r in range (1,11): + for c in range(1,11): + s.type(r*c) +print +print str(s) + +#test_typing() diff --git a/pexpect/examples/chess.py b/pexpect/examples/chess.py new file mode 100755 index 0000000..8cb922f --- /dev/null +++ b/pexpect/examples/chess.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python +'''This demonstrates controlling a screen oriented application (curses). +It starts two instances of gnuchess and then pits them against each other. +''' +import pexpect +import string + +REGEX_MOVE = '(?:[a-z]|\x1b\[C)(?:[0-9]|\x1b\[C)(?:[a-z]|\x1b\[C)(?:[0-9]|\x1b\[C)' +REGEX_MOVE_PART = '(?:[0-9]|\x1b\[C)(?:[a-z]|\x1b\[C)(?:[0-9]|\x1b\[C)' + +class Chess: + + def __init__(self, engine = "/usr/local/bin/gnuchess -a -h 1"): + self.child = pexpect.spawn (engine) + self.child.expect ('Chess') + if self.child.matched != 'Chess': + raise IOError, 'incompatible chess program' + self.last_computer_move = '' + + def do_first_move (self, move): + self.child.expect ('Your move is') + self.child.sendline (move) +# print '', self.child.matched + return move + + def do_move (self, move): + self.child.expect ('\[19;60H') + self.child.sendline (move) + print 'do_move', self.child.matched, move + return move + + def get_first_computer_move (self): + self.child.expect ('My move is') + self.child.expect (REGEX_MOVE) +# print '', self.child.matched + return self.child.matched + + def get_computer_move (self): + print 'Here' + i = self.child.expect (['\[17;59H', '\[17;58H']) + print i + if i == 0: + self.child.expect (REGEX_MOVE) + if len(self.child.matched) < 4: + self.child.matched = self.child.matched + self.last_computer_move[3] + if i == 1: + self.child.expect (REGEX_MOVE_PART) + self.child.matched = self.last_computer_move[0] + self.child.matched + print '', self.child.matched + self.last_computer_move = self.child.matched + return self.child.matched + + def switch (self): + self.child.sendline ('switch') + + def set_depth (self, depth): + self.child.sendline ('depth') + self.child.expect ('depth=') + self.child.sendline ('%d' % depth) + + def quit(self): + self.child.sendline ('quit') +import sys, os +print 'Starting...' +white = Chess() +white.child.echo = 1 +white.child.expect ('Your move is') +white.set_depth(2) +white.switch() + +move_white = white.get_first_computer_move() +print 'first move white:', move_white + +white.do_move ('e7e5') +move_white = white.get_computer_move() +print 'move white:', move_white +white.do_move ('f8c5') +move_white = white.get_computer_move() +print 'move white:', move_white +white.do_move ('b8a6') +move_white = white.get_computer_move() +print 'move white:', move_white + +sys.exit(1) + + + +black = Chess() +white = Chess() +white.child.expect ('Your move is') +white.switch() + +move_white = white.get_first_computer_move() +print 'first move white:', move_white + +black.do_first_move (move_white) +move_black = black.get_first_computer_move() +print 'first move black:', move_black + +white.do_move (move_black) + +done = 0 +while not done: + move_white = white.get_computer_move() + print 'move white:', move_white + + black.do_move (move_white) + move_black = black.get_computer_move() + print 'move black:', move_black + + white.do_move (move_black) + print 'tail of loop' + +g.quit() + + diff --git a/pexpect/examples/ftp.py b/pexpect/examples/ftp.py new file mode 100755 index 0000000..42e159a --- /dev/null +++ b/pexpect/examples/ftp.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python +'''This connects to an ftp site; does a few ftp stuff; and then gives the user interactive control over the session. +''' +import pexpect +import sys + +child = pexpect.spawn('/usr/bin/ftp ftp.openbsd.org') +child.expect('Name .*: ') +child.sendline('anonymous') +child.expect('Password:') +child.sendline('noah@noah.org') +child.expect('ftp> ') +child.sendline('cd /pub/OpenBSD/2.9/packages/i386') +child.expect('ftp> ') +child.sendline('bin') +child.expect('ftp> ') +child.sendline('prompt') +child.expect('ftp> ') +child.sendline('pwd') +child.expect('ftp> ') +print("Escape character is '^]'.\n") +sys.stdout.write (child.matched) +sys.stdout.flush() +child.interact() # Escape character defaults to ^] + +if child.isAlive(): + child.sendline('bye') + child.kill(1) +print 'Is Alive: ', child.isAlive() + diff --git a/pexpect/examples/passmass.py b/pexpect/examples/passmass.py new file mode 100755 index 0000000..e6f4712 --- /dev/null +++ b/pexpect/examples/passmass.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +'''Change passwords on the named machines. + passmass host1 host2 host3 . . . +Note that login shell prompt on remote machine must end in # or $. +''' + +import pexpect +import sys, getpass + +USAGE = '''passmass host1 host2 host3 . . .''' +SHELL_PROMPT = '[#\$] ' + +def login(host, user, password): + child = pexpect.spawn('ssh %s@%s'%(user, host)) + child.expect('password:') + child.sendline(password) + i = child.expect(['Permission denied', SHELL_PROMPT, 'Terminal type']) + if i == 0: + print 'Permission denied on host:', host + return None + elif i == 2: + child.sendline('vt100') + i = child.expect('[#\$] ') + return child + +def change_password(child, user, oldpassword, newpassword): + child.sendline('passwd %s'%user) + i = child.expect(['Old password', 'New password']) + # Root does not require old password, so it gets to bypass the next step. + if i == 0: + child.sendline(oldpassword) + child.expect('New password') + child.sendline(newpassword) + i = child.expect(['New password', 'Retype new password']) + if i == 0: + print 'Host did not like new password. Here is what it said...' + print child.before + child.sendline('') # This should tell remote passwd command to quit. + return + child.sendline(newpassword) + +def main(): + if len(sys.argv) <= 1: + print USAGE + return 1 + + user = raw_input('Username: ') + password = getpass.getpass('Current Password: ') + newpassword = getpass.getpass('New Password: ') + newpasswordconfirm = getpass.getpass('Confirm New Password: ') + if newpassword != newpasswordconfirm: + print 'New Passwords do not match.' + return 1 + + for host in sys.argv[1:]: + child = login(host, user, password) + if child == None: + print 'Could not login to host:', host + continue + print 'Changing password on host:', host + change_password(e, user, password, newpassword) + child.expect(SHELL_PROMPT) + child.sendline('exit') + +if __name__ == '__main__': + try: + main() + except ExceptionPypect, e: + print str(e) + + diff --git a/pexpect/examples/python.py b/pexpect/examples/python.py new file mode 100755 index 0000000..9ba0d61 --- /dev/null +++ b/pexpect/examples/python.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +'''This starts the python interpreter; captures the startup message; then gives the user interactive control over the session. +Why? +''' + +# Don't do this unless you like being John Malkovich +# c = pexpect.spawn ('/usr/bin/env python ./python.py') + +import pexpect +c = pexpect.spawn ('/usr/bin/env python') +c.expect ('>>>') +print 'And now for something completely different...' +f = lambda s:s and f(s[1:])+s[0] # Makes a function to reverse a string. +print f(c.before) +print 'Yes, it\'s python, but it\'s backwards.' +print +print 'Escape character is \'^]\'.' +print c.matched, +c.interact() +c.kill(1) +print 'isAlive:', c.isAlive() + diff --git a/pexpect/examples/sshls.py b/pexpect/examples/sshls.py new file mode 100755 index 0000000..fb37976 --- /dev/null +++ b/pexpect/examples/sshls.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +'''This runs "ls -l" on a remote host using SSH. + At the prompts enter hostname, user, and password. +''' +import pexpect +import getpass + +host = raw_input('Hostname: ') +user = raw_input('User: ') +password = getpass.getpass('Password: ') + +child = pexpect.spawn("/usr/bin/ssh -l %s %s /bin/ls -l"%(user, host)) + +child.expect('password:') +child.sendline(password) + +child.expect_eof() + +print child.before + diff --git a/pexpect/examples/step.py b/pexpect/examples/step.py new file mode 100755 index 0000000..f117667 --- /dev/null +++ b/pexpect/examples/step.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python + +# This single steps through a log file. + +import tty, termios, sys + +def getkey(): + file = sys.stdin.fileno() + mode = termios.tcgetattr(file) + try: + tty.setraw(file, termios.TCSANOW) + ch = sys.stdin.read(1) + finally: + termios.tcsetattr(file, termios.TCSANOW, mode) + return ch + +fin = open ('log', 'rb') +fout = open ('log2', 'wb') + +while 1: + foo = fin.read(1) + if foo == '': + sys.exit(0) + sys.stdout.write(foo) + getkey() + fout.write (foo) + fout.flush() + diff --git a/pexpect/fsm.py b/pexpect/fsm.py new file mode 100755 index 0000000..311a335 --- /dev/null +++ b/pexpect/fsm.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python +'''This module implements a Finite State Machine (FSM) with one stack. +The FSM is fairly simple. It is useful for small parsing tasks. +The addition of a stack makes it much simpler to build tiny parsers. + +The FSM is an association of + (input_symbol, current_state) --> (action, next_state) +When the FSM matches the pair (input_symbol, current_state) +it will call the associated action and then set the next state. +The action will be passed input_symbol, current state, and a stack. +''' + +class ANY: + '''This is a meta key. This is a class, but you use it like a value. + Example: x = ANY + Example: f.add_transaction (ANY, 'SOMESTATE', None, 'OTHERSTATE') + ''' + pass + +class FSM: + '''This class is a Finite State Machine (FSM) with one stack. + You set up a state transition table which is + The FSM is an association of + (input_symbol, current_state) --> (action, next_state) + When the FSM matches a pair (current_state, input_symbol) + it will call the associated action + The action is a function reference defined with a signature like this: + def a (input_symbol, fsm): + and pass as parameters the current state, the input symbold, and a stack. + As an additional aid a stack is given. + The stack is really just a list. + The action function may produce output and update the stack. + ''' + + def __init__(self, initial_state = None): + self.state_transitions = {} # Map (input_symbol, state) to (action, next_state). + self.default_transition = None + self.initial_state = initial_state + self.current_state = self.initial_state + self.stack = [] + + def push (self, v): + '''This pushes a value onto the stack.''' + self.stack.append (v) + def pop (self): + '''This pops a value off the stack and returns the value.''' + return self.stack.pop () + + def reset (self): + '''This clears the stack and resets the current_state to the initial_state. + ''' + self.current_state = self.initial_state + self.stack = [] + + def add_default_transition (self, action, next_state): + '''This sets the default transition. + If the FSM cannot match the pair (input_symbol, current_state) + in the transition table then this is the transition that + will be returned. This is useful for catching errors and undefined states. + The default transition can be removed by calling + add_default_transition (None, None) + If the default is not set and the FSM cannot match + the input_symbol and current_state then it will + raise an exception (see process()). + ''' + if action == None and next_state == None: + self.default_transition = None + else: + self.default_transition = (action, next_state) + + def add_transition (self, input_symbol, state, action, next_state): + '''This adds an association between inputs and outputs. + (input_symbol, current_state) --> (action, next_state) + The action may be set to None. + The input_symbol may be set to None. + ''' + self.state_transitions[(input_symbol, state)] = (action, next_state) + + def add_transition_list (self, list_input_symbols, state, action, next_state): + '''This adds lots of the same transitions for different input symbols. + You can pass a list or a string. Don't forget that it is handy to use + string.digits, string.letters, etc. to add transitions that match + those character classes. + ''' + for input_symbol in list_input_symbols: + self.add_transition (input_symbol, state, action, next_state) + + def get_transition (self, input_symbol, state): + '''This tells what the next state and action would be + given the current state and the input_symbol. + This returns (action, new state). + This does not update the current state + nor does it trigger the output action. + If the transition is not defined and the default state is defined + then that will be used; otherwise, this throws an exception. + ''' + if self.state_transitions.has_key((input_symbol, self.current_state)): + return self.state_transitions[(input_symbol, self.current_state)] + elif self.state_transitions.has_key ((ANY, self.current_state)): + return self.state_transitions[(ANY, self.current_state)] + elif self.default_transition != None: + return self.default_transition + else: + raise Exception ('Transition is undefined.') + + def process (self, input_symbol): + '''This causes the fsm to change state and call an action. + (input_symbol, current_state) --> (action, next_state) + If the action is None then the action is not called and + only the current state is changed. + ''' + (action, next_state) = self.get_transition (input_symbol, self.current_state) + if action != None: + apply (action, (input_symbol, self) ) + self.current_state = next_state + + def process_string (self, s): + for c in s: + self.process (c) + +################################################################### +# The following is a test of the FSM +# +# This is not a real XML validator. It ignores character sets, +# entity and character references, and attributes. +# But it does check the tree structure and +# can tell if the XML input is generally well formed or not. +#################################################################### +XML_TEST_DATA = '''<?xml version="1.0"?> +<graph> + <att name="directed" value="1" /> + <att name="mode" value="FA" /> + <att name="start" value="q0" /> + <node id="0" label="q0"> + <graphics x="-172.0" y="13.0" z="0.0" /> + </node> + <node id="1" label="q1"> + <graphics type="hexagon" x="10.0" y="74.0" z="-0.0"/> + </node> + <node id="2" label="q2" accept="1"> + <graphics x="169.0" y="-5.0" z="-0.0" /> + </node> + <node id="3" label="q3" > + <graphics x="19.0" y="8.0" z="-0.0" /> + </node> + <node id="4" label="q4"> + <graphics x="18.0" y="-74.0" z="-0.0" /> + </node> + <edge source="0" target="1" label="ab"/> + <edge source="1" target="2" label="aa"/> + <edge source="2" target="2" label="c"/> + <edge source="0" target="3" label="ba"/> + <edge source="3" target="2" label="aa"/> + <edge source="0" target="4" label="a"/> + <edge source="4" target="2" label="c" /> + <edge source="2" target="4" label="ab" /> +</graph> +''' +def Error (input_symbol, fsm): + print 'UNDEFINED: %s, %s -- RESETTING' % (input_symbol, fsm.state) + fsm.reset() +def StartBuildTag (input_symbol, fsm): + fsm.push (input_symbol) +def BuildTag (input_symbol, fsm): + s = fsm.pop () + s = s + input_symbol + fsm.push (s) +def DoneBuildTag (input_symbol, fsm): + pass +def DoneEmptyElement (input_symbol, fsm): + s = fsm.pop() + print s +def StartBuildEndTag (input_symbol, fsm): + fsm.push (input_symbol) +def BuildEndTag (input_symbol, fsm): + s = fsm.pop () + s = s + input_symbol + fsm.push (s) +def DoneBuildEndTag (input_symbol, fsm): + s1 = fsm.pop () + s2 = fsm.pop () + if s1 == s2: + print s1 + else: + print 'Not valid XML.' + +def test(): + f = FSM('INIT') + f.add_default_transition (Error, 'INIT') + f.add_transition ('<', 'INIT', None, 'TAG') + f.add_transition (ANY, 'INIT', None, 'INIT') # Ignore white space between tags + + f.add_transition ('?', 'TAG', None, 'XML_DECLARATION') + f.add_transition (ANY, 'XML_DECLARATION', None, 'XML_DECLARATION') + f.add_transition ('?', 'XML_DECLARATION', None, 'XML_DECLARATION_END') + f.add_transition ('>', 'XML_DECLARATION_END', None, 'INIT') + + # Handle building tags + f.add_transition (ANY, 'TAG', StartBuildTag, 'BUILD_TAG') + f.add_transition (ANY, 'BUILD_TAG', BuildTag, 'BUILD_TAG') + f.add_transition (' ', 'BUILD_TAG', None, 'ELEMENT_PARAMETERS') + f.add_transition ('/', 'TAG', None, 'END_TAG') + f.add_transition ('/', 'BUILD_TAG', None, 'EMPTY_ELEMENT') + f.add_transition ('>', 'BUILD_TAG', DoneBuildTag, 'INIT') + + # Handle element parameters + f.add_transition ('>', 'ELEMENT_PARAMETERS', DoneBuildTag, 'INIT') + f.add_transition ('/', 'ELEMENT_PARAMETERS', None, 'EMPTY_ELEMENT') + f.add_transition ('"', 'ELEMENT_PARAMETERS', None, 'DOUBLE_QUOTE') + f.add_transition (ANY, 'ELEMENT_PARAMETERS', None, 'ELEMENT_PARAMETERS') + + # Handle quoting inside of parameter lists + f.add_transition (ANY, 'DOUBLE_QUOTE', None, 'DOUBLE_QUOTE') + f.add_transition ('"', 'DOUBLE_QUOTE', None, 'ELEMENT_PARAMETERS') + + # Handle empty element tags + f.add_transition ('>', 'EMPTY_ELEMENT', DoneEmptyElement, 'INIT') + + # Handle end tags + f.add_transition (ANY, 'END_TAG', StartBuildEndTag, 'BUILD_END_TAG') + f.add_transition (ANY, 'BUILD_END_TAG', BuildEndTag, 'BUILD_END_TAG') + f.add_transition ('>', 'BUILD_END_TAG', DoneBuildEndTag, 'INIT') + + f.process_string (XML_TEST_DATA) + + if len(f.stack) == 0: + print 'XML file is valid.' + else: + print 'XML file is not valid. Stack is not empty.' + print f.stack + +if __name__ == '__main__': + test () diff --git a/pexpect/pexpect.py b/pexpect/pexpect.py new file mode 100644 index 0000000..f0b219d --- /dev/null +++ b/pexpect/pexpect.py @@ -0,0 +1,592 @@ +''' +Pexpect is a Python module for spawning child applications; +controlling them; and responding to expected patterns in their output. +Pexpect can be used for automating interactive applications such as +ssh, ftp, passwd, telnet, etc. It can be used to a automate setup scripts +for duplicating software package installations on different servers. +It can be used for automated software testing. Pexpect is in the spirit of +Don Libes' Expect, but Pexpect is pure Python. Other Expect-like +modules for Python require TCL and Expect or require C extensions to +be compiled. Pexpect does not use C, Expect, or TCL extensions. It +should work on any platform that supports the standard Python pty +module. The Pexpect interface focuses on ease of use so that simple +tasks are easy. + +Pexpect is Open Source, free, and all that stuff. +License: Python Software Foundation License + http://www.opensource.org/licenses/PythonSoftFoundation.html + +Noah Spurrier +2002 +''' +import select +import signal +import os, sys +import errno +import time +import pty +import tty +import termios +import fcntl +import traceback +import re +import struct +from types import * + + +# Exception classes used by this module. +class ExceptionPexpect(Exception): + '''Base class for all exceptions raised by this module.''' + def __init__(self, value): + self.value = value + def __str__(self): + return `self.value` +class EOF(ExceptionPexpect): + '''Raised when EOF is read from a child.''' +class TIMEOUT(ExceptionPexpect): + '''Raised when a read time exceeds the timeout.''' +##class MAXBUFFER(ExceptionPexpect): +## '''Raised when a scan buffer fills before matching an expected pattern.''' + +class spawn: + '''This is the main class interface for Pexpect. Use this class to + start and control child applications. + ''' + + def __init__(self, command): + '''This is the constructor. The command parameter is a string + that includes the path and any arguments to the command. For + example: + p = pexpect.spawn ('/usr/bin/ftp') + p = pexpect.spawn ('/bin/ls -latr /tmp') + p = pexpect.spawn ('/usr/bin/ssh some@host.com') + After this the child application will be created and + will be ready for action. See expect() and send()/sendline(). + ''' + ### This is not strictly correct since pty is not POSIX (Alas!). + ### Instead I should check for a working pty or something... + if os.name != 'posix': + raise OSError, 'This operating system is not supported: %s'%os.name + + self.STDIN_FILENO = sys.stdin.fileno() + self.STDOUT_FILENO = sys.stdout.fileno() + self.STDERR_FILENO = sys.stderr.fileno() + + ### IMPLEMENT THIS FEATURE!!! + self.maxbuffersize = 10000 + # anything before maxsearchsize point is preserved, but not searched. + self.maxsearchsize = 1000 + + self.timeout = 30.0 # Seconds + self.child_fd = -1 + self.pid = None + self.log_fd = -1 + + self.before = None + self.matched = None + + self.command = command + + self.__spawn() + + def __spawn(self): + '''This starts the given command in a child process. This does + all the fork/exec type of stuff for a pty. This is called by + __init__. The args parameter is a list, command is a string. + ''' + # The pid and child_fd of this object get set by this method. + # Note that it is difficult for this method to fail. + # You cannot detect if the child process cannot start. + # So the only way you can tell if the child process started + # or not is to try to read from the file descriptor. If you get + # EOF immediately then it means that the child is already dead. + # That may not necessarily be bad, because you may spawn a child + # that performs some operator, creates no stdout output, and then dies. + # It is a fuzzy edge case. Any child process that you are likely to + # want to interact with Pexpect would probably not fall into this + # category. + # FYI, This essentially does a fork/exec operation. + + assert self.pid == None, 'The pid member is not None.' + assert self.command != None, 'The command member is None.' + + command_line = split_command_line(self.command) + assert which (command_line[0]) != None, 'The command was not found or was not executable.' + + # This is necessary for isAlive() to work. Without this there is + # no portable way to tell if a child process is a zombie. + # Checking waitpid with WNOHANG option does not work and + # checking waitpid without it would block if the child is not a zombie. + # With this children should exit completely without going into + # a zombie state. Note that some UNIX flavors may send the signal + # before the child's pty output buffer is empty, while others + # may send the signal only when the buffer is empty. + # In the later case, isAlive() will always return true until the + # output buffer is empty. Use expect_eof() to consume all child output. + # This is not the same as the Zombie (waiting to die) problem. + signal.signal(signal.SIGCHLD, signal.SIG_IGN) + + try: + self.pid, self.child_fd = pty.fork() + except OSError, e: + raise ExceptionPexpect(str(e) + '\nPexpect: pty.fork() failed. ' + + 'Out of pty devices or this platform ' + + 'does not properly support pty.fork().') + + if self.pid == 0: # Child + setwinsize(24, 80) + os.execvp(command_line[0], command_line) + raise ExceptionPexpect ('Reached an unexpected state in __spawn().') + + # Parent + + def fileno (): + '''This returns the file descriptor of the pty for the child.''' + return child_fd + + def log_open (self, filename): + '''This opens a log file. All data read from the child + application will be written to the log file. + This is very useful to use while creating scripts. + You can use this to figure out exactly what the child + is sending. + ''' + self.log_fd = os.open (filename, O_APPEND | O_CREAT) + + def log_close (self): + '''This closes the log file opened by log(). + ''' + os.close (self.log_fd) + self.log_fd = -1 + + def expect(self, pattern, local_timeout = None): + '''This seeks through the stream looking for the given + pattern. The 'pattern' can be a string or a list of strings. + The strings are regular expressions. This returns the index + into the pattern list or None if error. Afterwards the + instance attributes 'before' and 'matched' will be set. You + can read the data that was matched by the pattern in + 'matched'. You can read all the data read before the match in + 'before'. + ''' + if local_timeout == None: + local_timeout = self.timeout + + compiled_pattern_list = [] + if type(pattern)is StringType: + compiled_pattern_list = [re.compile(pattern)] + elif type(pattern)is ListType: + compiled_pattern_list = [re.compile(x)for x in pattern] + else: + raise TypeError, 'Pattern argument is not a string or list of strings.' + + return self.expect_list(compiled_pattern_list, local_timeout) + + + def expect_list(self, re_list, local_timeout = None): + '''This is called by expect(). This takes a list of compiled + regular expressions. This returns the matched index into the + re_list. + ''' + matched_pattern = None + before_pattern = None + index = None + + try: + done = 0 + incoming = '' + while not done: # Keep reading until done. + c = self.read(1, local_timeout) + incoming = incoming + c + + # Sequence through the list of patterns and look for a match. + index = 0 + for cre in re_list: + match = cre.search(incoming) + if match is not None: + matched_pattern = incoming[match.start(): match.end()] + before_pattern = incoming[: match.start()] + done = 1 + break + else: + index = index + 1 + except Exception, e: + ### Here I should test if the client wants to pass exceptions, or + ### to return some state flag. Exception versus return value. + matched_pattern = None + before_pattern = incoming + index = -1 + raise + + self.before = before_pattern + self.matched = matched_pattern + return index #before_pattern, matched_pattern, index + + def expect_eof(self, local_timeout = None): + '''This reads from the child until the end of file is found. + ''' + foo = """ if partial=='': ### self.flag_eof: + flag_eof = 1 ### Should not need this if self.flag_eof is used. + index = None + matched_pattern = None + done = 1 + break + """ + matched_pattern = None + before_pattern = None + index = None + + try: + done = 0 + incoming = '' + while not done: + c = self.read(1, local_timeout) + incoming = incoming + c + except EOF, e: + matched_pattern = '' + before_pattern = incoming + index = 1 + + self.before = before_pattern + self.matched = matched_pattern + return index + + def write(self, text): + '''This is an alias for send().''' + self.send (text) + + def send(self, text): + '''This sends a string to the child process. + ''' + ### Add code so that an empty string will send an EOF. + ### This emulates the symantics of Libes Expect. + ### Hmmm... how do I send an EOF? + ###C if ((m = write(pty, *buf, p - *buf)) < 0) + ###C return (errno == EWOULDBLOCK) ? n : -1; + + try: + if text == '': + pass ### Do something someday, like send an EOF. + os.write(self.child_fd, text) + except Exception, e: + msg = 'Exception caught in send():' + str(e) + '\n' + raise ExceptionPexpect(msg) + + def sendline(self, text): + '''This is like send(), but it adds a line separator. + ''' + self.send(text) + self.send(os.linesep) + + def read(self, n, timeout = None): + '''This reads up to n characters from the child application. + It includes a timeout. If the read does not complete within the + timeout period then a TIMEOUT exception is raised. + If the end of file is read then an EOF exception will be raised. + If a log file was opened using log_open() then all data will + also be written to the log file. + + Note that if this method is called with timeout=None + then it actually may block. + This is a non-blocking wrapper around os.read(). + It uses select.select() to supply a timeout. + ''' + r, w, e = select.select([self.child_fd], [], [], timeout) + if not r: + raise TIMEOUT('Timeout exceeded in read().') + + if self.child_fd in r: + try: + s = os.read(self.child_fd, n) + except OSError, e: + self.flag_eof = 1 + raise EOF('End Of File (EOF) in read(). Exception style platform.') + if s == '': + self.flag_eof = 1 + raise EOF('End Of File (EOF) in read(). Empty string style platform.') + + if self.log_fd != -1: + os.write (self.log_fd, s) + + return s + + raise ExceptionPexpect('Reached an unexpected state in read().') + + + def isAlive(self): + '''This tests if the child process is running or not. + It returns 1 if the child process appears to be running or + 0 if not. This checks the process list to see if the pid is + there. In theory, the original child could have died and the + pid could have been reused by some other process. This is + unlikely, but I can find no portable way to make sure. + Also, this is not POSIX portable way to check, but + UNIX provides no standard way to test if a given pid is + running or not. By convention most modern UNIX systems will + respond to signal 0. + ''' + try: + self.kill(0) + return 1 + except OSError, e: + return 0 + ###return e.errno == errno.EPERM + ### For some reason I got this exception printed even though + ### I am explicitly catching OSError. Noah doth halucinate? + ### OSError: [Errno 3] No such process + + def kill(self, sig): + '''This sends the given signal to the child application. + In keeping with UNIX tradition it has a misleading name. + It does not necessarily kill the child unless + you send the right signal. + ''' + # Same as os.kill, but the pid is given for you. + os.kill(self.pid, sig) + + def interact(self, escape_character = chr(29)): + '''This gives control of the child process to the interactive user. + Keystrokes are sent to the child process, and the stdout and stderr + output of the child process is printed. + When the user types the escape_character this method will stop. + The default for escape_character is ^] (ASCII 29). + This simply echos the child stdout and child stderr to the real + stdout and it echos the real stdin to the child stdin. + ''' + mode = tty.tcgetattr(self.STDIN_FILENO) + tty.setraw(self.STDIN_FILENO) + try: + self.__interact_copy(escape_character) + finally: + tty.tcsetattr(self.STDIN_FILENO, tty.TCSAFLUSH, mode) + + def __interact_writen(self, fd, data): + '''This is used by the interact() method. + ''' + ### This is stupid. It's a deadlock waiting to happen. + ### I can't check isAlive due to problems with OpenBSD handling. + ### I can't think of a safe way to handle this. + while data != '': + n = os.write(fd, data) + data = data[n:] + def __interact_read(self, fd): + '''This is used by the interact() method. + ''' + return os.read(fd, 1000) + def __interact_copy(self, escape_character = None): + '''This is used by the interact() method. + ''' + while self.isAlive(): + r, w, e = select.select([self.child_fd, self.STDIN_FILENO], [], []) + if self.child_fd in r: + data = self.__interact_read(self.child_fd) + os.write(self.STDOUT_FILENO, data) + if self.STDIN_FILENO in r: + data = self.__interact_read(self.STDIN_FILENO) + self.__interact_writen(self.child_fd, data) + if escape_character in data: + break + + +## def send_human(self, text, delay_min = 0, delay_max = 1): +## pass +## def spawn2(self, command, args): +## '''return pid, fd_stdio, fd_stderr +## ''' +## pass +## def expect_ex(self, string_match, local_timeout = None): +## '''This is like expect(), except that instead of regular expression patterns +## it matches on exact strings. +## ''' +## pass +## # Return (data_read) + + +def which (filename): + '''This takes a given filename and tries to find it in the + environment path and check if it is executable. + ''' + + # Special case where filename already contains a path. + if os.path.split(filename)[0] != '': + if os.access (filename, os.X_OK): + return filename + + if not os.environ.has_key('PATH') or os.environ['PATH'] == '': + p = os.defpath + else: + p = os.environ['PATH'] + + pathlist = p.split (os.pathsep) + + for path in pathlist: + f = os.path.join(path, filename) + if os.access(f, os.X_OK): + return f + return None + +def setwinsize(r, c): + '''This sets the windowsize of the tty for stdout. + This does not change the physical window size. + It changes the size reported to TTY-aware applications like + vi or curses. In other words, applications that respond to the + SIGWINCH signal. + This is used by __spawn to set the tty window size of the child. + ''' + # Assume ws_xpixel and ws_ypixel are zero. + s = struct.pack("HHHH", r, c, 0, 0) + x = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCSWINSZ, s) + +def split_command_line(command_line): + '''This splits a command line into a list of arguments. + It splits arguments on spaces, but handles + embedded quotes, doublequotes, and escaped characters. + I couldn't do this with a regular expression, so + I wrote a little state machine to parse the command line. + ''' + arg_list = [] + arg = '' + state_quote = 0 + state_doublequote = 0 + state_esc = 0 + for c in command_line: + if c == '\\': # Escape the next character + state_esc = 1 + if c == r"'": # Handle single quote + if state_esc: + state_esc = 0 + elif not state_quote: + state_quote = 1 + else: + state_quote = 0 + if c == r'"': # Handle double quote + if state_esc: + state_esc = 0 + elif not state_doublequote: + state_doublequote = 1 + else: + state_doublequote = 0 + + # Add arg to arg_list unless in some other state. + if c == ' 'and not state_quote and not state_doublequote and not state_esc: + arg_list.append(arg) + arg = '' + else: + arg = arg + c + if c != '\\'and state_esc: # escape mode lasts for one character. + state_esc = 0 + + # Handle last argument. + if arg != '': + arg_list.append(arg) + return arg_list + +#################### +# +# NOTES +# +#################### + +# If you just want the object then +# import pexpect +# If you want the static mathods too then +# from pexpect import * +# + +# Reason for double fork: +#http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC15 +# Reason for ptys: +# http://www.erlenstar.demon.co.uk/unix/faq_4.html#SEC52 + +# Nonblocking on Win32? +# Reasearch this as a way to maybe make pipe work for Win32. +# http://groups.google.com/groups?q=setraw+tty&hl=en&selm=uvgpvisvk.fsf%40roundpoint.com&rnum=7 +# +# if istty: +# if os.name=='posix': +# import tty +# tty.setraw(sys.stdin.fileno()) +# elif os.name=='nt': +# import win32file, win32con +# hstdin = win32file._get_osfhandle(sys.stdin.fileno()) +# modes = (win32file.GetConsoleMode(hstdin) +# & ~(win32con.ENABLE_LINE_INPUT +# |win32con.ENABLE_ECHO_INPUT)) +# win32file.SetConsoleMode(hstdin, modes) + +# Basic documentation: +# Explain use of lists of patterns and return index. +# Explain exceptions for non-handled special cases like EOF +# Advanced documentation: +# Explain how patterns can be associated with actions. +# Can I Do this without changing interface. + +# Test bad fork +# Test ENOENT. In other words, no more TTY devices. + +#GLOBAL_SIGCHLD_RECEIVED = 0 +#def childdied (signum, frame): +# print 'Signal handler called with signal', signum +# frame.f_globals['pexpect'].GLOBAL_SIGCHLD_RECEIVED = 1 +# print str(frame.f_globals['pexpect'].GLOBAL_SIGCHLD_RECEIVED) +# GLOBAL_SIGCHLD_RECEIVED = 1 + +### Add a greedy read -- like a readall() to keep reading until a +# timeout is returned. Will e.expect('') work? or e.expect(None)? + +### Weird bug. If you read too fast after doing a sendline() +# Sometimes you will read the data back that you just sent even if +# the child did not echo the data. This is particularly a problem if +# you send a password. + + +##class PushbackReader: +## '''This class is a wrapper around os.read. It adds the features of buffering +## to allow push-back of data and to provide a timeout on a read. +## ''' +## def __init__(self, file_descriptor): +## self.fd = file_descriptor +## self.buffer = '' +## +## def read(self, n, timeout = None): +## '''This does a read restricted by a timeout and +## it includes any cached data from previous calls. +## This is a non-blocking wrapper around os.read. +## it uses select.select to supply a timeout. +## Note that if this is called with timeout=None (the default) +## then this actually MAY block. +## ''' +## # The read() call is a problem. +## # Some platforms return an empty string '' at EOF. +## # Whereas other platforms raise an Input/output exception. +## +## avail = len(self.buffer) +## if n > avail: +## result = self.buffer +## n = n-avail +## else: +## result = self.buffer[: n] +## self.buffer = self.buffer[n:] +## +## r, w, e = select.select([self.fd], [], [], timeout) +## if not r: +## self.flag_timeout = 1 +## raise TIMEOUT('Read exceeded time: %d'%timeout) +## +## if self.fd in r: +## try: +## s = os.read(self.fd, n) +## except OSError, e: +## self.flag_eof = 1 +## raise EOF('Read reached End Of File (EOF). Exception platform.') +## if s == '': +## self.flag_eof = 1 +## raise EOF('Read reached End Of File (EOF). Empty string platform.') +## return s +## +## self.flag_error = 1 +## raise ExceptionPexpect('PushbackReader.read() reached an unexpected state.'+ +## ' There is a logic error in the Pexpect source code.') +## +## def pushback(self, data): +## self.buffer = piece+self.buffer diff --git a/pexpect/pexpect.pyc b/pexpect/pexpect.pyc Binary files differnew file mode 100644 index 0000000..19a9e57 --- /dev/null +++ b/pexpect/pexpect.pyc diff --git a/pexpect/setup.py b/pexpect/setup.py new file mode 100644 index 0000000..c2e4805 --- /dev/null +++ b/pexpect/setup.py @@ -0,0 +1,13 @@ +from distutils.core import setup +setup (name='pexpect', + version='0.8', + py_modules=['pexpect'], + description='Pexpect, a pure Python Expect allows control of other applications.', + author='Noah Spurrier', + author_email='noah@noah.org', + url='http://www.noah.org/python/pexpect/', + license='Python Software Foundation License', + platforms='UNIX' + ) + + diff --git a/pexpect/sf.net.ssh b/pexpect/sf.net.ssh new file mode 100644 index 0000000..2e71c9f --- /dev/null +++ b/pexpect/sf.net.ssh @@ -0,0 +1,8 @@ +alias sf="ssh noah@use-pr-shell1.sourceforge.net" +export CVS_RSH=ssh +alias cvssf="cvs -z3 -d:ext:noah@cvs.pexpect.sourceforge.net:/cvsroot/pexpect" +alias cvsco="cvs -z3 -d:ext:noah@cvs.pexpect.sourceforge.net:/cvsroot/pexpect co pexpect" +alias cvscommit="cvs -z3 -d:ext:noah@cvs.pexpect.sourceforge.net:/cvsroot/pexpect commit" +alias cvsadd="cvs -z3 -d:ext:noah@cvs.pexpect.sourceforge.net:/cvsroot/pexpect add" +#ssh noah@use-pr-shell1.sourceforge.net "cd htdocs;rm index.html;wget http://www.noah.org/python/pexpect/index.html" +#ssh noah@use-pr-shell1.sourceforge.net "cd htdocs;tar zxvf doc.tgz" diff --git a/pexpect/tests/__init__.py b/pexpect/tests/__init__.py new file mode 100755 index 0000000..bd1f30c --- /dev/null +++ b/pexpect/tests/__init__.py @@ -0,0 +1,4 @@ +# __init__.py +# The mere presence of this file makes the dir a package. +pass + diff --git a/pexpect/tests/platform_tests/CSIGNALTEST/test.c b/pexpect/tests/platform_tests/CSIGNALTEST/test.c new file mode 100644 index 0000000..86bcc17 --- /dev/null +++ b/pexpect/tests/platform_tests/CSIGNALTEST/test.c @@ -0,0 +1,90 @@ +/* I built this with "gcc -lutil test.c -otest" */ +#include <sys/types.h> /* include this before any other sys headers */ +#include <sys/wait.h> /* header for waitpid() and various macros */ +#include <signal.h> /* header for signal functions */ +#include <stdio.h> /* header for fprintf() */ +#include <unistd.h> /* header for fork() */ +#ifdef LINUX +#include <pty.h> +#else +#include <util.h> /* header for forkpty, compile with -lutil */ +#endif + +void sig_chld(int); /* prototype for our SIGCHLD handler */ + +int main() +{ + struct sigaction act; + int pid; + int fdm; + char slave_name [20]; + + + /* Assign sig_chld as our SIGCHLD handler. + We don't want to block any other signals in this example + We're only interested in children that have terminated, not ones + which have been stopped (eg user pressing control-Z at terminal). + Finally, make these values effective. If we were writing a real + application, we would save the old value instead of passing NULL. + */ + act.sa_handler = sig_chld; + sigemptyset(&act.sa_mask); + act.sa_flags = SA_NOCLDSTOP; + if (sigaction(SIGCHLD, &act, NULL) < 0) + { + fprintf(stderr, "sigaction failed\n"); + return 1; + } + + /* Do the Fork thing. + */ + pid = forkpty (&fdm, slave_name, NULL, NULL); + /* pid = fork(); */ + + switch (pid) + { + case -1: + fprintf(stderr, "fork failed\n"); + return 1; + break; + + case 0: /* Child process. */ + printf ("This child output will cause trouble.\n"); + _exit(7); + break; + + default: /* Parent process. */ + sleep(1); + printf ("Child pid: %d\n", pid); + sleep(10); /* let child finish -- crappy way to avoid race. */ + break; + } + + return 0; +} + +void sig_chld(int signo) +{ + int status, wpid, child_val; + + printf ("In sig_chld signal handler.\n"); + + /* Wait for any child without blocking */ + wpid = waitpid (-1, & status, WNOHANG); + printf ("\tWaitpid found status for pid: %d\n", wpid); + if (wpid < 0) + { + fprintf(stderr, "\twaitpid failed\n"); + return; + } + printf("\tWaitpid status: %d\n", status); + + if (WIFEXITED(status)) /* did child exit normally? */ + { + child_val = WEXITSTATUS(status); + printf("\tchild exited normally with status %d\n", child_val); + } + printf ("End of sig_chld.\n"); +} + + diff --git a/pexpect/tests/platform_tests/test.py b/pexpect/tests/platform_tests/test.py new file mode 100755 index 0000000..f80263b --- /dev/null +++ b/pexpect/tests/platform_tests/test.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python +import signal, os, time, errno, pty + +def signal_handler (signum, frame): + print 'Signal handler called with signal:', signum + print 'signal.SIGCHLD=', signal.SIGKILL + +# First thing we do is set up a handler for SIGCHLD. +signal.signal (signal.SIGCHLD, signal.SIG_IGN) + +print 'PART 1 -- Test signal handling with empty pipe.' +# Create a child process for us to kill. +try: + pid, fd = pty.fork() +except Exception, e: + print str(e) + +if pid == 0: +# os.write (sys.stdout.fileno(), 'This is a test.\n This is a test.') + time.sleep(10000) + +print 'Sending SIGKILL to child pid:', pid +os.kill (pid, signal.SIGKILL) + +# SIGCHLD should interrupt sleep. +# Note that this is a race. +# It is possible that the signal handler will get called +# before we try to sleep, but this has not happened yet. +# But in that case we can only tell by order of printed output. +print 'Entering sleep...' +try: + time.sleep(10) +except: + print 'sleep was interrupted by signal.' + +# Just for fun let's see if the process is alive. +try: + os.kill(pid, 0) + print 'Child is alive. This is ambiguous because it may be a Zombie.' +except OSError, e: + print 'Child appears to be dead.' + +print 'PART 2 -- Test signal handling with full pipe.' +# Create a child process for us to kill. +try: + pid, fd = pty.fork() +except Exception, e: + print str(e) + +if pid == 0: + os.write (sys.stdout.fileno(), 'This is a test.\n This is a test.') + time.sleep(10000) + +print 'Sending SIGKILL to child pid:', pid +os.kill (pid, signal.SIGKILL) + +# SIGCHLD should interrupt sleep. +# Note that this is a race. +# It is possible that the signal handler will get called +# before we try to sleep, but this has not happened yet. +# But in that case we can only tell by order of printed output. +print 'Entering sleep...' +try: + time.sleep(10) +except: + print 'sleep was interrupted by signal.' + +# Just for fun let's see if the process is alive. +try: + os.kill(pid, 0) + print 'Child is alive. This is ambiguous because it may be a Zombie.' +except OSError, e: + print 'Child appears to be dead.' + diff --git a/pexpect/tests/platform_tests/test2.py b/pexpect/tests/platform_tests/test2.py new file mode 100755 index 0000000..9611f39 --- /dev/null +++ b/pexpect/tests/platform_tests/test2.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +import signal, os, time, errno + +def signal_handler (signum, frame): + print 'Signal handler called with signal:', signum + print 'signal.SIGCHLD=', signal.SIGKILL + +# Create a child process for us to kill. +pid = os.fork() +if pid == 0: + time.sleep(10000) + +#signal.signal (signal.SIGCHLD, signal.SIG_IGN) +signal.signal (signal.SIGCHLD, signal_handler) + +print 'Sending SIGKILL to child pid:', pid +os.kill (pid, signal.SIGKILL) + +# SIGCHLD should interrupt sleep. +# Note that this is a race. +# It is possible that the signal handler will get called +# before we try to sleep, but this has not happened yet. +# But in that case we can only tell by order of printed output. +interrupted = 0 +try: + time.sleep(10) +except: + print 'sleep was interrupted by signal.' + interrupted = 1 + +if not interrupted: + print 'ERROR. Signal did not interrupt sleep.' +else: + print 'Signal interrupted sleep. This is good.' + +# Let's see if the process is alive. +try: + os.kill(pid, 0) + print 'Child is alive. This is ambiguous because it may be a Zombie.' +except OSError, e: + print 'Child appears to be dead.' + diff --git a/pexpect/tests/platform_tests/test_control_terminal.py b/pexpect/tests/platform_tests/test_control_terminal.py new file mode 100755 index 0000000..9598fd7 --- /dev/null +++ b/pexpect/tests/platform_tests/test_control_terminal.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python + +import termios, fcntl, struct, os, sys + +def getwinsize(): + s = struct.pack("HHHH", 0, 0, 0, 0) + x = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, s) + rows, cols = struct.unpack("HHHH", x)[:2] + return rows, cols + +def setwinsize(r,c): + # Assume ws_xpixel and ws_ypixel are zero. + s = struct.pack("HHHH", r,c,0,0) + x = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCSWINSZ, s) +print 'stdin tty:', os.ttyname(0) +print 'stdout tty:', os.ttyname(1) +print 'controlling terminal:', os.ctermid() +print 'winsize %d,%d' % getwinsize() +print 'ENDTEST' diff --git a/pexpect/tests/platform_tests/test_handler.py b/pexpect/tests/platform_tests/test_handler.py new file mode 100755 index 0000000..d9239b6 --- /dev/null +++ b/pexpect/tests/platform_tests/test_handler.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +import signal, os, time, errno, pty, sys, fcntl, tty +GLOBAL_SIGCHLD_RECEIVED = 0 + +def nonblock (fd): + # if O_NDELAY is set read() returns 0 (ambiguous with EOF). + # if O_NONBLOCK is set read() returns -1 and sets errno to EAGAIN + original_flags = fcntl.fcntl (fd, fcntl.F_GETFL, 0) + flags = original_flags | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + return original_flags + +def signal_handler (signum, frame): + print '<HANDLER>' + global GLOBAL_SIGCHLD_RECEIVED + status = os.waitpid (-1, os.WNOHANG) + if status[0] == 0: + print 'No process for waitpid:', status + else: + print 'Status:', status + print 'WIFEXITED(status):', os.WIFEXITED(status[1]) + print 'WEXITSTATUS(status):', os.WEXITSTATUS(status[1]) + GLOBAL_SIGCHLD_RECEIVED = 1 + +def main (): + signal.signal (signal.SIGCHLD, signal_handler) + pid, fd = pty.fork() + if pid == 0: + os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.') + time.sleep(10000) + nonblock (fd) + tty.setraw(fd) #STDIN_FILENO) + print 'Sending SIGKILL to child pid:', pid + time.sleep(2) + os.kill (pid, signal.SIGKILL) + + print 'Entering to sleep...' + try: + time.sleep(2) + except: + print 'Sleep interrupted' + try: + os.kill(pid, 0) + print '\tChild is alive. This is ambiguous because it may be a Zombie.' + except OSError, e: + print '\tChild appears to be dead.' +# print str(e) + print + print 'Reading from master fd:', os.read (fd, 1000) + + + +if __name__ == '__main__': + main () diff --git a/pexpect/tests/platform_tests/test_signals.py b/pexpect/tests/platform_tests/test_signals.py new file mode 100755 index 0000000..cd1a6da --- /dev/null +++ b/pexpect/tests/platform_tests/test_signals.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +import signal, os, time, errno, pty, sys +GLOBAL_SIGCHLD_RECEIVED = 0 + +def signal_handler (signum, frame): + print '<HANDLER>' + global GLOBAL_SIGCHLD_RECEIVED + status = os.waitpid (-1, os.WNOHANG) + print 'WIFEXITED(status):', os.WIFEXITED(status) + print 'WEXITSTATUS(status):', os.WEXITSTATUS(status) + GLOBAL_SIGCHLD_RECEIVED = 1 + +def main (): +# sig_test ('SIG_IGN', 'ptyfork', 'yes') + sig_test ('handler', 'ptyfork', 'yes') +# sig_test ('SIG_IGN', 'ptyfork', 'no') +# sig_test ('handler', 'ptyfork', 'no') +# sig_test ('SIG_IGN', 'osfork', 'yes') +# sig_test ('handler', 'osfork', 'yes') +# sig_test ('SIG_IGN', 'osfork', 'no') +# sig_test ('handler', 'osfork', 'no') + +def sig_test (sig_handler_type, fork_type, child_output): + print 'Testing with:' + print '\tsig_handler_type:', sig_handler_type + print '\tfork_type:', fork_type + print '\tchild_output:', child_output + + if sig_handler_type == 'SIG_IGN': + signal.signal (signal.SIGCHLD, signal.SIG_IGN) + else: + signal.signal (signal.SIGCHLD, signal_handler) + pid = -1 + fd = -1 + if fork_type == 'ptyfork': + pid, fd = pty.fork() + else: + pid = os.fork() + + if pid == 0: + if child_output == 'yes': + os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.') + time.sleep(10000) + + #print 'Sending SIGKILL to child pid:', pid + time.sleep(2) + os.kill (pid, signal.SIGKILL) + + #print 'Entering to sleep...' + try: + time.sleep(2) + except: + pass + try: + os.kill(pid, 0) + print '\tChild is alive. This is ambiguous because it may be a Zombie.' + except OSError, e: + print '\tChild appears to be dead.' +# print str(e) + print + +if __name__ == '__main__': + main () diff --git a/pexpect/tests/test_badfork.py b/pexpect/tests/test_badfork.py new file mode 100755 index 0000000..ffe560a --- /dev/null +++ b/pexpect/tests/test_badfork.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +import select +import os,sys, struct +import errno +import time +import pty, tty, termios, fcntl +import traceback +import re +from types import * + +def main (): + pid, fd = fooork ('aThelaDSjd','-i') + print 'pid', pid + print 'fd', fd + Xexpect(fd, 'bash.*#',10) + os.write(fd, 'scp -P 6666 *.py noah@gw.tiered.com:expyct/\n') + Xexpect(fd, 'bash.*#',10) + os.write(fd, 'exit\n') + print _my_read (fd, 1000, 5) + sys.exit (1) + +def setwinsize(r,c): + # Assume ws_xpixel and ws_ypixel are zero. + s = struct.pack("HHHH", r,c,0,0) + x = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCSWINSZ, s) + +def fooork (command, args): + '''This is fooork -- Foo Ork. Ork foo. + ''' + pid, fd = pty.fork() + if pid == 0: # Child + setwinsize (80,24) + os.execlp (command, command, args) + print 'Well, something went wrong. Here I am after an execlp.' + return (pid, fd) + +def _my_read (fd, n, timeout=None): + '''This is a non-blocking wrapper around os.read. + it uses select.select to supply a timeout. Note that if + this is called with timeout=None (the default) then this + actually MAY block. + ''' + try: + (r,w,e) = select.select ([fd], [], [], timeout) + if not r: + print 'TIMEOUT' + return -1 + if fd in r: + temp = os.read(fd, n) + if temp == '': + #print 'EOF' + pass + return temp + print 'Something weird happened.' + return -2 + except Exception, e: + print 'Exception in _read' + print str(e) + return -3 + +def expect_eof (fd, timeout=None): + pass + +def __expect_posix (fd, re_list, timeout=None): + done = 0 + result = "" + +# blocking (fin) + while not done: + try: + partial = _my_read(fd, 1, timeout) + result = result + partial + index = 0 + for cre in re_list: + match_result = cre.search(result) + if match_result is not None: + done = cre.pattern + matched_pattern = cre.pattern + done = 1 + print 'found!' + else: + index = index + 1 + + #done = 1 + #print 'Weird NOT found!' + except: + done = 1 + print 'NOT found EXCEPT!' +# sys.stderr = sys.stdout + traceback.print_exc() + time.sleep (10) + + #nonblocking (fin) + return result + +def Xexpect (fin, pattern, timeout=None): + """This searches through the stream for a pattern. + The search is non-blocking so this works with pipes. + The input pattern may be a string or a list of strings. + The first pattern matched will cause a return. + The reuturn value is a tuple (i,pat) where i is the + index of the pattern in the list and pat is the + actual matched pattern. If no pattern is matched then + i will be -1 and pat will be None. + """ + + if type(pattern) is ListType: + cpat_list = map (lambda x: re.compile(x[0]), pattern) + elif type(pattern) is StringType: + cpat_list = [ re.compile(pattern) ] + else: + raise TypeError, 'pattern argument is not a string or list.' + + # This is not strictly correct since pty is not POSIX. Alas... + if os.name == 'posix': + return __expect_posix (fin, cpat_list, 10) + else: + raise OSError, 'Pypect will not work with this operating system: "%s".' % os.name + + +if __name__ == '__main__': + main() diff --git a/pexpect/tests/test_command_list_split.py b/pexpect/tests/test_command_list_split.py new file mode 100755 index 0000000..953f859 --- /dev/null +++ b/pexpect/tests/test_command_list_split.py @@ -0,0 +1,19 @@ +import pexpect +import unittest + +class SplitCommandLineTestCase(unittest.TestCase): + #def runTest (self): + def testSplitSizes(self): + assert len(pexpect._split_command_line(r'')) == 0 + assert len(pexpect._split_command_line(r'one')) == 1 + assert len(pexpect._split_command_line(r'one two')) == 2 + assert len(pexpect._split_command_line(r'one\ one')) == 1 + assert len(pexpect._split_command_line('\'one one\'')) == 1 + assert len(pexpect._split_command_line(r'one\"one')) == 1 + assert len(pexpect._split_command_line(r'This\' is a\'\ test')) == 3 + +if __name__ == '__main__': + unittest.main() + +suite = unittest.makeSuite(SplitCommandLineTestCase,'test') + diff --git a/pexpect/tests/test_killed_pid.py b/pexpect/tests/test_killed_pid.py new file mode 100755 index 0000000..ff2fe5e --- /dev/null +++ b/pexpect/tests/test_killed_pid.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python +import os, time, signal +import expyct + +e = expyct.expyct ('/bin/sh', '-i') +print 'pid,fd:', e.pid, e.fd +print 'isAlive:', e.isAlive() +# Treat it brusquely. +print 'sending SIGKILL...' +os.kill (e.pid, signal.SIGKILL) +time.sleep (1) +print os.read(e.fd, 1000) +print 'isAlive:', e.isAlive() +e.expect('\#') +e.send ('ls -la /\n') +r,m,i = e.expect ('\#') +print r diff --git a/pexpect/tests/test_middle_buffer.py b/pexpect/tests/test_middle_buffer.py new file mode 100755 index 0000000..ccb8302 --- /dev/null +++ b/pexpect/tests/test_middle_buffer.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python2 +import expyct +import time + +e = expyct.expyct ('/bin/sh -i') +e.timeout=60 +e.expect(['#', '\$']) +e.send ('ls -la /\n') + +i = e.expect (['foo','(d[aeiou]v)']) +print '\nRead before match>%s<' % e.before +print 'Matched:>%s<' % e.matched +print 'index:', i + +i = e.expect(['#', '\$']) +print '\nRead before match>%s<' % e.before +print 'Matched:>%s<' % e.matched +print 'index:', i +e.send('exit\n') +print 'Sent exit' +time.sleep(2) +print 'isAlive:', e.isAlive() + +# This should test timeout... +i = e.expect ('#####') +print '\nRead before match>%s<' % e.before +print 'Matched:>%s<' % e.matched +print 'index:', i + + diff --git a/pexpect/tests/test_missing_command.py b/pexpect/tests/test_missing_command.py new file mode 100755 index 0000000..39a0028 --- /dev/null +++ b/pexpect/tests/test_missing_command.py @@ -0,0 +1,17 @@ +import pexpect +import unittest + +class MissingCommandTestCase (unittest.TestCase): + def testMissingCommand(self): + try: + i = pexpect.spawn ('ZXQYQZX') + except Exception: + pass + else: + fail('expected an Exception') + +if __name__ == '__main__': + unittest.main() + +suite = unittest.makeSuite(MissingCommandTestCase,'test') + diff --git a/pexpect/tests/test_read.py b/pexpect/tests/test_read.py new file mode 100755 index 0000000..1e16ed4 --- /dev/null +++ b/pexpect/tests/test_read.py @@ -0,0 +1,35 @@ +import os, sys + +filename = os.tmpnam() +print 'filename:', filename + +fd_out = os.open(filename, os.O_CREAT | os.O_WRONLY) +print 'fd_out:', fd_out +os.write (fd_out, 'This is a test.\n') +os.close(fd_out) +print +print 'testing read on good fd...' +fd_in = os.open (filename, os.O_RDONLY) +print 'fd_in:', fd_in +while 1: + data_in = os.read(fd_in, 1) + print 'data_in:', data_in + if data_in == '': + print 'data_in was empty' + break #sys.exit(1) +os.close(fd_in) +print +print +print 'testing read on closed fd...' +fd_in = os.open ('test_read.py', os.O_RDONLY) +print 'fd_in:', fd_in +while 1: + data_in = os.read(fd_in, 1) + print 'data_in:', data_in + if data_in == '': + print 'data_in was empty' + break +os.close(fd_in) +d = os.read(fd_in, 1) # fd_in should be closed now... +if s == '': + print 'd is empty. good.' diff --git a/pexpect/tests/test_signals.py b/pexpect/tests/test_signals.py new file mode 100755 index 0000000..734593a --- /dev/null +++ b/pexpect/tests/test_signals.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +import signal, os, time, errno + +def signal_handler (signum, frame): + print 'Signal handler called with signal:', signum + print 'signal.SIGCHLD=', signal.SIGKILL + + +# First thing we do is set up a handler for SIGCHLD. +signal.signal (signal.SIGCHLD, signal_handler) +#signal.signal (signal.SIGCHLD, signal.SIG_IGN) + + +# Create a child process for us to kill. +pid = os.fork() +if pid == 0: + time.sleep(10000) + + +print 'Sending SIGKILL to child pid:', pid +os.kill (pid, signal.SIGKILL) + + +# SIGCHLD should interrupt sleep. +# Note that this is a race. +# It is possible that the signal handler will get called +# before we try to sleep, but this has not happened yet. +# But in that case we can only tell by order of printed output. +try: + time.sleep(10) +except: + print 'sleep was interrupted by signal.' + + +print '''The signal handler should have been called either before +or durring the sleep. If the signal handler is called after or not at all +then something went wrong.''' + + +# Just for fun let's see if the process is alive. +try: + os.kill(pid, 0) + print 'Child is alive. This is ambiguous because it may be a Zombie.' +except OSError, e: + print 'Child appears to be dead.' diff --git a/pexpect/websync b/pexpect/websync new file mode 100755 index 0000000..d1f3ee8 --- /dev/null +++ b/pexpect/websync @@ -0,0 +1,4 @@ +#!/bin/sh +ssh noah@use-pr-shell1.sourceforge.net "cd htdocs;rm index.html;wget http://www.noah.org/python/pexpect/index.html" +scp doc.tgz noah@use-pr-shell1.sourceforge.net:htdocs/doc.tgz +ssh noah@use-pr-shell1.sourceforge.net "cd htdocs;tar zxvf doc.tgz" |