summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorRobey Pointer <robey@lag.net>2007-02-10 18:25:53 -0800
committerRobey Pointer <robey@lag.net>2007-02-10 18:25:53 -0800
commitecb8ffe37324f802500c219e6dc93128db87bad9 (patch)
tree88246fa73928f7682ace2e7c8cabaacc4397047c /tests
parentf384749a8c90f706394db6c64ba022998c9aa54c (diff)
downloadparamiko-ecb8ffe37324f802500c219e6dc93128db87bad9.tar.gz
[project @ robey@lag.net-20070211022553-mjbl0w7wygpl7os5]
add another test to check out private key auth.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_client.py43
1 files changed, 41 insertions, 2 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index a8f8bc8c..a801e3b7 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -25,6 +25,7 @@ import threading
import time
import unittest
import weakref
+from binascii import hexlify
import paramiko
@@ -41,6 +42,11 @@ class NullServer (paramiko.ServerInterface):
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
+ def check_auth_publickey(self, username, key):
+ if (key.get_name() == 'ssh-dss') and (hexlify(key.get_fingerprint()) == '4478f0b9a23cc5182009ff755bc1d26c'):
+ return paramiko.AUTH_SUCCESSFUL
+ return paramiko.AUTH_FAILED
+
def check_channel_request(self, kind, chanid):
return paramiko.OPEN_SUCCEEDED
@@ -109,8 +115,41 @@ class SSHClientTest (unittest.TestCase):
stdin.close()
stdout.close()
stderr.close()
+
+ def test_2_client_dsa(self):
+ """
+ verify that SSHClient works with a DSA key.
+ """
+ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = paramiko.RSAKey(data=str(host_key))
+
+ self.tc = paramiko.SSHClient()
+ self.tc.get_host_keys().add(self.addr, 'ssh-rsa', public_host_key)
+ self.tc.connect(self.addr, self.port, username='slowdive', key_filename='tests/test_dss.key')
+
+ self.event.wait(1.0)
+ self.assert_(self.event.isSet())
+ self.assert_(self.ts.is_active())
+ self.assertEquals('slowdive', self.ts.get_username())
+ self.assertEquals(True, self.ts.is_authenticated())
+
+ stdin, stdout, stderr = self.tc.exec_command('yes')
+ schan = self.ts.accept(1.0)
+
+ schan.send('Hello there.\n')
+ schan.send_stderr('This is on stderr.\n')
+ schan.close()
+
+ self.assertEquals('Hello there.\n', stdout.readline())
+ self.assertEquals('', stdout.readline())
+ self.assertEquals('This is on stderr.\n', stderr.readline())
+ self.assertEquals('', stderr.readline())
+
+ stdin.close()
+ stdout.close()
+ stderr.close()
- def test_2_auto_add_policy(self):
+ def test_3_auto_add_policy(self):
"""
verify that SSHClient's AutoAddPolicy works.
"""
@@ -130,7 +169,7 @@ class SSHClientTest (unittest.TestCase):
self.assertEquals(1, len(self.tc.get_host_keys()))
self.assertEquals(public_host_key, self.tc.get_host_keys()[self.addr]['ssh-rsa'])
- def test_3_cleanup(self):
+ def test_4_cleanup(self):
"""
verify that when an SSHClient is collected, its transport (and the
transport's packetizer) is closed.