diff options
-rw-r--r-- | pexpect/pxssh.py | 4 | ||||
-rwxr-xr-x | tests/fakessh/ssh | 46 | ||||
-rw-r--r-- | tests/test_pxssh.py | 80 |
3 files changed, 125 insertions, 5 deletions
diff --git a/pexpect/pxssh.py b/pexpect/pxssh.py index 49d20cc..2b55209 100644 --- a/pexpect/pxssh.py +++ b/pexpect/pxssh.py @@ -259,7 +259,7 @@ class pxssh (spawn): sync_multiplier=1, check_local_ip=True, password_regex=r'(?i)(?:password:)|(?:passphrase for key)', ssh_tunnels={}, spawn_local_ssh=True, - sync_original_prompt=True, ssh_config=None): + sync_original_prompt=True, ssh_config=None, cmd='ssh'): '''This logs the user into the given server. It uses @@ -354,7 +354,7 @@ class pxssh (spawn): if spawn_local_ssh==False: tunnel = quote(str(tunnel)) ssh_options = ssh_options + ' -' + cmd_type + ' ' + str(tunnel) - cmd = "ssh %s -l %s %s" % (ssh_options, username, server) + cmd += " %s -l %s %s" % (ssh_options, username, server) if self.debug_command_string: return(cmd) diff --git a/tests/fakessh/ssh b/tests/fakessh/ssh index d3259e4..4a5be1b 100755 --- a/tests/fakessh/ssh +++ b/tests/fakessh/ssh @@ -3,13 +3,52 @@ from __future__ import print_function import getpass import sys +import getopt PY3 = (sys.version_info[0] >= 3) if not PY3: input = raw_input -server = sys.argv[-1] -if server == 'noserver': - print('No route to host') +ssh_usage = "usage: ssh [-2qV] [-c cipher_spec] [-l login_name]\r\n" \ + + " hostname" + +cipher_valid_list = ['aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'arcfour256', 'arcfour128', \ + 'aes128-cbc','3des-cbc','blowfish-cbc','cast128-cbc','aes192-cbc', \ + 'aes256-cbc','arcfour'] + +try: + server = sys.argv[-1] + if server == 'noserver': + print('No route to host') + sys.exit(1) + + elif len(sys.argv) < 2: + print(ssh_usage) + sys.exit(1) + + cipher = '' + cipher_list = [] + fullCmdArguments = sys.argv + argumentList = fullCmdArguments[1:] + unixOptions = "2qVc:l" + arguments, values = getopt.getopt(argumentList, unixOptions) + for currentArgument, currentValue in arguments: + if currentArgument in ("-2"): + pass + elif currentArgument in ("-V"): + print("Mock SSH client version 0.2") + sys.exit(1) + elif currentArgument in ("-c"): + cipher = currentValue + cipher_list = cipher.split(",") + for cipher_item in cipher_list: + if cipher_item not in cipher_valid_list: + print("Unknown cipher type '" + str(cipher_item) + "'") + sys.exit(1) + + +except Exception as e: + print(ssh_usage) + print('error = ' + str(e)) sys.exit(1) print("Mock SSH client for tests. Do not enter real security info.") @@ -31,4 +70,5 @@ while True: elif cmd == 'echo $?': print(0) elif cmd in ('exit', 'logout'): + print('Closed connection') break diff --git a/tests/test_pxssh.py b/tests/test_pxssh.py index 5f82302..ac53a9c 100644 --- a/tests/test_pxssh.py +++ b/tests/test_pxssh.py @@ -115,6 +115,86 @@ class PxsshTestCase(SSHTestBase): if confirmation_strings!=len(confirmation_array): assert False, 'String generated from adding an SSH key is incorrect.' + def test_custom_ssh_cmd_debug(self): + ssh = pxssh.pxssh(debug_command_string=True) + cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \ + + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \ + + 'aes256-cbc,arcfour' + confirmation_strings = 0 + confirmation_array = [cipher_string, '-2'] + string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2') + for confirmation in confirmation_array: + if confirmation in string: + confirmation_strings+=1 + + if confirmation_strings!=len(confirmation_array): + assert False, 'String generated for custom ssh client command is incorrect.' + + def test_custom_ssh_cmd_debug(self): + ssh = pxssh.pxssh(debug_command_string=True) + cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \ + + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \ + + 'aes256-cbc,arcfour' + confirmation_strings = 0 + confirmation_array = [cipher_string, '-2'] + string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2') + for confirmation in confirmation_array: + if confirmation in string: + confirmation_strings+=1 + + if confirmation_strings!=len(confirmation_array): + assert False, 'String generated for custom ssh client command is incorrect.' + + def test_failed_custom_ssh_cmd_debug(self): + ssh = pxssh.pxssh(debug_command_string=True) + cipher_string = '-c invalid_cipher' + confirmation_strings = 0 + confirmation_array = [cipher_string, '-2'] + string = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2') + for confirmation in confirmation_array: + if confirmation in string: + confirmation_strings+=1 + + if confirmation_strings!=len(confirmation_array): + assert False, 'String generated for custom ssh client command is incorrect.' + + def test_custom_ssh_cmd(self): + try: + ssh = pxssh.pxssh() + cipher_string = '-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,arcfour128,' \ + + 'aes128-cbc,3des-cbc,blowfish-cbc,cast128-cbc,aes192-cbc,' \ + + 'aes256-cbc,arcfour' + result = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2') + + ssh.PROMPT = r'Closed connection' + ssh.sendline('exit') + ssh.prompt(timeout=5) + string = str(ssh.before) + str(ssh.after) + + if 'Closed connection' not in string: + assert False, 'should have logged into Mock SSH client and exited' + except pxssh.ExceptionPxssh as e: + assert False, 'should not have raised exception, pxssh.ExceptionPxssh' + else: + pass + + def test_failed_custom_ssh_cmd(self): + try: + ssh = pxssh.pxssh() + cipher_string = '-c invalid_cipher' + result = ssh.login('server', 'me', password='s3cret', cmd='ssh ' + cipher_string + ' -2') + + ssh.PROMPT = r'Closed connection' + ssh.sendline('exit') + ssh.prompt(timeout=5) + string = str(ssh.before) + str(ssh.after) + + if 'Closed connection' not in string: + assert False, 'should not have completed logging into Mock SSH client and exited' + except pxssh.ExceptionPxssh as e: + pass + else: + assert False, 'should have raised exception, pxssh.ExceptionPxssh' if __name__ == '__main__': unittest.main() |