diff options
author | Robey Pointer <robey@lag.net> | 2008-01-23 20:50:17 -0800 |
---|---|---|
committer | Robey Pointer <robey@lag.net> | 2008-01-23 20:50:17 -0800 |
commit | 888aa8d5b7288496343e2700f94c56ddd25f2e65 (patch) | |
tree | 039208f05cd5d6dc550ce960356e63a358292269 /tests | |
parent | 953392c0a107a087a774e0cc62101c2e541c8ef3 (diff) | |
download | paramiko-888aa8d5b7288496343e2700f94c56ddd25f2e65.tar.gz |
[project @ robey@lag.net-20080124045017-dfqiamorj356btrd]
fix the utf-8 password bug for good (aka bug 177117) and add unit tests
this time.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_auth.py | 109 |
1 files changed, 56 insertions, 53 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py index cc35ee9b..a18c57d0 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -48,6 +48,10 @@ class NullServer (ServerInterface): return 'password' if username == 'commie': return 'keyboard-interactive' + if username == 'utf8': + return 'password' + if username == 'non-utf8': + return 'password' return 'publickey' def check_auth_password(self, username, password): @@ -59,7 +63,9 @@ class NullServer (ServerInterface): if self.paranoid_did_public_key: return AUTH_SUCCESSFUL return AUTH_PARTIALLY_SUCCESSFUL - if (username == 'utf8') and (password == u'\u2022'.encode('utf-8')): + if (username == 'utf8') and (password == u'\u2022'): + return AUTH_SUCCESSFUL + if (username == 'non-utf8') and (password == '\xff'): return AUTH_SUCCESSFUL return AUTH_FAILED @@ -99,21 +105,29 @@ class AuthTest (unittest.TestCase): self.ts.close() self.socks.close() self.sockc.close() + + def start_server(self): + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + self.public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + self.event = threading.Event() + self.server = NullServer() + self.assert_(not self.event.isSet()) + self.ts.start_server(self.event, self.server) + + def verify_finished(self): + self.event.wait(1.0) + self.assert_(self.event.isSet()) + self.assert_(self.ts.is_active()) def test_1_bad_auth_type(self): """ verify that we get the right exception when an unsupported auth type is requested. """ - host_key = RSAKey.from_private_key_file('tests/test_rsa.key') - public_host_key = RSAKey(data=str(host_key)) - self.ts.add_server_key(host_key) - event = threading.Event() - server = NullServer() - self.assert_(not event.isSet()) - self.ts.start_server(event, server) + self.start_server() try: - self.tc.connect(hostkey=public_host_key, + self.tc.connect(hostkey=self.public_host_key, username='unknown', password='error') self.assert_(False) except: @@ -126,14 +140,8 @@ class AuthTest (unittest.TestCase): verify that a bad password gets the right exception, and that a retry with the right password works. """ - host_key = RSAKey.from_private_key_file('tests/test_rsa.key') - public_host_key = RSAKey(data=str(host_key)) - self.ts.add_server_key(host_key) - event = threading.Event() - server = NullServer() - self.assert_(not event.isSet()) - self.ts.start_server(event, server) - self.tc.connect(hostkey=public_host_key) + self.start_server() + self.tc.connect(hostkey=self.public_host_key) try: self.tc.auth_password(username='slowdive', password='error') self.assert_(False) @@ -141,43 +149,27 @@ class AuthTest (unittest.TestCase): etype, evalue, etb = sys.exc_info() self.assert_(issubclass(etype, SSHException)) self.tc.auth_password(username='slowdive', password='pygmalion') - event.wait(1.0) - self.assert_(event.isSet()) - self.assert_(self.ts.is_active()) + self.verify_finished() def test_3_multipart_auth(self): """ verify that multipart auth works. """ - host_key = RSAKey.from_private_key_file('tests/test_rsa.key') - public_host_key = RSAKey(data=str(host_key)) - self.ts.add_server_key(host_key) - event = threading.Event() - server = NullServer() - self.assert_(not event.isSet()) - self.ts.start_server(event, server) - self.tc.connect(hostkey=public_host_key) + self.start_server() + self.tc.connect(hostkey=self.public_host_key) remain = self.tc.auth_password(username='paranoid', password='paranoid') self.assertEquals(['publickey'], remain) key = DSSKey.from_private_key_file('tests/test_dss.key') remain = self.tc.auth_publickey(username='paranoid', key=key) self.assertEquals([], remain) - event.wait(1.0) - self.assert_(event.isSet()) - self.assert_(self.ts.is_active()) + self.verify_finished() def test_4_interactive_auth(self): """ verify keyboard-interactive auth works. """ - host_key = RSAKey.from_private_key_file('tests/test_rsa.key') - public_host_key = RSAKey(data=str(host_key)) - self.ts.add_server_key(host_key) - event = threading.Event() - server = NullServer() - self.assert_(not event.isSet()) - self.ts.start_server(event, server) - self.tc.connect(hostkey=public_host_key) + self.start_server() + self.tc.connect(hostkey=self.public_host_key) def handler(title, instructions, prompts): self.got_title = title @@ -188,25 +180,36 @@ class AuthTest (unittest.TestCase): self.assertEquals(self.got_title, 'password') self.assertEquals(self.got_prompts, [('Password', False)]) self.assertEquals([], remain) - event.wait(1.0) - self.assert_(event.isSet()) - self.assert_(self.ts.is_active()) + self.verify_finished() def test_5_interactive_auth_fallback(self): """ verify that a password auth attempt will fallback to "interactive" if password auth isn't supported but interactive is. """ - host_key = RSAKey.from_private_key_file('tests/test_rsa.key') - public_host_key = RSAKey(data=str(host_key)) - self.ts.add_server_key(host_key) - event = threading.Event() - server = NullServer() - self.assert_(not event.isSet()) - self.ts.start_server(event, server) - self.tc.connect(hostkey=public_host_key) + self.start_server() + self.tc.connect(hostkey=self.public_host_key) remain = self.tc.auth_password('commie', 'cat') self.assertEquals([], remain) - event.wait(1.0) - self.assert_(event.isSet()) - self.assert_(self.ts.is_active()) + self.verify_finished() + + def test_6_auth_utf8(self): + """ + verify that utf-8 encoding happens in authentication. + """ + self.start_server() + self.tc.connect(hostkey=self.public_host_key) + remain = self.tc.auth_password('utf8', u'\u2022') + self.assertEquals([], remain) + self.verify_finished() + + def test_7_auth_non_utf8(self): + """ + verify that non-utf-8 encoded passwords can be used for broken + servers. + """ + self.start_server() + self.tc.connect(hostkey=self.public_host_key) + remain = self.tc.auth_password('non-utf8', '\xff') + self.assertEquals([], remain) + self.verify_finished() |