summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorRobey Pointer <robey@lag.net>2008-01-23 20:50:17 -0800
committerRobey Pointer <robey@lag.net>2008-01-23 20:50:17 -0800
commit888aa8d5b7288496343e2700f94c56ddd25f2e65 (patch)
tree039208f05cd5d6dc550ce960356e63a358292269 /tests
parent953392c0a107a087a774e0cc62101c2e541c8ef3 (diff)
downloadparamiko-888aa8d5b7288496343e2700f94c56ddd25f2e65.tar.gz
[project @ robey@lag.net-20080124045017-dfqiamorj356btrd]
fix the utf-8 password bug for good (aka bug 177117) and add unit tests this time.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_auth.py109
1 files changed, 56 insertions, 53 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py
index cc35ee9b..a18c57d0 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -48,6 +48,10 @@ class NullServer (ServerInterface):
return 'password'
if username == 'commie':
return 'keyboard-interactive'
+ if username == 'utf8':
+ return 'password'
+ if username == 'non-utf8':
+ return 'password'
return 'publickey'
def check_auth_password(self, username, password):
@@ -59,7 +63,9 @@ class NullServer (ServerInterface):
if self.paranoid_did_public_key:
return AUTH_SUCCESSFUL
return AUTH_PARTIALLY_SUCCESSFUL
- if (username == 'utf8') and (password == u'\u2022'.encode('utf-8')):
+ if (username == 'utf8') and (password == u'\u2022'):
+ return AUTH_SUCCESSFUL
+ if (username == 'non-utf8') and (password == '\xff'):
return AUTH_SUCCESSFUL
return AUTH_FAILED
@@ -99,21 +105,29 @@ class AuthTest (unittest.TestCase):
self.ts.close()
self.socks.close()
self.sockc.close()
+
+ def start_server(self):
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ self.public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ self.event = threading.Event()
+ self.server = NullServer()
+ self.assert_(not self.event.isSet())
+ self.ts.start_server(self.event, self.server)
+
+ def verify_finished(self):
+ self.event.wait(1.0)
+ self.assert_(self.event.isSet())
+ self.assert_(self.ts.is_active())
def test_1_bad_auth_type(self):
"""
verify that we get the right exception when an unsupported auth
type is requested.
"""
- host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
- public_host_key = RSAKey(data=str(host_key))
- self.ts.add_server_key(host_key)
- event = threading.Event()
- server = NullServer()
- self.assert_(not event.isSet())
- self.ts.start_server(event, server)
+ self.start_server()
try:
- self.tc.connect(hostkey=public_host_key,
+ self.tc.connect(hostkey=self.public_host_key,
username='unknown', password='error')
self.assert_(False)
except:
@@ -126,14 +140,8 @@ class AuthTest (unittest.TestCase):
verify that a bad password gets the right exception, and that a retry
with the right password works.
"""
- host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
- public_host_key = RSAKey(data=str(host_key))
- self.ts.add_server_key(host_key)
- event = threading.Event()
- server = NullServer()
- self.assert_(not event.isSet())
- self.ts.start_server(event, server)
- self.tc.connect(hostkey=public_host_key)
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
try:
self.tc.auth_password(username='slowdive', password='error')
self.assert_(False)
@@ -141,43 +149,27 @@ class AuthTest (unittest.TestCase):
etype, evalue, etb = sys.exc_info()
self.assert_(issubclass(etype, SSHException))
self.tc.auth_password(username='slowdive', password='pygmalion')
- event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
+ self.verify_finished()
def test_3_multipart_auth(self):
"""
verify that multipart auth works.
"""
- host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
- public_host_key = RSAKey(data=str(host_key))
- self.ts.add_server_key(host_key)
- event = threading.Event()
- server = NullServer()
- self.assert_(not event.isSet())
- self.ts.start_server(event, server)
- self.tc.connect(hostkey=public_host_key)
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password(username='paranoid', password='paranoid')
self.assertEquals(['publickey'], remain)
key = DSSKey.from_private_key_file('tests/test_dss.key')
remain = self.tc.auth_publickey(username='paranoid', key=key)
self.assertEquals([], remain)
- event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
+ self.verify_finished()
def test_4_interactive_auth(self):
"""
verify keyboard-interactive auth works.
"""
- host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
- public_host_key = RSAKey(data=str(host_key))
- self.ts.add_server_key(host_key)
- event = threading.Event()
- server = NullServer()
- self.assert_(not event.isSet())
- self.ts.start_server(event, server)
- self.tc.connect(hostkey=public_host_key)
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
def handler(title, instructions, prompts):
self.got_title = title
@@ -188,25 +180,36 @@ class AuthTest (unittest.TestCase):
self.assertEquals(self.got_title, 'password')
self.assertEquals(self.got_prompts, [('Password', False)])
self.assertEquals([], remain)
- event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
+ self.verify_finished()
def test_5_interactive_auth_fallback(self):
"""
verify that a password auth attempt will fallback to "interactive"
if password auth isn't supported but interactive is.
"""
- host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
- public_host_key = RSAKey(data=str(host_key))
- self.ts.add_server_key(host_key)
- event = threading.Event()
- server = NullServer()
- self.assert_(not event.isSet())
- self.ts.start_server(event, server)
- self.tc.connect(hostkey=public_host_key)
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password('commie', 'cat')
self.assertEquals([], remain)
- event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
+ self.verify_finished()
+
+ def test_6_auth_utf8(self):
+ """
+ verify that utf-8 encoding happens in authentication.
+ """
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
+ remain = self.tc.auth_password('utf8', u'\u2022')
+ self.assertEquals([], remain)
+ self.verify_finished()
+
+ def test_7_auth_non_utf8(self):
+ """
+ verify that non-utf-8 encoded passwords can be used for broken
+ servers.
+ """
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
+ remain = self.tc.auth_password('non-utf8', '\xff')
+ self.assertEquals([], remain)
+ self.verify_finished()