diff options
author | Thomas Kluyver <takowl@gmail.com> | 2013-09-19 17:04:23 -0700 |
---|---|---|
committer | Thomas Kluyver <takowl@gmail.com> | 2013-09-19 17:04:23 -0700 |
commit | 9ee22124477a882836330264be855e1ee138c035 (patch) | |
tree | 9d45e9ec54639b2526267232c6d712db7e9e57d2 | |
parent | dbff978b56a4c94c4361f083c40df14a7db7d6d4 (diff) | |
download | pexpect-git-9ee22124477a882836330264be855e1ee138c035.tar.gz |
Move psh and pxssh into pexpect
-rw-r--r-- | pexpect/psh.py | 185 | ||||
-rw-r--r-- | pexpect/pxssh.py | 347 | ||||
-rw-r--r-- | psh.py | 188 | ||||
-rw-r--r-- | pxssh.py | 350 |
4 files changed, 542 insertions, 528 deletions
diff --git a/pexpect/psh.py b/pexpect/psh.py new file mode 100644 index 0000000..467f546 --- /dev/null +++ b/pexpect/psh.py @@ -0,0 +1,185 @@ +'''This is a utility class to make shell scripting easier in Python. +It combines Pexpect and wraps many Standard Python Library functions +to make them look more shell-like. + +PEXPECT LICENSE + + This license is approved by the OSI and FSF as GPL-compatible. + http://opensource.org/licenses/isc-license.txt + + Copyright (c) 2012, Noah Spurrier <noah@noah.org> + PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY + PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE + COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +''' + +import pexpect, os, sys, re +from types import * + +class ExceptionPsh(pexpect.ExceptionPexpect): + + '''Raised for Psh exceptions. + ''' + +class ExceptionErrorCode(ExceptionPsh): + + '''Raised when an program returns an error code. + ''' + + def __init__(self, string, err_code, cmd_output): + + ExceptionPsh.__init__(self,string) + self.error = err_code + self.output = cmd_output + +class psh (object): + + def __init__ (self,exp): + + self.exp = exp + self.default_timeout = 30 # Seconds + + def ls (self, path=''): + + fileStr = self.run("ls %s" % path) + return fileStr.split() + + def cd (self, path='-'): + + return self.run("cd %s" % path) + + def rm (self, path=''): + + return self.run("/bin/rm -f %s" % path) + + def cp (self, path_from='', path_to=''): + + return self.run("/bin/cp %s %s" % (path_from, path_to)) + + def mv (self, path_from='', path_to=''): + + return self.run("/bin/mv %s %s" % (path_from, path_to)) + + def pwd (self): + + return self.run("/bin/pwd") + + def which (self, exe_name): + + return self.run("/usr/bin/which %s" % exe_name) + + def chown (self, path, user='', group=None, recurse=False): + + xtra_flags = "" + if recurse: xtra_flags = "-R" + if group: group = ':' + group + else: group = "" + + return self.run("/bin/chmod %s %s%s %s" % (recurse,user,group,path)) + + def chmod (self, path, perms='', recurse=False): + + xtra_flags = "" + if recurse: xtra_flags = "-R" + return self.run("/usr/bin/chmod %s %s %s" % (xtra_flags, perms, path)) + + def chattr (self, path, attrs='', recurse=False): + + xtra_flags = "" + if recurse: xtra_flags = "-R" + return self.run("/usr/bin/chattr %s %s %s" % (xtra_flags, attrs, path)) + + def cat (self, path): + + return self.run("/bin/cat %s" % path) + + def run (self, cmd, stim_resp_dict = {}, timeout=None): + + (ret, output) = self.run_raw(cmd, stim_resp_dict, timeout) + if ret == 0: return output + raise ExceptionErrorCode("Running command [%s] returned error [%d]" + % (cmd,ret), ret, output) + + def run_raw(self, cmd, stim_resp_dict=None, timeout=None): + + '''Someone contributed this, but now I've lost touch and I forget the + motive of this. It was sort of a sketch at the time which doesn't make + this any easier to prioritize, but it seemed important at the time. ''' + + if not timeout: timeout = self.default_timeout + + def cmd_exp_loop(param): + if type(param) is DictType: + param = (param,) + for item in param: + if type(item) is type(()) or type(item) is type([]): + cmd_exp_loop(item) + elif type(item) is type(""): + self.exp.send(item) + elif type(item) is type({}): + dict = item + while(1): + stimulus = dict.keys() + idx = self.exp.expect_exact(stimulus, timeout) + keys = dict.keys() + respond = dict[keys[idx]] + if ( type(respond) is type({}) + or type(respond) is type(()) + or type(item) is type([]) ): + cmd_exp_loop(respond) + if type(respond) is type(""): + self.exp.send(respond) + elif ( type(respond) is InstanceType + and Exception + in inspect.getmro(respond.__class__) ): + raise respond + elif type(respond) is type(0): + return (respond, self.exp.before) + elif respond is None: + break + else: + self.exp.send(str(respond)) + + if stim_resp_dict == None: + stim_resp_dict = {} + + self.exp.sendline("") + if not self.exp.prompt(): raise SessionException("No prompt") + self.exp.sendline(cmd) + self.exp.expect_exact([cmd]) + stim_resp_dict[re.compile(self.exp.PROMPT)] = None + cmd_exp_loop(stim_resp_dict) + + output = self.exp.before + # Get the return code + self.exp.sendline("echo $?") + self.exp.expect_exact(["echo $?"]) + if not self.exp.prompt(): + raise SessionException("No prompt", 0, self.exp.before) + try: + reg = re.compile("^(\d+)") + s = self.exp.before.strip() + #print s + #pdb.set_trace() + s = reg.search(s).groups()[0] + error_code = int(s) + except ValueError: + log.error("Cannot parse %s into an int!" % self.exp.before) + raise + + if not output[0:2] == '\r\n': + log.warning("Returned output lacks leading \\r\\n which may indicate a tae error") + log.debug2("Offending output string: [%s]" % output) + return (error_code, output) + else: + return(error_code, output[2:]) + +# def pipe (self, cmd, string_to_send): diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py new file mode 100644 index 0000000..5566029 --- /dev/null +++ b/pexpect/pxssh.py @@ -0,0 +1,347 @@ +'''This class extends pexpect.spawn to specialize setting up SSH connections. +This adds methods for login, logout, and expecting the shell prompt. + +PEXPECT LICENSE + + This license is approved by the OSI and FSF as GPL-compatible. + http://opensource.org/licenses/isc-license.txt + + Copyright (c) 2012, Noah Spurrier <noah@noah.org> + PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY + PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE + COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +''' + +from pexpect import * +import pexpect +import time +import os + +__all__ = ['ExceptionPxssh', 'pxssh'] + +# Exception classes used by this module. +class ExceptionPxssh(ExceptionPexpect): + '''Raised for pxssh exceptions. + ''' + +class pxssh (spawn): + + '''This class extends pexpect.spawn to specialize setting up SSH + connections. This adds methods for login, logout, and expecting the shell + prompt. It does various tricky things to handle many situations in the SSH + login process. For example, if the session is your first login, then pxssh + automatically accepts the remote certificate; or if you have public key + authentication setup then pxssh won't wait for the password prompt. + + pxssh uses the shell prompt to synchronize output from the remote host. In + order to make this more robust it sets the shell prompt to something more + unique than just $ or #. This should work on most Borne/Bash or Csh style + shells. + + Example that runs a few commands on a remote server and prints the result:: + + import pxssh + import getpass + try: + s = pxssh.pxssh() + hostname = raw_input('hostname: ') + username = raw_input('username: ') + password = getpass.getpass('password: ') + s.login (hostname, username, password) + s.sendline ('uptime') # run a command + s.prompt() # match the prompt + print s.before # print everything before the prompt. + s.sendline ('ls -l') + s.prompt() + print s.before + s.sendline ('df') + s.prompt() + print s.before + s.logout() + except pxssh.ExceptionPxssh, e: + print "pxssh failed on login." + print str(e) + + Note that if you have ssh-agent running while doing development with pxssh + then this can lead to a lot of confusion. Many X display managers (xdm, + gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI + dialog box popup asking for a password during development. You should turn + off any key agents during testing. The 'force_password' attribute will turn + off public key authentication. This will only work if the remote SSH server + is configured to allow password logins. Example of using 'force_password' + attribute:: + + s = pxssh.pxssh() + s.force_password = True + hostname = raw_input('hostname: ') + username = raw_input('username: ') + password = getpass.getpass('password: ') + s.login (hostname, username, password) + ''' + + def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None): + + spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env) + + self.name = '<pxssh>' + + #SUBTLE HACK ALERT! Note that the command that SETS the prompt uses a + #slightly different string than the regular expression to match it. This + #is because when you set the prompt the command will echo back, but we + #don't want to match the echoed command. So if we make the set command + #slightly different than the regex we eliminate the problem. To make the + #set command different we add a backslash in front of $. The $ doesn't + #need to be escaped, but it doesn't hurt and serves to make the set + #prompt command different than the regex. + + # used to match the command-line prompt + self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] " + self.PROMPT = self.UNIQUE_PROMPT + + # used to set shell command-line prompt to UNIQUE_PROMPT. + self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '" + self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '" + self.SSH_OPTS = ("-o'RSAAuthentication=no'" + + " -o 'PubkeyAuthentication=no'") +# Disabling host key checking, makes you vulnerable to MITM attacks. +# + " -o 'StrictHostKeyChecking=no'" +# + " -o 'UserKnownHostsFile /dev/null' ") + # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from + # displaying a GUI password dialog. I have not figured out how to + # disable only SSH_ASKPASS without also disabling X11 forwarding. + # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying! + #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" + self.force_password = False + self.auto_prompt_reset = True + + def levenshtein_distance(self, a,b): + + '''This calculates the Levenshtein distance between a and b. + ''' + + n, m = len(a), len(b) + if n > m: + a,b = b,a + n,m = m,n + current = range(n+1) + for i in range(1,m+1): + previous, current = current, [i]+[0]*n + for j in range(1,n+1): + add, delete = previous[j]+1, current[j-1]+1 + change = previous[j-1] + if a[j-1] != b[i-1]: + change = change + 1 + current[j] = min(add, delete, change) + return current[n] + + def sync_original_prompt (self): + + '''This attempts to find the prompt. Basically, press enter and record + the response; press enter again and record the response; if the two + responses are similar then assume we are at the original prompt. This + is a slow function. It can take over 10 seconds. ''' + + # All of these timing pace values are magic. + # I came up with these based on what seemed reliable for + # connecting to a heavily loaded machine I have. + self.sendline() + time.sleep(0.1) + # If latency is worse than these values then this will fail. + + try: + # Clear the buffer before getting the prompt. + self.read_nonblocking(size=10000,timeout=1) + except TIMEOUT: + pass + time.sleep(0.1) + self.sendline() + time.sleep(0.5) + x = self.read_nonblocking(size=1000,timeout=1) + time.sleep(0.1) + self.sendline() + time.sleep(0.5) + a = self.read_nonblocking(size=1000,timeout=1) + time.sleep(0.1) + self.sendline() + time.sleep(0.5) + b = self.read_nonblocking(size=1000,timeout=1) + ld = self.levenshtein_distance(a,b) + len_a = len(a) + if len_a == 0: + return False + if float(ld)/len_a < 0.4: + return True + return False + + ### TODO: This is getting messy and I'm pretty sure this isn't perfect. + ### TODO: I need to draw a flow chart for this. + def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True,ssh_key=None): + + '''This logs the user into the given server. It uses the + 'original_prompt' to try to find the prompt right after login. When it + finds the prompt it immediately tries to reset the prompt to something + more easily matched. The default 'original_prompt' is very optimistic + and is easily fooled. It's more reliable to try to match the original + prompt as exactly as possible to prevent false matches by server + strings such as the "Message Of The Day". On many systems you can + disable the MOTD on the remote server by creating a zero-length file + called "~/.hushlogin" on the remote server. If a prompt cannot be found + then this will not necessarily cause the login to fail. In the case of + a timeout when looking for the prompt we assume that the original + prompt was so weird that we could not match it, so we use a few tricks + to guess when we have reached the prompt. Then we hope for the best and + blindly try to reset the prompt to something more unique. If that fails + then login() raises an ExceptionPxssh exception. + + In some situations it is not possible or desirable to reset the + original prompt. In this case, set 'auto_prompt_reset' to False to + inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh + uses a unique prompt in the prompt() method. If the original prompt is + not reset then this will disable the prompt() method unless you + manually set the PROMPT attribute. ''' + + ssh_options = '-q' + if self.force_password: + ssh_options = ssh_options + ' ' + self.SSH_OPTS + if port is not None: + ssh_options = ssh_options + ' -p %s'%(str(port)) + if ssh_key is not None: + try: + os.path.isfile(ssh_key) + except: + raise ExceptionPxssh ('private ssh key does not exist') + ssh_options = ssh_options + ' -i %s' % (ssh_key) + cmd = "ssh %s -l %s %s" % (ssh_options, username, server) + + # This does not distinguish between a remote server 'password' prompt + # and a local ssh 'passphrase' prompt (for unlocking a private key). + spawn._spawn(self, cmd) + i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout) + + # First phase + if i==0: + # New certificate -- always accept it. + # This is what you get if SSH does not have the remote host's + # public key stored in the 'known_hosts' cache. + self.sendline("yes") + i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) + if i==2: # password or passphrase + self.sendline(password) + i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) + if i==4: + self.sendline(terminal_type) + i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) + + # Second phase + if i==0: + # This is weird. This should not happen twice in a row. + self.close() + raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.') + elif i==1: # can occur if you have a public key pair set to authenticate. + ### TODO: May NOT be OK if expect() got tricked and matched a false prompt. + pass + elif i==2: # password prompt again + # For incorrect passwords, some ssh servers will + # ask for the password again, others return 'denied' right away. + # If we get the password prompt again then this means + # we didn't get the password right the first time. + self.close() + raise ExceptionPxssh ('password refused') + elif i==3: # permission denied -- password was bad. + self.close() + raise ExceptionPxssh ('permission denied') + elif i==4: # terminal type again? WTF? + self.close() + raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.') + elif i==5: # Timeout + #This is tricky... I presume that we are at the command-line prompt. + #It may be that the shell prompt was so weird that we couldn't match + #it. Or it may be that we couldn't log in for some other reason. I + #can't be sure, but it's safe to guess that we did login because if + #I presume wrong and we are not logged in then this should be caught + #later when I try to set the shell prompt. + pass + elif i==6: # Connection closed by remote host + self.close() + raise ExceptionPxssh ('connection closed') + else: # Unexpected + self.close() + raise ExceptionPxssh ('unexpected login response') + if not self.sync_original_prompt(): + self.close() + raise ExceptionPxssh ('could not synchronize with original prompt') + # We appear to be in. + # set shell prompt to something unique. + if auto_prompt_reset: + if not self.set_unique_prompt(): + self.close() + raise ExceptionPxssh ('could not set shell prompt\n'+self.before) + return True + + def logout (self): + + '''This sends exit to the remote shell. If there are stopped jobs then + this automatically sends exit twice. ''' + + self.sendline("exit") + index = self.expect([EOF, "(?i)there are stopped jobs"]) + if index==1: + self.sendline("exit") + self.expect(EOF) + self.close() + + def prompt (self, timeout=-1): + + '''This matches the shell prompt. This is little more than a short-cut + to the expect() method. This returns True if the shell prompt was + matched. This returns False if a timeout was raised. Note that if you + called login() with auto_prompt_reset set to False then before calling + prompt() you must set the PROMPT attribute to a regex that prompt() + will use for matching the prompt. Calling prompt() will erase the + contents of the 'before' attribute even if no prompt is ever matched. + If timeout is not given or it is set to -1 then self.timeout is used. + ''' + + if timeout == -1: + timeout = self.timeout + i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout) + if i==1: + return False + return True + + def set_unique_prompt (self): + + '''This sets the remote prompt to something more unique than # or $. + This makes it easier for the prompt() method to match the shell prompt + unambiguously. This method is called automatically by the login() + method, but you may want to call it manually if you somehow reset the + shell prompt. For example, if you 'su' to a different user then you + will need to manually reset the prompt. This sends shell commands to + the remote host to set the prompt, so this assumes the remote host is + ready to receive commands. + + Alternatively, you may use your own prompt pattern. Just set the PROMPT + attribute to a regular expression that matches it. In this case you + should call login() with auto_prompt_reset=False; then set the PROMPT + attribute. After that the prompt() method will try to match your prompt + pattern.''' + + self.sendline ("unset PROMPT_COMMAND") + self.sendline (self.PROMPT_SET_SH) # sh-style + i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) + if i == 0: # csh-style + self.sendline (self.PROMPT_SET_CSH) + i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) + if i == 0: + return False + return True + +# vi:ts=4:sw=4:expandtab:ft=python: @@ -1,185 +1,7 @@ -'''This is a utility class to make shell scripting easier in Python. -It combines Pexpect and wraps many Standard Python Library functions -to make them look more shell-like. +import warnings -PEXPECT LICENSE +warnings.warn("This module has been moved to pexpect.psh, please update imports.", + ImportWarning) +del warnings - This license is approved by the OSI and FSF as GPL-compatible. - http://opensource.org/licenses/isc-license.txt - - Copyright (c) 2012, Noah Spurrier <noah@noah.org> - PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY - PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE - COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -''' - -import pexpect, os, sys, re -from types import * - -class ExceptionPsh(pexpect.ExceptionPexpect): - - '''Raised for Psh exceptions. - ''' - -class ExceptionErrorCode(ExceptionPsh): - - '''Raised when an program returns an error code. - ''' - - def __init__(self, string, err_code, cmd_output): - - ExceptionPsh.__init__(self,string) - self.error = err_code - self.output = cmd_output - -class psh (object): - - def __init__ (self,exp): - - self.exp = exp - self.default_timeout = 30 # Seconds - - def ls (self, path=''): - - fileStr = self.run("ls %s" % path) - return fileStr.split() - - def cd (self, path='-'): - - return self.run("cd %s" % path) - - def rm (self, path=''): - - return self.run("/bin/rm -f %s" % path) - - def cp (self, path_from='', path_to=''): - - return self.run("/bin/cp %s %s" % (path_from, path_to)) - - def mv (self, path_from='', path_to=''): - - return self.run("/bin/mv %s %s" % (path_from, path_to)) - - def pwd (self): - - return self.run("/bin/pwd") - - def which (self, exe_name): - - return self.run("/usr/bin/which %s" % exe_name) - - def chown (self, path, user='', group=None, recurse=False): - - xtra_flags = "" - if recurse: xtra_flags = "-R" - if group: group = ':' + group - else: group = "" - - return self.run("/bin/chmod %s %s%s %s" % (recurse,user,group,path)) - - def chmod (self, path, perms='', recurse=False): - - xtra_flags = "" - if recurse: xtra_flags = "-R" - return self.run("/usr/bin/chmod %s %s %s" % (xtra_flags, perms, path)) - - def chattr (self, path, attrs='', recurse=False): - - xtra_flags = "" - if recurse: xtra_flags = "-R" - return self.run("/usr/bin/chattr %s %s %s" % (xtra_flags, attrs, path)) - - def cat (self, path): - - return self.run("/bin/cat %s" % path) - - def run (self, cmd, stim_resp_dict = {}, timeout=None): - - (ret, output) = self.run_raw(cmd, stim_resp_dict, timeout) - if ret == 0: return output - raise ExceptionErrorCode("Running command [%s] returned error [%d]" - % (cmd,ret), ret, output) - - def run_raw(self, cmd, stim_resp_dict=None, timeout=None): - - '''Someone contributed this, but now I've lost touch and I forget the - motive of this. It was sort of a sketch at the time which doesn't make - this any easier to prioritize, but it seemed important at the time. ''' - - if not timeout: timeout = self.default_timeout - - def cmd_exp_loop(param): - if type(param) is DictType: - param = (param,) - for item in param: - if type(item) is type(()) or type(item) is type([]): - cmd_exp_loop(item) - elif type(item) is type(""): - self.exp.send(item) - elif type(item) is type({}): - dict = item - while(1): - stimulus = dict.keys() - idx = self.exp.expect_exact(stimulus, timeout) - keys = dict.keys() - respond = dict[keys[idx]] - if ( type(respond) is type({}) - or type(respond) is type(()) - or type(item) is type([]) ): - cmd_exp_loop(respond) - if type(respond) is type(""): - self.exp.send(respond) - elif ( type(respond) is InstanceType - and Exception - in inspect.getmro(respond.__class__) ): - raise respond - elif type(respond) is type(0): - return (respond, self.exp.before) - elif respond is None: - break - else: - self.exp.send(str(respond)) - - if stim_resp_dict == None: - stim_resp_dict = {} - - self.exp.sendline("") - if not self.exp.prompt(): raise SessionException("No prompt") - self.exp.sendline(cmd) - self.exp.expect_exact([cmd]) - stim_resp_dict[re.compile(self.exp.PROMPT)] = None - cmd_exp_loop(stim_resp_dict) - - output = self.exp.before - # Get the return code - self.exp.sendline("echo $?") - self.exp.expect_exact(["echo $?"]) - if not self.exp.prompt(): - raise SessionException("No prompt", 0, self.exp.before) - try: - reg = re.compile("^(\d+)") - s = self.exp.before.strip() - #print s - #pdb.set_trace() - s = reg.search(s).groups()[0] - error_code = int(s) - except ValueError: - log.error("Cannot parse %s into an int!" % self.exp.before) - raise - - if not output[0:2] == '\r\n': - log.warning("Returned output lacks leading \\r\\n which may indicate a tae error") - log.debug2("Offending output string: [%s]" % output) - return (error_code, output) - else: - return(error_code, output[2:]) - -# def pipe (self, cmd, string_to_send): +from pexpect.psh import * # analysis:ignore
\ No newline at end of file @@ -1,347 +1,7 @@ -'''This class extends pexpect.spawn to specialize setting up SSH connections. -This adds methods for login, logout, and expecting the shell prompt. +import warnings -PEXPECT LICENSE +warnings.warn("This module has been moved to pexpect.pxssh, please update imports.", + ImportWarning) +del warnings - This license is approved by the OSI and FSF as GPL-compatible. - http://opensource.org/licenses/isc-license.txt - - Copyright (c) 2012, Noah Spurrier <noah@noah.org> - PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY - PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE - COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -''' - -from pexpect import * -import pexpect -import time -import os - -__all__ = ['ExceptionPxssh', 'pxssh'] - -# Exception classes used by this module. -class ExceptionPxssh(ExceptionPexpect): - '''Raised for pxssh exceptions. - ''' - -class pxssh (spawn): - - '''This class extends pexpect.spawn to specialize setting up SSH - connections. This adds methods for login, logout, and expecting the shell - prompt. It does various tricky things to handle many situations in the SSH - login process. For example, if the session is your first login, then pxssh - automatically accepts the remote certificate; or if you have public key - authentication setup then pxssh won't wait for the password prompt. - - pxssh uses the shell prompt to synchronize output from the remote host. In - order to make this more robust it sets the shell prompt to something more - unique than just $ or #. This should work on most Borne/Bash or Csh style - shells. - - Example that runs a few commands on a remote server and prints the result:: - - import pxssh - import getpass - try: - s = pxssh.pxssh() - hostname = raw_input('hostname: ') - username = raw_input('username: ') - password = getpass.getpass('password: ') - s.login (hostname, username, password) - s.sendline ('uptime') # run a command - s.prompt() # match the prompt - print s.before # print everything before the prompt. - s.sendline ('ls -l') - s.prompt() - print s.before - s.sendline ('df') - s.prompt() - print s.before - s.logout() - except pxssh.ExceptionPxssh, e: - print "pxssh failed on login." - print str(e) - - Note that if you have ssh-agent running while doing development with pxssh - then this can lead to a lot of confusion. Many X display managers (xdm, - gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI - dialog box popup asking for a password during development. You should turn - off any key agents during testing. The 'force_password' attribute will turn - off public key authentication. This will only work if the remote SSH server - is configured to allow password logins. Example of using 'force_password' - attribute:: - - s = pxssh.pxssh() - s.force_password = True - hostname = raw_input('hostname: ') - username = raw_input('username: ') - password = getpass.getpass('password: ') - s.login (hostname, username, password) - ''' - - def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None): - - spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env) - - self.name = '<pxssh>' - - #SUBTLE HACK ALERT! Note that the command that SETS the prompt uses a - #slightly different string than the regular expression to match it. This - #is because when you set the prompt the command will echo back, but we - #don't want to match the echoed command. So if we make the set command - #slightly different than the regex we eliminate the problem. To make the - #set command different we add a backslash in front of $. The $ doesn't - #need to be escaped, but it doesn't hurt and serves to make the set - #prompt command different than the regex. - - # used to match the command-line prompt - self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] " - self.PROMPT = self.UNIQUE_PROMPT - - # used to set shell command-line prompt to UNIQUE_PROMPT. - self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '" - self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '" - self.SSH_OPTS = ("-o'RSAAuthentication=no'" - + " -o 'PubkeyAuthentication=no'") -# Disabling host key checking, makes you vulnerable to MITM attacks. -# + " -o 'StrictHostKeyChecking=no'" -# + " -o 'UserKnownHostsFile /dev/null' ") - # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from - # displaying a GUI password dialog. I have not figured out how to - # disable only SSH_ASKPASS without also disabling X11 forwarding. - # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying! - #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" - self.force_password = False - self.auto_prompt_reset = True - - def levenshtein_distance(self, a,b): - - '''This calculates the Levenshtein distance between a and b. - ''' - - n, m = len(a), len(b) - if n > m: - a,b = b,a - n,m = m,n - current = range(n+1) - for i in range(1,m+1): - previous, current = current, [i]+[0]*n - for j in range(1,n+1): - add, delete = previous[j]+1, current[j-1]+1 - change = previous[j-1] - if a[j-1] != b[i-1]: - change = change + 1 - current[j] = min(add, delete, change) - return current[n] - - def sync_original_prompt (self): - - '''This attempts to find the prompt. Basically, press enter and record - the response; press enter again and record the response; if the two - responses are similar then assume we are at the original prompt. This - is a slow function. It can take over 10 seconds. ''' - - # All of these timing pace values are magic. - # I came up with these based on what seemed reliable for - # connecting to a heavily loaded machine I have. - self.sendline() - time.sleep(0.1) - # If latency is worse than these values then this will fail. - - try: - # Clear the buffer before getting the prompt. - self.read_nonblocking(size=10000,timeout=1) - except TIMEOUT: - pass - time.sleep(0.1) - self.sendline() - time.sleep(0.5) - x = self.read_nonblocking(size=1000,timeout=1) - time.sleep(0.1) - self.sendline() - time.sleep(0.5) - a = self.read_nonblocking(size=1000,timeout=1) - time.sleep(0.1) - self.sendline() - time.sleep(0.5) - b = self.read_nonblocking(size=1000,timeout=1) - ld = self.levenshtein_distance(a,b) - len_a = len(a) - if len_a == 0: - return False - if float(ld)/len_a < 0.4: - return True - return False - - ### TODO: This is getting messy and I'm pretty sure this isn't perfect. - ### TODO: I need to draw a flow chart for this. - def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True,ssh_key=None): - - '''This logs the user into the given server. It uses the - 'original_prompt' to try to find the prompt right after login. When it - finds the prompt it immediately tries to reset the prompt to something - more easily matched. The default 'original_prompt' is very optimistic - and is easily fooled. It's more reliable to try to match the original - prompt as exactly as possible to prevent false matches by server - strings such as the "Message Of The Day". On many systems you can - disable the MOTD on the remote server by creating a zero-length file - called "~/.hushlogin" on the remote server. If a prompt cannot be found - then this will not necessarily cause the login to fail. In the case of - a timeout when looking for the prompt we assume that the original - prompt was so weird that we could not match it, so we use a few tricks - to guess when we have reached the prompt. Then we hope for the best and - blindly try to reset the prompt to something more unique. If that fails - then login() raises an ExceptionPxssh exception. - - In some situations it is not possible or desirable to reset the - original prompt. In this case, set 'auto_prompt_reset' to False to - inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh - uses a unique prompt in the prompt() method. If the original prompt is - not reset then this will disable the prompt() method unless you - manually set the PROMPT attribute. ''' - - ssh_options = '-q' - if self.force_password: - ssh_options = ssh_options + ' ' + self.SSH_OPTS - if port is not None: - ssh_options = ssh_options + ' -p %s'%(str(port)) - if ssh_key is not None: - try: - os.path.isfile(ssh_key) - except: - raise ExceptionPxssh ('private ssh key does not exist') - ssh_options = ssh_options + ' -i %s' % (ssh_key) - cmd = "ssh %s -l %s %s" % (ssh_options, username, server) - - # This does not distinguish between a remote server 'password' prompt - # and a local ssh 'passphrase' prompt (for unlocking a private key). - spawn._spawn(self, cmd) - i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout) - - # First phase - if i==0: - # New certificate -- always accept it. - # This is what you get if SSH does not have the remote host's - # public key stored in the 'known_hosts' cache. - self.sendline("yes") - i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) - if i==2: # password or passphrase - self.sendline(password) - i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) - if i==4: - self.sendline(terminal_type) - i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) - - # Second phase - if i==0: - # This is weird. This should not happen twice in a row. - self.close() - raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.') - elif i==1: # can occur if you have a public key pair set to authenticate. - ### TODO: May NOT be OK if expect() got tricked and matched a false prompt. - pass - elif i==2: # password prompt again - # For incorrect passwords, some ssh servers will - # ask for the password again, others return 'denied' right away. - # If we get the password prompt again then this means - # we didn't get the password right the first time. - self.close() - raise ExceptionPxssh ('password refused') - elif i==3: # permission denied -- password was bad. - self.close() - raise ExceptionPxssh ('permission denied') - elif i==4: # terminal type again? WTF? - self.close() - raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.') - elif i==5: # Timeout - #This is tricky... I presume that we are at the command-line prompt. - #It may be that the shell prompt was so weird that we couldn't match - #it. Or it may be that we couldn't log in for some other reason. I - #can't be sure, but it's safe to guess that we did login because if - #I presume wrong and we are not logged in then this should be caught - #later when I try to set the shell prompt. - pass - elif i==6: # Connection closed by remote host - self.close() - raise ExceptionPxssh ('connection closed') - else: # Unexpected - self.close() - raise ExceptionPxssh ('unexpected login response') - if not self.sync_original_prompt(): - self.close() - raise ExceptionPxssh ('could not synchronize with original prompt') - # We appear to be in. - # set shell prompt to something unique. - if auto_prompt_reset: - if not self.set_unique_prompt(): - self.close() - raise ExceptionPxssh ('could not set shell prompt\n'+self.before) - return True - - def logout (self): - - '''This sends exit to the remote shell. If there are stopped jobs then - this automatically sends exit twice. ''' - - self.sendline("exit") - index = self.expect([EOF, "(?i)there are stopped jobs"]) - if index==1: - self.sendline("exit") - self.expect(EOF) - self.close() - - def prompt (self, timeout=-1): - - '''This matches the shell prompt. This is little more than a short-cut - to the expect() method. This returns True if the shell prompt was - matched. This returns False if a timeout was raised. Note that if you - called login() with auto_prompt_reset set to False then before calling - prompt() you must set the PROMPT attribute to a regex that prompt() - will use for matching the prompt. Calling prompt() will erase the - contents of the 'before' attribute even if no prompt is ever matched. - If timeout is not given or it is set to -1 then self.timeout is used. - ''' - - if timeout == -1: - timeout = self.timeout - i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout) - if i==1: - return False - return True - - def set_unique_prompt (self): - - '''This sets the remote prompt to something more unique than # or $. - This makes it easier for the prompt() method to match the shell prompt - unambiguously. This method is called automatically by the login() - method, but you may want to call it manually if you somehow reset the - shell prompt. For example, if you 'su' to a different user then you - will need to manually reset the prompt. This sends shell commands to - the remote host to set the prompt, so this assumes the remote host is - ready to receive commands. - - Alternatively, you may use your own prompt pattern. Just set the PROMPT - attribute to a regular expression that matches it. In this case you - should call login() with auto_prompt_reset=False; then set the PROMPT - attribute. After that the prompt() method will try to match your prompt - pattern.''' - - self.sendline ("unset PROMPT_COMMAND") - self.sendline (self.PROMPT_SET_SH) # sh-style - i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) - if i == 0: # csh-style - self.sendline (self.PROMPT_SET_CSH) - i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) - if i == 0: - return False - return True - -# vi:ts=4:sw=4:expandtab:ft=python: +from pexpect.pxssh import * # analysis:ignore
\ No newline at end of file |