summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Kluyver <takowl@gmail.com>2013-09-19 17:04:23 -0700
committerThomas Kluyver <takowl@gmail.com>2013-09-19 17:04:23 -0700
commit9ee22124477a882836330264be855e1ee138c035 (patch)
tree9d45e9ec54639b2526267232c6d712db7e9e57d2
parentdbff978b56a4c94c4361f083c40df14a7db7d6d4 (diff)
downloadpexpect-git-9ee22124477a882836330264be855e1ee138c035.tar.gz
Move psh and pxssh into pexpect
-rw-r--r--pexpect/psh.py185
-rw-r--r--pexpect/pxssh.py347
-rw-r--r--psh.py188
-rw-r--r--pxssh.py350
4 files changed, 542 insertions, 528 deletions
diff --git a/pexpect/psh.py b/pexpect/psh.py
new file mode 100644
index 0000000..467f546
--- /dev/null
+++ b/pexpect/psh.py
@@ -0,0 +1,185 @@
+'''This is a utility class to make shell scripting easier in Python.
+It combines Pexpect and wraps many Standard Python Library functions
+to make them look more shell-like.
+
+PEXPECT LICENSE
+
+ This license is approved by the OSI and FSF as GPL-compatible.
+ http://opensource.org/licenses/isc-license.txt
+
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+'''
+
+import pexpect, os, sys, re
+from types import *
+
+class ExceptionPsh(pexpect.ExceptionPexpect):
+
+ '''Raised for Psh exceptions.
+ '''
+
+class ExceptionErrorCode(ExceptionPsh):
+
+ '''Raised when an program returns an error code.
+ '''
+
+ def __init__(self, string, err_code, cmd_output):
+
+ ExceptionPsh.__init__(self,string)
+ self.error = err_code
+ self.output = cmd_output
+
+class psh (object):
+
+ def __init__ (self,exp):
+
+ self.exp = exp
+ self.default_timeout = 30 # Seconds
+
+ def ls (self, path=''):
+
+ fileStr = self.run("ls %s" % path)
+ return fileStr.split()
+
+ def cd (self, path='-'):
+
+ return self.run("cd %s" % path)
+
+ def rm (self, path=''):
+
+ return self.run("/bin/rm -f %s" % path)
+
+ def cp (self, path_from='', path_to=''):
+
+ return self.run("/bin/cp %s %s" % (path_from, path_to))
+
+ def mv (self, path_from='', path_to=''):
+
+ return self.run("/bin/mv %s %s" % (path_from, path_to))
+
+ def pwd (self):
+
+ return self.run("/bin/pwd")
+
+ def which (self, exe_name):
+
+ return self.run("/usr/bin/which %s" % exe_name)
+
+ def chown (self, path, user='', group=None, recurse=False):
+
+ xtra_flags = ""
+ if recurse: xtra_flags = "-R"
+ if group: group = ':' + group
+ else: group = ""
+
+ return self.run("/bin/chmod %s %s%s %s" % (recurse,user,group,path))
+
+ def chmod (self, path, perms='', recurse=False):
+
+ xtra_flags = ""
+ if recurse: xtra_flags = "-R"
+ return self.run("/usr/bin/chmod %s %s %s" % (xtra_flags, perms, path))
+
+ def chattr (self, path, attrs='', recurse=False):
+
+ xtra_flags = ""
+ if recurse: xtra_flags = "-R"
+ return self.run("/usr/bin/chattr %s %s %s" % (xtra_flags, attrs, path))
+
+ def cat (self, path):
+
+ return self.run("/bin/cat %s" % path)
+
+ def run (self, cmd, stim_resp_dict = {}, timeout=None):
+
+ (ret, output) = self.run_raw(cmd, stim_resp_dict, timeout)
+ if ret == 0: return output
+ raise ExceptionErrorCode("Running command [%s] returned error [%d]"
+ % (cmd,ret), ret, output)
+
+ def run_raw(self, cmd, stim_resp_dict=None, timeout=None):
+
+ '''Someone contributed this, but now I've lost touch and I forget the
+ motive of this. It was sort of a sketch at the time which doesn't make
+ this any easier to prioritize, but it seemed important at the time. '''
+
+ if not timeout: timeout = self.default_timeout
+
+ def cmd_exp_loop(param):
+ if type(param) is DictType:
+ param = (param,)
+ for item in param:
+ if type(item) is type(()) or type(item) is type([]):
+ cmd_exp_loop(item)
+ elif type(item) is type(""):
+ self.exp.send(item)
+ elif type(item) is type({}):
+ dict = item
+ while(1):
+ stimulus = dict.keys()
+ idx = self.exp.expect_exact(stimulus, timeout)
+ keys = dict.keys()
+ respond = dict[keys[idx]]
+ if ( type(respond) is type({})
+ or type(respond) is type(())
+ or type(item) is type([]) ):
+ cmd_exp_loop(respond)
+ if type(respond) is type(""):
+ self.exp.send(respond)
+ elif ( type(respond) is InstanceType
+ and Exception
+ in inspect.getmro(respond.__class__) ):
+ raise respond
+ elif type(respond) is type(0):
+ return (respond, self.exp.before)
+ elif respond is None:
+ break
+ else:
+ self.exp.send(str(respond))
+
+ if stim_resp_dict == None:
+ stim_resp_dict = {}
+
+ self.exp.sendline("")
+ if not self.exp.prompt(): raise SessionException("No prompt")
+ self.exp.sendline(cmd)
+ self.exp.expect_exact([cmd])
+ stim_resp_dict[re.compile(self.exp.PROMPT)] = None
+ cmd_exp_loop(stim_resp_dict)
+
+ output = self.exp.before
+ # Get the return code
+ self.exp.sendline("echo $?")
+ self.exp.expect_exact(["echo $?"])
+ if not self.exp.prompt():
+ raise SessionException("No prompt", 0, self.exp.before)
+ try:
+ reg = re.compile("^(\d+)")
+ s = self.exp.before.strip()
+ #print s
+ #pdb.set_trace()
+ s = reg.search(s).groups()[0]
+ error_code = int(s)
+ except ValueError:
+ log.error("Cannot parse %s into an int!" % self.exp.before)
+ raise
+
+ if not output[0:2] == '\r\n':
+ log.warning("Returned output lacks leading \\r\\n which may indicate a tae error")
+ log.debug2("Offending output string: [%s]" % output)
+ return (error_code, output)
+ else:
+ return(error_code, output[2:])
+
+# def pipe (self, cmd, string_to_send):
diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py
new file mode 100644
index 0000000..5566029
--- /dev/null
+++ b/pexpect/pxssh.py
@@ -0,0 +1,347 @@
+'''This class extends pexpect.spawn to specialize setting up SSH connections.
+This adds methods for login, logout, and expecting the shell prompt.
+
+PEXPECT LICENSE
+
+ This license is approved by the OSI and FSF as GPL-compatible.
+ http://opensource.org/licenses/isc-license.txt
+
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+'''
+
+from pexpect import *
+import pexpect
+import time
+import os
+
+__all__ = ['ExceptionPxssh', 'pxssh']
+
+# Exception classes used by this module.
+class ExceptionPxssh(ExceptionPexpect):
+ '''Raised for pxssh exceptions.
+ '''
+
+class pxssh (spawn):
+
+ '''This class extends pexpect.spawn to specialize setting up SSH
+ connections. This adds methods for login, logout, and expecting the shell
+ prompt. It does various tricky things to handle many situations in the SSH
+ login process. For example, if the session is your first login, then pxssh
+ automatically accepts the remote certificate; or if you have public key
+ authentication setup then pxssh won't wait for the password prompt.
+
+ pxssh uses the shell prompt to synchronize output from the remote host. In
+ order to make this more robust it sets the shell prompt to something more
+ unique than just $ or #. This should work on most Borne/Bash or Csh style
+ shells.
+
+ Example that runs a few commands on a remote server and prints the result::
+
+ import pxssh
+ import getpass
+ try:
+ s = pxssh.pxssh()
+ hostname = raw_input('hostname: ')
+ username = raw_input('username: ')
+ password = getpass.getpass('password: ')
+ s.login (hostname, username, password)
+ s.sendline ('uptime') # run a command
+ s.prompt() # match the prompt
+ print s.before # print everything before the prompt.
+ s.sendline ('ls -l')
+ s.prompt()
+ print s.before
+ s.sendline ('df')
+ s.prompt()
+ print s.before
+ s.logout()
+ except pxssh.ExceptionPxssh, e:
+ print "pxssh failed on login."
+ print str(e)
+
+ Note that if you have ssh-agent running while doing development with pxssh
+ then this can lead to a lot of confusion. Many X display managers (xdm,
+ gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
+ dialog box popup asking for a password during development. You should turn
+ off any key agents during testing. The 'force_password' attribute will turn
+ off public key authentication. This will only work if the remote SSH server
+ is configured to allow password logins. Example of using 'force_password'
+ attribute::
+
+ s = pxssh.pxssh()
+ s.force_password = True
+ hostname = raw_input('hostname: ')
+ username = raw_input('username: ')
+ password = getpass.getpass('password: ')
+ s.login (hostname, username, password)
+ '''
+
+ def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None):
+
+ spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env)
+
+ self.name = '<pxssh>'
+
+ #SUBTLE HACK ALERT! Note that the command that SETS the prompt uses a
+ #slightly different string than the regular expression to match it. This
+ #is because when you set the prompt the command will echo back, but we
+ #don't want to match the echoed command. So if we make the set command
+ #slightly different than the regex we eliminate the problem. To make the
+ #set command different we add a backslash in front of $. The $ doesn't
+ #need to be escaped, but it doesn't hurt and serves to make the set
+ #prompt command different than the regex.
+
+ # used to match the command-line prompt
+ self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] "
+ self.PROMPT = self.UNIQUE_PROMPT
+
+ # used to set shell command-line prompt to UNIQUE_PROMPT.
+ self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '"
+ self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '"
+ self.SSH_OPTS = ("-o'RSAAuthentication=no'"
+ + " -o 'PubkeyAuthentication=no'")
+# Disabling host key checking, makes you vulnerable to MITM attacks.
+# + " -o 'StrictHostKeyChecking=no'"
+# + " -o 'UserKnownHostsFile /dev/null' ")
+ # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
+ # displaying a GUI password dialog. I have not figured out how to
+ # disable only SSH_ASKPASS without also disabling X11 forwarding.
+ # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
+ #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
+ self.force_password = False
+ self.auto_prompt_reset = True
+
+ def levenshtein_distance(self, a,b):
+
+ '''This calculates the Levenshtein distance between a and b.
+ '''
+
+ n, m = len(a), len(b)
+ if n > m:
+ a,b = b,a
+ n,m = m,n
+ current = range(n+1)
+ for i in range(1,m+1):
+ previous, current = current, [i]+[0]*n
+ for j in range(1,n+1):
+ add, delete = previous[j]+1, current[j-1]+1
+ change = previous[j-1]
+ if a[j-1] != b[i-1]:
+ change = change + 1
+ current[j] = min(add, delete, change)
+ return current[n]
+
+ def sync_original_prompt (self):
+
+ '''This attempts to find the prompt. Basically, press enter and record
+ the response; press enter again and record the response; if the two
+ responses are similar then assume we are at the original prompt. This
+ is a slow function. It can take over 10 seconds. '''
+
+ # All of these timing pace values are magic.
+ # I came up with these based on what seemed reliable for
+ # connecting to a heavily loaded machine I have.
+ self.sendline()
+ time.sleep(0.1)
+ # If latency is worse than these values then this will fail.
+
+ try:
+ # Clear the buffer before getting the prompt.
+ self.read_nonblocking(size=10000,timeout=1)
+ except TIMEOUT:
+ pass
+ time.sleep(0.1)
+ self.sendline()
+ time.sleep(0.5)
+ x = self.read_nonblocking(size=1000,timeout=1)
+ time.sleep(0.1)
+ self.sendline()
+ time.sleep(0.5)
+ a = self.read_nonblocking(size=1000,timeout=1)
+ time.sleep(0.1)
+ self.sendline()
+ time.sleep(0.5)
+ b = self.read_nonblocking(size=1000,timeout=1)
+ ld = self.levenshtein_distance(a,b)
+ len_a = len(a)
+ if len_a == 0:
+ return False
+ if float(ld)/len_a < 0.4:
+ return True
+ return False
+
+ ### TODO: This is getting messy and I'm pretty sure this isn't perfect.
+ ### TODO: I need to draw a flow chart for this.
+ def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True,ssh_key=None):
+
+ '''This logs the user into the given server. It uses the
+ 'original_prompt' to try to find the prompt right after login. When it
+ finds the prompt it immediately tries to reset the prompt to something
+ more easily matched. The default 'original_prompt' is very optimistic
+ and is easily fooled. It's more reliable to try to match the original
+ prompt as exactly as possible to prevent false matches by server
+ strings such as the "Message Of The Day". On many systems you can
+ disable the MOTD on the remote server by creating a zero-length file
+ called "~/.hushlogin" on the remote server. If a prompt cannot be found
+ then this will not necessarily cause the login to fail. In the case of
+ a timeout when looking for the prompt we assume that the original
+ prompt was so weird that we could not match it, so we use a few tricks
+ to guess when we have reached the prompt. Then we hope for the best and
+ blindly try to reset the prompt to something more unique. If that fails
+ then login() raises an ExceptionPxssh exception.
+
+ In some situations it is not possible or desirable to reset the
+ original prompt. In this case, set 'auto_prompt_reset' to False to
+ inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
+ uses a unique prompt in the prompt() method. If the original prompt is
+ not reset then this will disable the prompt() method unless you
+ manually set the PROMPT attribute. '''
+
+ ssh_options = '-q'
+ if self.force_password:
+ ssh_options = ssh_options + ' ' + self.SSH_OPTS
+ if port is not None:
+ ssh_options = ssh_options + ' -p %s'%(str(port))
+ if ssh_key is not None:
+ try:
+ os.path.isfile(ssh_key)
+ except:
+ raise ExceptionPxssh ('private ssh key does not exist')
+ ssh_options = ssh_options + ' -i %s' % (ssh_key)
+ cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
+
+ # This does not distinguish between a remote server 'password' prompt
+ # and a local ssh 'passphrase' prompt (for unlocking a private key).
+ spawn._spawn(self, cmd)
+ i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout)
+
+ # First phase
+ if i==0:
+ # New certificate -- always accept it.
+ # This is what you get if SSH does not have the remote host's
+ # public key stored in the 'known_hosts' cache.
+ self.sendline("yes")
+ i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
+ if i==2: # password or passphrase
+ self.sendline(password)
+ i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
+ if i==4:
+ self.sendline(terminal_type)
+ i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
+
+ # Second phase
+ if i==0:
+ # This is weird. This should not happen twice in a row.
+ self.close()
+ raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.')
+ elif i==1: # can occur if you have a public key pair set to authenticate.
+ ### TODO: May NOT be OK if expect() got tricked and matched a false prompt.
+ pass
+ elif i==2: # password prompt again
+ # For incorrect passwords, some ssh servers will
+ # ask for the password again, others return 'denied' right away.
+ # If we get the password prompt again then this means
+ # we didn't get the password right the first time.
+ self.close()
+ raise ExceptionPxssh ('password refused')
+ elif i==3: # permission denied -- password was bad.
+ self.close()
+ raise ExceptionPxssh ('permission denied')
+ elif i==4: # terminal type again? WTF?
+ self.close()
+ raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.')
+ elif i==5: # Timeout
+ #This is tricky... I presume that we are at the command-line prompt.
+ #It may be that the shell prompt was so weird that we couldn't match
+ #it. Or it may be that we couldn't log in for some other reason. I
+ #can't be sure, but it's safe to guess that we did login because if
+ #I presume wrong and we are not logged in then this should be caught
+ #later when I try to set the shell prompt.
+ pass
+ elif i==6: # Connection closed by remote host
+ self.close()
+ raise ExceptionPxssh ('connection closed')
+ else: # Unexpected
+ self.close()
+ raise ExceptionPxssh ('unexpected login response')
+ if not self.sync_original_prompt():
+ self.close()
+ raise ExceptionPxssh ('could not synchronize with original prompt')
+ # We appear to be in.
+ # set shell prompt to something unique.
+ if auto_prompt_reset:
+ if not self.set_unique_prompt():
+ self.close()
+ raise ExceptionPxssh ('could not set shell prompt\n'+self.before)
+ return True
+
+ def logout (self):
+
+ '''This sends exit to the remote shell. If there are stopped jobs then
+ this automatically sends exit twice. '''
+
+ self.sendline("exit")
+ index = self.expect([EOF, "(?i)there are stopped jobs"])
+ if index==1:
+ self.sendline("exit")
+ self.expect(EOF)
+ self.close()
+
+ def prompt (self, timeout=-1):
+
+ '''This matches the shell prompt. This is little more than a short-cut
+ to the expect() method. This returns True if the shell prompt was
+ matched. This returns False if a timeout was raised. Note that if you
+ called login() with auto_prompt_reset set to False then before calling
+ prompt() you must set the PROMPT attribute to a regex that prompt()
+ will use for matching the prompt. Calling prompt() will erase the
+ contents of the 'before' attribute even if no prompt is ever matched.
+ If timeout is not given or it is set to -1 then self.timeout is used.
+ '''
+
+ if timeout == -1:
+ timeout = self.timeout
+ i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
+ if i==1:
+ return False
+ return True
+
+ def set_unique_prompt (self):
+
+ '''This sets the remote prompt to something more unique than # or $.
+ This makes it easier for the prompt() method to match the shell prompt
+ unambiguously. This method is called automatically by the login()
+ method, but you may want to call it manually if you somehow reset the
+ shell prompt. For example, if you 'su' to a different user then you
+ will need to manually reset the prompt. This sends shell commands to
+ the remote host to set the prompt, so this assumes the remote host is
+ ready to receive commands.
+
+ Alternatively, you may use your own prompt pattern. Just set the PROMPT
+ attribute to a regular expression that matches it. In this case you
+ should call login() with auto_prompt_reset=False; then set the PROMPT
+ attribute. After that the prompt() method will try to match your prompt
+ pattern.'''
+
+ self.sendline ("unset PROMPT_COMMAND")
+ self.sendline (self.PROMPT_SET_SH) # sh-style
+ i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
+ if i == 0: # csh-style
+ self.sendline (self.PROMPT_SET_CSH)
+ i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
+ if i == 0:
+ return False
+ return True
+
+# vi:ts=4:sw=4:expandtab:ft=python:
diff --git a/psh.py b/psh.py
index 467f546..400efde 100644
--- a/psh.py
+++ b/psh.py
@@ -1,185 +1,7 @@
-'''This is a utility class to make shell scripting easier in Python.
-It combines Pexpect and wraps many Standard Python Library functions
-to make them look more shell-like.
+import warnings
-PEXPECT LICENSE
+warnings.warn("This module has been moved to pexpect.psh, please update imports.",
+ ImportWarning)
+del warnings
- This license is approved by the OSI and FSF as GPL-compatible.
- http://opensource.org/licenses/isc-license.txt
-
- Copyright (c) 2012, Noah Spurrier <noah@noah.org>
- PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
- PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
- COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-'''
-
-import pexpect, os, sys, re
-from types import *
-
-class ExceptionPsh(pexpect.ExceptionPexpect):
-
- '''Raised for Psh exceptions.
- '''
-
-class ExceptionErrorCode(ExceptionPsh):
-
- '''Raised when an program returns an error code.
- '''
-
- def __init__(self, string, err_code, cmd_output):
-
- ExceptionPsh.__init__(self,string)
- self.error = err_code
- self.output = cmd_output
-
-class psh (object):
-
- def __init__ (self,exp):
-
- self.exp = exp
- self.default_timeout = 30 # Seconds
-
- def ls (self, path=''):
-
- fileStr = self.run("ls %s" % path)
- return fileStr.split()
-
- def cd (self, path='-'):
-
- return self.run("cd %s" % path)
-
- def rm (self, path=''):
-
- return self.run("/bin/rm -f %s" % path)
-
- def cp (self, path_from='', path_to=''):
-
- return self.run("/bin/cp %s %s" % (path_from, path_to))
-
- def mv (self, path_from='', path_to=''):
-
- return self.run("/bin/mv %s %s" % (path_from, path_to))
-
- def pwd (self):
-
- return self.run("/bin/pwd")
-
- def which (self, exe_name):
-
- return self.run("/usr/bin/which %s" % exe_name)
-
- def chown (self, path, user='', group=None, recurse=False):
-
- xtra_flags = ""
- if recurse: xtra_flags = "-R"
- if group: group = ':' + group
- else: group = ""
-
- return self.run("/bin/chmod %s %s%s %s" % (recurse,user,group,path))
-
- def chmod (self, path, perms='', recurse=False):
-
- xtra_flags = ""
- if recurse: xtra_flags = "-R"
- return self.run("/usr/bin/chmod %s %s %s" % (xtra_flags, perms, path))
-
- def chattr (self, path, attrs='', recurse=False):
-
- xtra_flags = ""
- if recurse: xtra_flags = "-R"
- return self.run("/usr/bin/chattr %s %s %s" % (xtra_flags, attrs, path))
-
- def cat (self, path):
-
- return self.run("/bin/cat %s" % path)
-
- def run (self, cmd, stim_resp_dict = {}, timeout=None):
-
- (ret, output) = self.run_raw(cmd, stim_resp_dict, timeout)
- if ret == 0: return output
- raise ExceptionErrorCode("Running command [%s] returned error [%d]"
- % (cmd,ret), ret, output)
-
- def run_raw(self, cmd, stim_resp_dict=None, timeout=None):
-
- '''Someone contributed this, but now I've lost touch and I forget the
- motive of this. It was sort of a sketch at the time which doesn't make
- this any easier to prioritize, but it seemed important at the time. '''
-
- if not timeout: timeout = self.default_timeout
-
- def cmd_exp_loop(param):
- if type(param) is DictType:
- param = (param,)
- for item in param:
- if type(item) is type(()) or type(item) is type([]):
- cmd_exp_loop(item)
- elif type(item) is type(""):
- self.exp.send(item)
- elif type(item) is type({}):
- dict = item
- while(1):
- stimulus = dict.keys()
- idx = self.exp.expect_exact(stimulus, timeout)
- keys = dict.keys()
- respond = dict[keys[idx]]
- if ( type(respond) is type({})
- or type(respond) is type(())
- or type(item) is type([]) ):
- cmd_exp_loop(respond)
- if type(respond) is type(""):
- self.exp.send(respond)
- elif ( type(respond) is InstanceType
- and Exception
- in inspect.getmro(respond.__class__) ):
- raise respond
- elif type(respond) is type(0):
- return (respond, self.exp.before)
- elif respond is None:
- break
- else:
- self.exp.send(str(respond))
-
- if stim_resp_dict == None:
- stim_resp_dict = {}
-
- self.exp.sendline("")
- if not self.exp.prompt(): raise SessionException("No prompt")
- self.exp.sendline(cmd)
- self.exp.expect_exact([cmd])
- stim_resp_dict[re.compile(self.exp.PROMPT)] = None
- cmd_exp_loop(stim_resp_dict)
-
- output = self.exp.before
- # Get the return code
- self.exp.sendline("echo $?")
- self.exp.expect_exact(["echo $?"])
- if not self.exp.prompt():
- raise SessionException("No prompt", 0, self.exp.before)
- try:
- reg = re.compile("^(\d+)")
- s = self.exp.before.strip()
- #print s
- #pdb.set_trace()
- s = reg.search(s).groups()[0]
- error_code = int(s)
- except ValueError:
- log.error("Cannot parse %s into an int!" % self.exp.before)
- raise
-
- if not output[0:2] == '\r\n':
- log.warning("Returned output lacks leading \\r\\n which may indicate a tae error")
- log.debug2("Offending output string: [%s]" % output)
- return (error_code, output)
- else:
- return(error_code, output[2:])
-
-# def pipe (self, cmd, string_to_send):
+from pexpect.psh import * # analysis:ignore \ No newline at end of file
diff --git a/pxssh.py b/pxssh.py
index 5566029..1849769 100644
--- a/pxssh.py
+++ b/pxssh.py
@@ -1,347 +1,7 @@
-'''This class extends pexpect.spawn to specialize setting up SSH connections.
-This adds methods for login, logout, and expecting the shell prompt.
+import warnings
-PEXPECT LICENSE
+warnings.warn("This module has been moved to pexpect.pxssh, please update imports.",
+ ImportWarning)
+del warnings
- This license is approved by the OSI and FSF as GPL-compatible.
- http://opensource.org/licenses/isc-license.txt
-
- Copyright (c) 2012, Noah Spurrier <noah@noah.org>
- PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
- PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
- COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
- THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-'''
-
-from pexpect import *
-import pexpect
-import time
-import os
-
-__all__ = ['ExceptionPxssh', 'pxssh']
-
-# Exception classes used by this module.
-class ExceptionPxssh(ExceptionPexpect):
- '''Raised for pxssh exceptions.
- '''
-
-class pxssh (spawn):
-
- '''This class extends pexpect.spawn to specialize setting up SSH
- connections. This adds methods for login, logout, and expecting the shell
- prompt. It does various tricky things to handle many situations in the SSH
- login process. For example, if the session is your first login, then pxssh
- automatically accepts the remote certificate; or if you have public key
- authentication setup then pxssh won't wait for the password prompt.
-
- pxssh uses the shell prompt to synchronize output from the remote host. In
- order to make this more robust it sets the shell prompt to something more
- unique than just $ or #. This should work on most Borne/Bash or Csh style
- shells.
-
- Example that runs a few commands on a remote server and prints the result::
-
- import pxssh
- import getpass
- try:
- s = pxssh.pxssh()
- hostname = raw_input('hostname: ')
- username = raw_input('username: ')
- password = getpass.getpass('password: ')
- s.login (hostname, username, password)
- s.sendline ('uptime') # run a command
- s.prompt() # match the prompt
- print s.before # print everything before the prompt.
- s.sendline ('ls -l')
- s.prompt()
- print s.before
- s.sendline ('df')
- s.prompt()
- print s.before
- s.logout()
- except pxssh.ExceptionPxssh, e:
- print "pxssh failed on login."
- print str(e)
-
- Note that if you have ssh-agent running while doing development with pxssh
- then this can lead to a lot of confusion. Many X display managers (xdm,
- gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
- dialog box popup asking for a password during development. You should turn
- off any key agents during testing. The 'force_password' attribute will turn
- off public key authentication. This will only work if the remote SSH server
- is configured to allow password logins. Example of using 'force_password'
- attribute::
-
- s = pxssh.pxssh()
- s.force_password = True
- hostname = raw_input('hostname: ')
- username = raw_input('username: ')
- password = getpass.getpass('password: ')
- s.login (hostname, username, password)
- '''
-
- def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None):
-
- spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env)
-
- self.name = '<pxssh>'
-
- #SUBTLE HACK ALERT! Note that the command that SETS the prompt uses a
- #slightly different string than the regular expression to match it. This
- #is because when you set the prompt the command will echo back, but we
- #don't want to match the echoed command. So if we make the set command
- #slightly different than the regex we eliminate the problem. To make the
- #set command different we add a backslash in front of $. The $ doesn't
- #need to be escaped, but it doesn't hurt and serves to make the set
- #prompt command different than the regex.
-
- # used to match the command-line prompt
- self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] "
- self.PROMPT = self.UNIQUE_PROMPT
-
- # used to set shell command-line prompt to UNIQUE_PROMPT.
- self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '"
- self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '"
- self.SSH_OPTS = ("-o'RSAAuthentication=no'"
- + " -o 'PubkeyAuthentication=no'")
-# Disabling host key checking, makes you vulnerable to MITM attacks.
-# + " -o 'StrictHostKeyChecking=no'"
-# + " -o 'UserKnownHostsFile /dev/null' ")
- # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
- # displaying a GUI password dialog. I have not figured out how to
- # disable only SSH_ASKPASS without also disabling X11 forwarding.
- # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
- #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
- self.force_password = False
- self.auto_prompt_reset = True
-
- def levenshtein_distance(self, a,b):
-
- '''This calculates the Levenshtein distance between a and b.
- '''
-
- n, m = len(a), len(b)
- if n > m:
- a,b = b,a
- n,m = m,n
- current = range(n+1)
- for i in range(1,m+1):
- previous, current = current, [i]+[0]*n
- for j in range(1,n+1):
- add, delete = previous[j]+1, current[j-1]+1
- change = previous[j-1]
- if a[j-1] != b[i-1]:
- change = change + 1
- current[j] = min(add, delete, change)
- return current[n]
-
- def sync_original_prompt (self):
-
- '''This attempts to find the prompt. Basically, press enter and record
- the response; press enter again and record the response; if the two
- responses are similar then assume we are at the original prompt. This
- is a slow function. It can take over 10 seconds. '''
-
- # All of these timing pace values are magic.
- # I came up with these based on what seemed reliable for
- # connecting to a heavily loaded machine I have.
- self.sendline()
- time.sleep(0.1)
- # If latency is worse than these values then this will fail.
-
- try:
- # Clear the buffer before getting the prompt.
- self.read_nonblocking(size=10000,timeout=1)
- except TIMEOUT:
- pass
- time.sleep(0.1)
- self.sendline()
- time.sleep(0.5)
- x = self.read_nonblocking(size=1000,timeout=1)
- time.sleep(0.1)
- self.sendline()
- time.sleep(0.5)
- a = self.read_nonblocking(size=1000,timeout=1)
- time.sleep(0.1)
- self.sendline()
- time.sleep(0.5)
- b = self.read_nonblocking(size=1000,timeout=1)
- ld = self.levenshtein_distance(a,b)
- len_a = len(a)
- if len_a == 0:
- return False
- if float(ld)/len_a < 0.4:
- return True
- return False
-
- ### TODO: This is getting messy and I'm pretty sure this isn't perfect.
- ### TODO: I need to draw a flow chart for this.
- def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True,ssh_key=None):
-
- '''This logs the user into the given server. It uses the
- 'original_prompt' to try to find the prompt right after login. When it
- finds the prompt it immediately tries to reset the prompt to something
- more easily matched. The default 'original_prompt' is very optimistic
- and is easily fooled. It's more reliable to try to match the original
- prompt as exactly as possible to prevent false matches by server
- strings such as the "Message Of The Day". On many systems you can
- disable the MOTD on the remote server by creating a zero-length file
- called "~/.hushlogin" on the remote server. If a prompt cannot be found
- then this will not necessarily cause the login to fail. In the case of
- a timeout when looking for the prompt we assume that the original
- prompt was so weird that we could not match it, so we use a few tricks
- to guess when we have reached the prompt. Then we hope for the best and
- blindly try to reset the prompt to something more unique. If that fails
- then login() raises an ExceptionPxssh exception.
-
- In some situations it is not possible or desirable to reset the
- original prompt. In this case, set 'auto_prompt_reset' to False to
- inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
- uses a unique prompt in the prompt() method. If the original prompt is
- not reset then this will disable the prompt() method unless you
- manually set the PROMPT attribute. '''
-
- ssh_options = '-q'
- if self.force_password:
- ssh_options = ssh_options + ' ' + self.SSH_OPTS
- if port is not None:
- ssh_options = ssh_options + ' -p %s'%(str(port))
- if ssh_key is not None:
- try:
- os.path.isfile(ssh_key)
- except:
- raise ExceptionPxssh ('private ssh key does not exist')
- ssh_options = ssh_options + ' -i %s' % (ssh_key)
- cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
-
- # This does not distinguish between a remote server 'password' prompt
- # and a local ssh 'passphrase' prompt (for unlocking a private key).
- spawn._spawn(self, cmd)
- i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout)
-
- # First phase
- if i==0:
- # New certificate -- always accept it.
- # This is what you get if SSH does not have the remote host's
- # public key stored in the 'known_hosts' cache.
- self.sendline("yes")
- i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
- if i==2: # password or passphrase
- self.sendline(password)
- i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
- if i==4:
- self.sendline(terminal_type)
- i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
-
- # Second phase
- if i==0:
- # This is weird. This should not happen twice in a row.
- self.close()
- raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.')
- elif i==1: # can occur if you have a public key pair set to authenticate.
- ### TODO: May NOT be OK if expect() got tricked and matched a false prompt.
- pass
- elif i==2: # password prompt again
- # For incorrect passwords, some ssh servers will
- # ask for the password again, others return 'denied' right away.
- # If we get the password prompt again then this means
- # we didn't get the password right the first time.
- self.close()
- raise ExceptionPxssh ('password refused')
- elif i==3: # permission denied -- password was bad.
- self.close()
- raise ExceptionPxssh ('permission denied')
- elif i==4: # terminal type again? WTF?
- self.close()
- raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.')
- elif i==5: # Timeout
- #This is tricky... I presume that we are at the command-line prompt.
- #It may be that the shell prompt was so weird that we couldn't match
- #it. Or it may be that we couldn't log in for some other reason. I
- #can't be sure, but it's safe to guess that we did login because if
- #I presume wrong and we are not logged in then this should be caught
- #later when I try to set the shell prompt.
- pass
- elif i==6: # Connection closed by remote host
- self.close()
- raise ExceptionPxssh ('connection closed')
- else: # Unexpected
- self.close()
- raise ExceptionPxssh ('unexpected login response')
- if not self.sync_original_prompt():
- self.close()
- raise ExceptionPxssh ('could not synchronize with original prompt')
- # We appear to be in.
- # set shell prompt to something unique.
- if auto_prompt_reset:
- if not self.set_unique_prompt():
- self.close()
- raise ExceptionPxssh ('could not set shell prompt\n'+self.before)
- return True
-
- def logout (self):
-
- '''This sends exit to the remote shell. If there are stopped jobs then
- this automatically sends exit twice. '''
-
- self.sendline("exit")
- index = self.expect([EOF, "(?i)there are stopped jobs"])
- if index==1:
- self.sendline("exit")
- self.expect(EOF)
- self.close()
-
- def prompt (self, timeout=-1):
-
- '''This matches the shell prompt. This is little more than a short-cut
- to the expect() method. This returns True if the shell prompt was
- matched. This returns False if a timeout was raised. Note that if you
- called login() with auto_prompt_reset set to False then before calling
- prompt() you must set the PROMPT attribute to a regex that prompt()
- will use for matching the prompt. Calling prompt() will erase the
- contents of the 'before' attribute even if no prompt is ever matched.
- If timeout is not given or it is set to -1 then self.timeout is used.
- '''
-
- if timeout == -1:
- timeout = self.timeout
- i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
- if i==1:
- return False
- return True
-
- def set_unique_prompt (self):
-
- '''This sets the remote prompt to something more unique than # or $.
- This makes it easier for the prompt() method to match the shell prompt
- unambiguously. This method is called automatically by the login()
- method, but you may want to call it manually if you somehow reset the
- shell prompt. For example, if you 'su' to a different user then you
- will need to manually reset the prompt. This sends shell commands to
- the remote host to set the prompt, so this assumes the remote host is
- ready to receive commands.
-
- Alternatively, you may use your own prompt pattern. Just set the PROMPT
- attribute to a regular expression that matches it. In this case you
- should call login() with auto_prompt_reset=False; then set the PROMPT
- attribute. After that the prompt() method will try to match your prompt
- pattern.'''
-
- self.sendline ("unset PROMPT_COMMAND")
- self.sendline (self.PROMPT_SET_SH) # sh-style
- i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
- if i == 0: # csh-style
- self.sendline (self.PROMPT_SET_CSH)
- i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
- if i == 0:
- return False
- return True
-
-# vi:ts=4:sw=4:expandtab:ft=python:
+from pexpect.pxssh import * # analysis:ignore \ No newline at end of file