summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--demos/demo_server.py4
-rwxr-xr-xdemos/demo_sftp.py8
-rw-r--r--paramiko/hostkeys.py50
-rw-r--r--paramiko/pkey.py22
-rw-r--r--paramiko/primes.py20
-rw-r--r--paramiko/py3compat.py12
-rw-r--r--tests/test_auth.py34
-rw-r--r--tests/test_buffered_pipe.py28
-rw-r--r--tests/test_client.py60
-rwxr-xr-xtests/test_file.py12
-rw-r--r--tests/test_hostkeys.py48
-rw-r--r--tests/test_kex.py82
-rw-r--r--tests/test_message.py54
-rw-r--r--tests/test_packetizer.py12
-rw-r--r--tests/test_pkey.py112
-rwxr-xr-xtests/test_sftp.py4
-rw-r--r--tests/test_sftp_big.py1
-rw-r--r--tests/test_transport.py176
-rw-r--r--tests/test_util.py34
19 files changed, 398 insertions, 375 deletions
diff --git a/demos/demo_server.py b/demos/demo_server.py
index deb21387..5a41a714 100644
--- a/demos/demo_server.py
+++ b/demos/demo_server.py
@@ -27,7 +27,7 @@ import threading
import traceback
import paramiko
-from paramiko.py3compat import b, u
+from paramiko.py3compat import b, u, decodebytes
# setup logging
@@ -46,7 +46,7 @@ class Server (paramiko.ServerInterface):
'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC' + \
'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT' + \
'UWT10hcuO4Ks8=')
- good_pub_key = paramiko.RSAKey(data=base64.decodestring(data))
+ good_pub_key = paramiko.RSAKey(data=decodebytes(data))
def __init__(self):
self.event = threading.Event()
diff --git a/demos/demo_sftp.py b/demos/demo_sftp.py
index 2dba1722..d7f28084 100755
--- a/demos/demo_sftp.py
+++ b/demos/demo_sftp.py
@@ -20,6 +20,8 @@
# based on code provided by raymond mosteller (thanks!)
+from __future__ import with_statement
+
import base64
import getpass
import os
@@ -95,13 +97,15 @@ try:
except IOError:
print('(assuming demo_sftp_folder/ already exists)')
sftp.open('demo_sftp_folder/README', 'w').write('This was created by demo_sftp.py.\n')
- data = open('demo_sftp.py', 'r').read()
+ with open('demo_sftp.py', 'r') as f:
+ data = f.read()
sftp.open('demo_sftp_folder/demo_sftp.py', 'w').write(data)
print('created demo_sftp_folder/ on the server')
# copy the README back here
data = sftp.open('demo_sftp_folder/README', 'r').read()
- open('README_demo_sftp', 'w').write(data)
+ with open('README_demo_sftp', 'w') as f:
+ f.write(data)
print('copied README back here')
# BETTER: use the get() and put() methods
diff --git a/paramiko/hostkeys.py b/paramiko/hostkeys.py
index a7f9b430..9da883e6 100644
--- a/paramiko/hostkeys.py
+++ b/paramiko/hostkeys.py
@@ -83,11 +83,11 @@ class HostKeyEntry:
try:
key = b(key)
if keytype == 'ssh-rsa':
- key = RSAKey(data=base64.decodestring(key))
+ key = RSAKey(data=decodebytes(key))
elif keytype == 'ssh-dss':
- key = DSSKey(data=base64.decodestring(key))
+ key = DSSKey(data=decodebytes(key))
elif keytype == 'ecdsa-sha2-nistp256':
- key = ECDSAKey(data=base64.decodestring(key))
+ key = ECDSAKey(data=decodebytes(key))
else:
log.info("Unable to handle key of type %s" % (keytype,))
return None
@@ -173,19 +173,21 @@ class HostKeys (MutableMapping):
@raise IOError: if there was an error reading the file
"""
f = open(filename, 'r')
- for lineno, line in enumerate(f):
- line = line.strip()
- if (len(line) == 0) or (line[0] == '#'):
- continue
- e = HostKeyEntry.from_line(line, lineno)
- if e is not None:
- _hostnames = e.hostnames
- for h in _hostnames:
- if self.check(h, e.key):
- e.hostnames.remove(h)
- if len(e.hostnames):
- self._entries.append(e)
- f.close()
+ try:
+ for lineno, line in enumerate(f):
+ line = line.strip()
+ if (len(line) == 0) or (line[0] == '#'):
+ continue
+ e = HostKeyEntry.from_line(line, lineno)
+ if e is not None:
+ _hostnames = e.hostnames
+ for h in _hostnames:
+ if self.check(h, e.key):
+ e.hostnames.remove(h)
+ if len(e.hostnames):
+ self._entries.append(e)
+ finally:
+ f.close()
def save(self, filename):
"""
@@ -202,11 +204,13 @@ class HostKeys (MutableMapping):
@since: 1.6.1
"""
f = open(filename, 'w')
- for e in self._entries:
- line = e.to_line()
- if line:
- f.write(line)
- f.close()
+ try:
+ for e in self._entries:
+ line = e.to_line()
+ if line:
+ f.write(line)
+ finally:
+ f.close()
def lookup(self, hostname):
"""
@@ -362,10 +366,10 @@ class HostKeys (MutableMapping):
else:
if salt.startswith('|1|'):
salt = salt.split('|')[2]
- salt = base64.decodestring(b(salt))
+ salt = decodebytes(b(salt))
assert len(salt) == SHA.digest_size
hmac = HMAC.HMAC(salt, b(hostname), SHA).digest()
- hostkey = '|1|%s|%s' % (u(base64.encodestring(salt)), u(base64.encodestring(hmac)))
+ hostkey = '|1|%s|%s' % (u(encodebytes(salt)), u(encodebytes(hmac)))
return hostkey.replace('\n', '')
hash_host = staticmethod(hash_host)
diff --git a/paramiko/pkey.py b/paramiko/pkey.py
index 53361d02..3d786aec 100644
--- a/paramiko/pkey.py
+++ b/paramiko/pkey.py
@@ -148,7 +148,7 @@ class PKey (object):
@return: a base64 string containing the public part of the key.
@rtype: str
"""
- return u(base64.encodestring(self.asbytes())).replace('\n', '')
+ return u(encodebytes(self.asbytes())).replace('\n', '')
def sign_ssh_data(self, rng, data):
"""
@@ -283,8 +283,10 @@ class PKey (object):
@raise SSHException: if the key file is invalid.
"""
f = open(filename, 'r')
- data = self._read_private_key(tag, f, password)
- f.close()
+ try:
+ data = self._read_private_key(tag, f, password)
+ finally:
+ f.close()
return data
def _read_private_key(self, tag, f, password=None):
@@ -309,7 +311,7 @@ class PKey (object):
end += 1
# if we trudged to the end of the file, just try to cope.
try:
- data = base64.decodestring(b(''.join(lines[start:end])))
+ data = decodebytes(b(''.join(lines[start:end])))
except base64.binascii.Error:
raise SSHException('base64 decoding error: ' + str(sys.exc_info()[1]))
if 'proc-type' not in headers:
@@ -353,10 +355,12 @@ class PKey (object):
@raise IOError: if there was an error writing the file.
"""
f = open(filename, 'w', o600)
- # grrr... the mode doesn't always take hold
- os.chmod(filename, o600)
- self._write_private_key(tag, f, data, password)
- f.close()
+ try:
+ # grrr... the mode doesn't always take hold
+ os.chmod(filename, o600)
+ self._write_private_key(tag, f, data, password)
+ finally:
+ f.close()
def _write_private_key(self, tag, f, data, password=None):
f.write('-----BEGIN %s PRIVATE KEY-----\n' % tag)
@@ -378,7 +382,7 @@ class PKey (object):
f.write('Proc-Type: 4,ENCRYPTED\n')
f.write('DEK-Info: %s,%s\n' % (cipher_name, u(hexlify(salt)).upper()))
f.write('\n')
- s = u(base64.encodestring(data))
+ s = u(encodebytes(data))
# re-wrap to 64-char lines
s = ''.join(s.split('\n'))
s = '\n'.join([s[i : i+64] for i in range(0, len(s), 64)])
diff --git a/paramiko/primes.py b/paramiko/primes.py
index 4db6d52d..13ec52d0 100644
--- a/paramiko/primes.py
+++ b/paramiko/primes.py
@@ -114,15 +114,17 @@ class ModulusPack (object):
"""
self.pack = {}
f = open(filename, 'r')
- for line in f:
- line = line.strip()
- if (len(line) == 0) or (line[0] == '#'):
- continue
- try:
- self._parse_modulus(line)
- except:
- continue
- f.close()
+ try:
+ for line in f:
+ line = line.strip()
+ if (len(line) == 0) or (line[0] == '#'):
+ continue
+ try:
+ self._parse_modulus(line)
+ except:
+ continue
+ finally:
+ f.close()
def get_modulus(self, min, prefer, max):
bitsizes = sorted(self.pack.keys())
diff --git a/paramiko/py3compat.py b/paramiko/py3compat.py
index 8a01ba08..0aad3618 100644
--- a/paramiko/py3compat.py
+++ b/paramiko/py3compat.py
@@ -1,6 +1,9 @@
import sys
+import base64
-__all__ = ['PY3', 'string_types', 'integer_types', 'text_type', 'bytes_types', 'bytes', 'long', 'input', 'bytestring', 'byte_ord', 'byte_chr', 'byte_mask', 'b', 'u', 'b2s', 'StringIO', 'BytesIO', 'is_callable', 'MAXSIZE', 'next']
+__all__ = ['PY3', 'string_types', 'integer_types', 'text_type', 'bytes_types', 'bytes', 'long', 'input',
+ 'decodebytes', 'encodebytes', 'bytestring', 'byte_ord', 'byte_chr', 'byte_mask',
+ 'b', 'u', 'b2s', 'StringIO', 'BytesIO', 'is_callable', 'MAXSIZE', 'next']
PY3 = sys.version_info[0] >= 3
@@ -12,8 +15,11 @@ if PY3:
bytes = bytes
bytes_types = bytes
integer_types = int
- long = int
+ class long(int):
+ pass
input = input
+ decodebytes = base64.decodebytes
+ encodebytes = base64.encodebytes
def bytestring(s):
return s
@@ -72,6 +78,8 @@ else:
integer_types = (int, long)
long = long
input = raw_input
+ decodebytes = base64.decodestring
+ encodebytes = base64.encodestring
def bytestring(s): # NOQA
if isinstance(s, unicode):
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 586289ba..d26b1807 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -119,13 +119,13 @@ class AuthTest (unittest.TestCase):
self.ts.add_server_key(host_key)
self.event = threading.Event()
self.server = NullServer()
- self.assert_(not self.event.isSet())
+ self.assertTrue(not self.event.isSet())
self.ts.start_server(self.event, self.server)
def verify_finished(self):
self.event.wait(1.0)
- self.assert_(self.event.isSet())
- self.assert_(self.ts.is_active())
+ self.assertTrue(self.event.isSet())
+ self.assertTrue(self.ts.is_active())
def test_1_bad_auth_type(self):
"""
@@ -136,11 +136,11 @@ class AuthTest (unittest.TestCase):
try:
self.tc.connect(hostkey=self.public_host_key,
username='unknown', password='error')
- self.assert_(False)
+ self.assertTrue(False)
except:
etype, evalue, etb = sys.exc_info()
- self.assertEquals(BadAuthenticationType, etype)
- self.assertEquals(['publickey'], evalue.allowed_types)
+ self.assertEqual(BadAuthenticationType, etype)
+ self.assertEqual(['publickey'], evalue.allowed_types)
def test_2_bad_password(self):
"""
@@ -151,10 +151,10 @@ class AuthTest (unittest.TestCase):
self.tc.connect(hostkey=self.public_host_key)
try:
self.tc.auth_password(username='slowdive', password='error')
- self.assert_(False)
+ self.assertTrue(False)
except:
etype, evalue, etb = sys.exc_info()
- self.assert_(issubclass(etype, AuthenticationException))
+ self.assertTrue(issubclass(etype, AuthenticationException))
self.tc.auth_password(username='slowdive', password='pygmalion')
self.verify_finished()
@@ -165,10 +165,10 @@ class AuthTest (unittest.TestCase):
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password(username='paranoid', password='paranoid')
- self.assertEquals(['publickey'], remain)
+ self.assertEqual(['publickey'], remain)
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
remain = self.tc.auth_publickey(username='paranoid', key=key)
- self.assertEquals([], remain)
+ self.assertEqual([], remain)
self.verify_finished()
def test_4_interactive_auth(self):
@@ -184,9 +184,9 @@ class AuthTest (unittest.TestCase):
self.got_prompts = prompts
return ['cat']
remain = self.tc.auth_interactive('commie', handler)
- self.assertEquals(self.got_title, 'password')
- self.assertEquals(self.got_prompts, [('Password', False)])
- self.assertEquals([], remain)
+ self.assertEqual(self.got_title, 'password')
+ self.assertEqual(self.got_prompts, [('Password', False)])
+ self.assertEqual([], remain)
self.verify_finished()
def test_5_interactive_auth_fallback(self):
@@ -197,7 +197,7 @@ class AuthTest (unittest.TestCase):
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password('commie', 'cat')
- self.assertEquals([], remain)
+ self.assertEqual([], remain)
self.verify_finished()
def test_6_auth_utf8(self):
@@ -207,7 +207,7 @@ class AuthTest (unittest.TestCase):
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password('utf8', _pwd)
- self.assertEquals([], remain)
+ self.assertEqual([], remain)
self.verify_finished()
def test_7_auth_non_utf8(self):
@@ -218,7 +218,7 @@ class AuthTest (unittest.TestCase):
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password('non-utf8', '\xff')
- self.assertEquals([], remain)
+ self.assertEqual([], remain)
self.verify_finished()
def test_8_auth_gets_disconnected(self):
@@ -232,4 +232,4 @@ class AuthTest (unittest.TestCase):
remain = self.tc.auth_password('bad-server', 'hello')
except:
etype, evalue, etb = sys.exc_info()
- self.assert_(issubclass(etype, AuthenticationException))
+ self.assertTrue(issubclass(etype, AuthenticationException))
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index 7f48b705..5a088d80 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -45,39 +45,39 @@ def close_thread(pipe):
class BufferedPipeTest(ParamikoTest):
def test_1_buffered_pipe(self):
p = BufferedPipe()
- self.assert_(not p.read_ready())
+ self.assertTrue(not p.read_ready())
p.feed('hello.')
- self.assert_(p.read_ready())
+ self.assertTrue(p.read_ready())
data = p.read(6)
- self.assertEquals(b('hello.'), data)
+ self.assertEqual(b('hello.'), data)
p.feed('plus/minus')
- self.assertEquals(b('plu'), p.read(3))
- self.assertEquals(b('s/m'), p.read(3))
- self.assertEquals(b('inus'), p.read(4))
+ self.assertEqual(b('plu'), p.read(3))
+ self.assertEqual(b('s/m'), p.read(3))
+ self.assertEqual(b('inus'), p.read(4))
p.close()
- self.assert_(not p.read_ready())
- self.assertEquals(b(''), p.read(1))
+ self.assertTrue(not p.read_ready())
+ self.assertEqual(b(''), p.read(1))
def test_2_delay(self):
p = BufferedPipe()
- self.assert_(not p.read_ready())
+ self.assertTrue(not p.read_ready())
threading.Thread(target=delay_thread, args=(p,)).start()
- self.assertEquals(b('a'), p.read(1, 0.1))
+ self.assertEqual(b('a'), p.read(1, 0.1))
try:
p.read(1, 0.1)
- self.assert_(False)
+ self.assertTrue(False)
except PipeTimeout:
pass
- self.assertEquals(b('b'), p.read(1, 1.0))
- self.assertEquals(b(''), p.read(1))
+ self.assertEqual(b('b'), p.read(1, 1.0))
+ self.assertEqual(b(''), p.read(1))
def test_3_close_while_reading(self):
p = BufferedPipe()
threading.Thread(target=close_thread, args=(p,)).start()
data = p.read(1, 1.0)
- self.assertEquals(b(''), data)
+ self.assertEqual(b(''), data)
def test_4_or_pipe(self):
p = pipe.make_pipe()
diff --git a/tests/test_client.py b/tests/test_client.py
index b77b90d7..e96a426f 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -95,10 +95,10 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assert_(self.event.isSet())
- self.assert_(self.ts.is_active())
- self.assertEquals('slowdive', self.ts.get_username())
- self.assertEquals(True, self.ts.is_authenticated())
+ self.assertTrue(self.event.isSet())
+ self.assertTrue(self.ts.is_active())
+ self.assertEqual('slowdive', self.ts.get_username())
+ self.assertEqual(True, self.ts.is_authenticated())
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -107,10 +107,10 @@ class SSHClientTest (unittest.TestCase):
schan.send_stderr('This is on stderr.\n')
schan.close()
- self.assertEquals('Hello there.\n', stdout.readline())
- self.assertEquals('', stdout.readline())
- self.assertEquals('This is on stderr.\n', stderr.readline())
- self.assertEquals('', stderr.readline())
+ self.assertEqual('Hello there.\n', stdout.readline())
+ self.assertEqual('', stdout.readline())
+ self.assertEqual('This is on stderr.\n', stderr.readline())
+ self.assertEqual('', stderr.readline())
stdin.close()
stdout.close()
@@ -128,10 +128,10 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', key_filename=test_path('test_dss.key'))
self.event.wait(1.0)
- self.assert_(self.event.isSet())
- self.assert_(self.ts.is_active())
- self.assertEquals('slowdive', self.ts.get_username())
- self.assertEquals(True, self.ts.is_authenticated())
+ self.assertTrue(self.event.isSet())
+ self.assertTrue(self.ts.is_active())
+ self.assertEqual('slowdive', self.ts.get_username())
+ self.assertEqual(True, self.ts.is_authenticated())
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -140,10 +140,10 @@ class SSHClientTest (unittest.TestCase):
schan.send_stderr('This is on stderr.\n')
schan.close()
- self.assertEquals('Hello there.\n', stdout.readline())
- self.assertEquals('', stdout.readline())
- self.assertEquals('This is on stderr.\n', stderr.readline())
- self.assertEquals('', stderr.readline())
+ self.assertEqual('Hello there.\n', stdout.readline())
+ self.assertEqual('', stdout.readline())
+ self.assertEqual('This is on stderr.\n', stderr.readline())
+ self.assertEqual('', stderr.readline())
stdin.close()
stdout.close()
@@ -161,10 +161,10 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', key_filename=[ test_path('test_rsa.key'), test_path('test_dss.key') ])
self.event.wait(1.0)
- self.assert_(self.event.isSet())
- self.assert_(self.ts.is_active())
- self.assertEquals('slowdive', self.ts.get_username())
- self.assertEquals(True, self.ts.is_authenticated())
+ self.assertTrue(self.event.isSet())
+ self.assertTrue(self.ts.is_active())
+ self.assertEqual('slowdive', self.ts.get_username())
+ self.assertEqual(True, self.ts.is_authenticated())
def test_4_auto_add_policy(self):
"""
@@ -175,16 +175,16 @@ class SSHClientTest (unittest.TestCase):
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self.assertEquals(0, len(self.tc.get_host_keys()))
+ self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assert_(self.event.isSet())
- self.assert_(self.ts.is_active())
- self.assertEquals('slowdive', self.ts.get_username())
- self.assertEquals(True, self.ts.is_authenticated())
- self.assertEquals(1, len(self.tc.get_host_keys()))
- self.assertEquals(public_host_key, self.tc.get_host_keys()['[%s]:%d' % (self.addr, self.port)]['ssh-rsa'])
+ self.assertTrue(self.event.isSet())
+ self.assertTrue(self.ts.is_active())
+ self.assertEqual('slowdive', self.ts.get_username())
+ self.assertEqual(True, self.ts.is_authenticated())
+ self.assertEqual(1, len(self.tc.get_host_keys()))
+ self.assertEqual(public_host_key, self.tc.get_host_keys()['[%s]:%d' % (self.addr, self.port)]['ssh-rsa'])
def test_5_cleanup(self):
"""
@@ -196,12 +196,12 @@ class SSHClientTest (unittest.TestCase):
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self.assertEquals(0, len(self.tc.get_host_keys()))
+ self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assert_(self.event.isSet())
- self.assert_(self.ts.is_active())
+ self.assertTrue(self.event.isSet())
+ self.assertTrue(self.ts.is_active())
p = weakref.ref(self.tc._transport.packetizer)
self.assertTrue(p() is not None)
diff --git a/tests/test_file.py b/tests/test_file.py
index 0430040c..33a49130 100755
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -54,7 +54,7 @@ class BufferedFileTest (unittest.TestCase):
f = LoopbackFile('r')
try:
f.write('hi')
- self.assert_(False, 'no exception on write to read-only file')
+ self.assertTrue(False, 'no exception on write to read-only file')
except:
pass
f.close()
@@ -62,7 +62,7 @@ class BufferedFileTest (unittest.TestCase):
f = LoopbackFile('w')
try:
f.read(1)
- self.assert_(False, 'no exception to read from write-only file')
+ self.assertTrue(False, 'no exception to read from write-only file')
except:
pass
f.close()
@@ -81,12 +81,12 @@ class BufferedFileTest (unittest.TestCase):
f.close()
try:
f.readline()
- self.assert_(False, 'no exception on readline of closed file')
+ self.assertTrue(False, 'no exception on readline of closed file')
except IOError:
pass
- self.assert_(linefeed_byte in f.newlines)
- self.assert_(crlf in f.newlines)
- self.assert_(cr_byte not in f.newlines)
+ self.assertTrue(linefeed_byte in f.newlines)
+ self.assertTrue(crlf in f.newlines)
+ self.assertTrue(cr_byte not in f.newlines)
def test_3_lf(self):
"""
diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py
index 66b41b6f..a7621d8b 100644
--- a/tests/test_hostkeys.py
+++ b/tests/test_hostkeys.py
@@ -25,7 +25,7 @@ from binascii import hexlify
import os
import unittest
import paramiko
-from paramiko.py3compat import b
+from paramiko.py3compat import b, decodebytes
test_hosts_file = """\
@@ -65,42 +65,42 @@ class HostKeysTest (unittest.TestCase):
def test_1_load(self):
hostdict = paramiko.HostKeys('hostfile.temp')
- self.assertEquals(2, len(hostdict))
- self.assertEquals(1, len(list(hostdict.values())[0]))
- self.assertEquals(1, len(list(hostdict.values())[1]))
+ self.assertEqual(2, len(hostdict))
+ self.assertEqual(1, len(list(hostdict.values())[0]))
+ self.assertEqual(1, len(list(hostdict.values())[1]))
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
- self.assertEquals(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
+ self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
def test_2_add(self):
hostdict = paramiko.HostKeys('hostfile.temp')
hh = '|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c='
- key = paramiko.RSAKey(data=base64.decodestring(keyblob))
+ key = paramiko.RSAKey(data=decodebytes(keyblob))
hostdict.add(hh, 'ssh-rsa', key)
- self.assertEquals(3, len(list(hostdict)))
+ self.assertEqual(3, len(list(hostdict)))
x = hostdict['foo.example.com']
fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper()
- self.assertEquals(b('7EC91BB336CB6D810B124B1353C32396'), fp)
- self.assert_(hostdict.check('foo.example.com', key))
+ self.assertEqual(b('7EC91BB336CB6D810B124B1353C32396'), fp)
+ self.assertTrue(hostdict.check('foo.example.com', key))
def test_3_dict(self):
hostdict = paramiko.HostKeys('hostfile.temp')
- self.assert_('secure.example.com' in hostdict)
- self.assert_('not.example.com' not in hostdict)
- self.assert_('secure.example.com' in hostdict)
- self.assert_('not.example.com' not in hostdict)
+ self.assertTrue('secure.example.com' in hostdict)
+ self.assertTrue('not.example.com' not in hostdict)
+ self.assertTrue('secure.example.com' in hostdict)
+ self.assertTrue('not.example.com' not in hostdict)
x = hostdict.get('secure.example.com', None)
- self.assert_(x is not None)
+ self.assertTrue(x is not None)
fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper()
- self.assertEquals(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
+ self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
i = 0
for key in hostdict:
i += 1
- self.assertEquals(2, i)
+ self.assertEqual(2, i)
def test_4_dict_set(self):
hostdict = paramiko.HostKeys('hostfile.temp')
- key = paramiko.RSAKey(data=base64.decodestring(keyblob))
- key_dss = paramiko.DSSKey(data=base64.decodestring(keyblob_dss))
+ key = paramiko.RSAKey(data=decodebytes(keyblob))
+ key_dss = paramiko.DSSKey(data=decodebytes(keyblob_dss))
hostdict['secure.example.com'] = {
'ssh-rsa': key,
'ssh-dss': key_dss
@@ -108,11 +108,11 @@ class HostKeysTest (unittest.TestCase):
hostdict['fake.example.com'] = {}
hostdict['fake.example.com']['ssh-rsa'] = key
- self.assertEquals(3, len(hostdict))
- self.assertEquals(2, len(list(hostdict.values())[0]))
- self.assertEquals(1, len(list(hostdict.values())[1]))
- self.assertEquals(1, len(list(hostdict.values())[2]))
+ self.assertEqual(3, len(hostdict))
+ self.assertEqual(2, len(list(hostdict.values())[0]))
+ self.assertEqual(1, len(list(hostdict.values())[1]))
+ self.assertEqual(1, len(list(hostdict.values())[2]))
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
- self.assertEquals(b('7EC91BB336CB6D810B124B1353C32396'), fp)
+ self.assertEqual(b('7EC91BB336CB6D810B124B1353C32396'), fp)
fp = hexlify(hostdict['secure.example.com']['ssh-dss'].get_fingerprint()).upper()
- self.assertEquals(b('4478F0B9A23CC5182009FF755BC1D26C'), fp)
+ self.assertEqual(b('4478F0B9A23CC5182009FF755BC1D26C'), fp)
diff --git a/tests/test_kex.py b/tests/test_kex.py
index f7cb0647..dbe377a4 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -92,8 +92,8 @@ class KexTest (unittest.TestCase):
kex = KexGroup1(transport)
kex.start_kex()
x = b('1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4')
- self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
- self.assertEquals((paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect)
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect)
# fake "reply"
msg = Message()
@@ -103,17 +103,17 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg)
H = b('03079780F3D3AD0B3C6DB30C8D21685F367A86D2')
- self.assertEquals(self.K, transport._K)
- self.assertEquals(H, hexlify(transport._H).upper())
- self.assertEquals((b('fake-host-key'), b('fake-sig')), transport._verify)
- self.assert_(transport._activated)
+ self.assertEqual(self.K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify)
+ self.assertTrue(transport._activated)
def test_2_group1_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGroup1(transport)
kex.start_kex()
- self.assertEquals((paramiko.kex_group1._MSG_KEXDH_INIT,), transport._expect)
+ self.assertEqual((paramiko.kex_group1._MSG_KEXDH_INIT,), transport._expect)
msg = Message()
msg.add_mpint(69)
@@ -121,10 +121,10 @@ class KexTest (unittest.TestCase):
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg)
H = b('B16BF34DD10945EDE84E9C1EF24A14BFDC843389')
x = b('1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967')
- self.assertEquals(self.K, transport._K)
- self.assertEquals(H, hexlify(transport._H).upper())
- self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
- self.assert_(transport._activated)
+ self.assertEqual(self.K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertTrue(transport._activated)
def test_3_gex_client(self):
transport = FakeTransport()
@@ -132,8 +132,8 @@ class KexTest (unittest.TestCase):
kex = KexGex(transport)
kex.start_kex()
x = b('22000004000000080000002000')
- self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
- self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
msg = Message()
msg.add_mpint(FakeModulusPack.P)
@@ -141,8 +141,8 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
x = b('20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4')
- self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
- self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
msg = Message()
msg.add_string('fake-host-key')
@@ -151,10 +151,10 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
H = b('A265563F2FA87F1A89BF007EE90D58BE2E4A4BD0')
- self.assertEquals(self.K, transport._K)
- self.assertEquals(H, hexlify(transport._H).upper())
- self.assertEquals((b('fake-host-key'), b('fake-sig')), transport._verify)
- self.assert_(transport._activated)
+ self.assertEqual(self.K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify)
+ self.assertTrue(transport._activated)
def test_4_gex_old_client(self):
transport = FakeTransport()
@@ -162,8 +162,8 @@ class KexTest (unittest.TestCase):
kex = KexGex(transport)
kex.start_kex(_test_old_style=True)
x = b('1E00000800')
- self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
- self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
msg = Message()
msg.add_mpint(FakeModulusPack.P)
@@ -171,8 +171,8 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
x = b('20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4')
- self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
- self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
msg = Message()
msg.add_string('fake-host-key')
@@ -181,17 +181,17 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
H = b('807F87B269EF7AC5EC7E75676808776A27D5864C')
- self.assertEquals(self.K, transport._K)
- self.assertEquals(H, hexlify(transport._H).upper())
- self.assertEquals((b('fake-host-key'), b('fake-sig')), transport._verify)
- self.assert_(transport._activated)
+ self.assertEqual(self.K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b('fake-host-key'), b('fake-sig')), transport._verify)
+ self.assertTrue(transport._activated)
def test_5_gex_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGex(transport)
kex.start_kex()
- self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
msg = Message()
msg.add_int(1024)
@@ -200,8 +200,8 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
x = b('1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102')
- self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
- self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
msg = Message()
msg.add_mpint(12345)
@@ -210,25 +210,25 @@ class KexTest (unittest.TestCase):
K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
H = b('CE754197C21BF3452863B4F44D0B3951F12516EF')
x = b('210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967')
- self.assertEquals(K, transport._K)
- self.assertEquals(H, hexlify(transport._H).upper())
- self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
- self.assert_(transport._activated)
+ self.assertEqual(K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertTrue(transport._activated)
def test_6_gex_server_with_old_client(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGex(transport)
kex.start_kex()
- self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
msg = Message()
msg.add_int(2048)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
x = b('1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102')
- self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
- self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
msg = Message()
msg.add_mpint(12345)
@@ -237,7 +237,7 @@ class KexTest (unittest.TestCase):
K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
H = b('B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B')
x = b('210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967')
- self.assertEquals(K, transport._K)
- self.assertEquals(H, hexlify(transport._H).upper())
- self.assertEquals(x, hexlify(transport._message.asbytes()).upper())
- self.assert_(transport._activated)
+ self.assertEqual(K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertTrue(transport._activated)
diff --git a/tests/test_message.py b/tests/test_message.py
index 0aed88c2..f983b4de 100644
--- a/tests/test_message.py
+++ b/tests/test_message.py
@@ -40,7 +40,7 @@ class MessageTest (unittest.TestCase):
msg.add_string('q')
msg.add_string('hello')
msg.add_string('x' * 1000)
- self.assertEquals(msg.asbytes(), self.__a)
+ self.assertEqual(msg.asbytes(), self.__a)
msg = Message()
msg.add_boolean(True)
@@ -49,7 +49,7 @@ class MessageTest (unittest.TestCase):
msg.add_bytes(zero_byte + byte_chr(0x3f))
msg.add_list(['huey', 'dewey', 'louie'])
- self.assertEquals(msg.asbytes(), self.__b)
+ self.assertEqual(msg.asbytes(), self.__b)
msg = Message()
msg.add_int64(5)
@@ -57,29 +57,29 @@ class MessageTest (unittest.TestCase):
msg.add_mpint(17)
msg.add_mpint(0xf5e4d3c2b109)
msg.add_mpint(-0x65e4d3c2b109)
- self.assertEquals(msg.asbytes(), self.__c)
+ self.assertEqual(msg.asbytes(), self.__c)
def test_2_decode(self):
msg = Message(self.__a)
- self.assertEquals(msg.get_int(), 23)
- self.assertEquals(msg.get_int(), 123789456)
- self.assertEquals(msg.get_text(), 'q')
- self.assertEquals(msg.get_text(), 'hello')
- self.assertEquals(msg.get_text(), 'x' * 1000)
+ self.assertEqual(msg.get_int(), 23)
+ self.assertEqual(msg.get_int(), 123789456)
+ self.assertEqual(msg.get_text(), 'q')
+ self.assertEqual(msg.get_text(), 'hello')
+ self.assertEqual(msg.get_text(), 'x' * 1000)
msg = Message(self.__b)
- self.assertEquals(msg.get_boolean(), True)
- self.assertEquals(msg.get_boolean(), False)
- self.assertEquals(msg.get_byte(), byte_chr(0xf3))
- self.assertEquals(msg.get_bytes(2), zero_byte + byte_chr(0x3f))
- self.assertEquals(msg.get_list(), ['huey', 'dewey', 'louie'])
+ self.assertEqual(msg.get_boolean(), True)
+ self.assertEqual(msg.get_boolean(), False)
+ self.assertEqual(msg.get_byte(), byte_chr(0xf3))
+ self.assertEqual(msg.get_bytes(2), zero_byte + byte_chr(0x3f))
+ self.assertEqual(msg.get_list(), ['huey', 'dewey', 'louie'])
msg = Message(self.__c)
- self.assertEquals(msg.get_int64(), 5)
- self.assertEquals(msg.get_int64(), 0xf5e4d3c2b109)
- self.assertEquals(msg.get_mpint(), 17)
- self.assertEquals(msg.get_mpint(), 0xf5e4d3c2b109)
- self.assertEquals(msg.get_mpint(), -0x65e4d3c2b109)
+ self.assertEqual(msg.get_int64(), 5)
+ self.assertEqual(msg.get_int64(), 0xf5e4d3c2b109)
+ self.assertEqual(msg.get_mpint(), 17)
+ self.assertEqual(msg.get_mpint(), 0xf5e4d3c2b109)
+ self.assertEqual(msg.get_mpint(), -0x65e4d3c2b109)
def test_3_add(self):
msg = Message()
@@ -89,16 +89,16 @@ class MessageTest (unittest.TestCase):
msg.add(True)
msg.add('cat')
msg.add(['a', 'b'])
- self.assertEquals(msg.asbytes(), self.__d)
+ self.assertEqual(msg.asbytes(), self.__d)
def test_4_misc(self):
msg = Message(self.__d)
- self.assertEquals(msg.get_int(), 5)
- self.assertEquals(msg.get_int(), 0x1122334455)
- self.assertEquals(msg.get_int(), 0xf00000000000000000)
- self.assertEquals(msg.get_so_far(), self.__d[:29])
- self.assertEquals(msg.get_remainder(), self.__d[29:])
+ self.assertEqual(msg.get_int(), 5)
+ self.assertEqual(msg.get_int(), 0x1122334455)
+ self.assertEqual(msg.get_int(), 0xf00000000000000000)
+ self.assertEqual(msg.get_so_far(), self.__d[:29])
+ self.assertEqual(msg.get_remainder(), self.__d[29:])
msg.rewind()
- self.assertEquals(msg.get_int(), 5)
- self.assertEquals(msg.get_so_far(), self.__d[:4])
- self.assertEquals(msg.get_remainder(), self.__d[4:])
+ self.assertEqual(msg.get_int(), 5)
+ self.assertEqual(msg.get_so_far(), self.__d[:4])
+ self.assertEqual(msg.get_remainder(), self.__d[4:])
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index a4ada72f..43dea38a 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -54,8 +54,8 @@ class PacketizerTest (unittest.TestCase):
p.send_message(m)
data = rsock.recv(100)
# 32 + 12 bytes of MAC = 44
- self.assertEquals(44, len(data))
- self.assertEquals(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c0')), data[:16])
+ self.assertEqual(44, len(data))
+ self.assertEqual(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c0')), data[:16])
def test_2_read (self):
rsock = LoopSocket()
@@ -68,7 +68,7 @@ class PacketizerTest (unittest.TestCase):
p.set_inbound_cipher(cipher, 16, SHA, 12, x1f * 20)
wsock.send(unhexlify(b('439197bd5b50ac2587c2c46bc7e938c090d216560d717361387c4c3dfb977de26e03b1a0c21cd641414cb459')))
cmd, m = p.read_message()
- self.assertEquals(100, cmd)
- self.assertEquals(100, m.get_int())
- self.assertEquals(1, m.get_int())
- self.assertEquals(900, m.get_int())
+ self.assertEqual(100, cmd)
+ self.assertEqual(100, m.get_int())
+ self.assertEqual(1, m.get_int())
+ self.assertEqual(900, m.get_int())
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 8ab21a3a..f8549468 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -92,157 +92,157 @@ class KeyTest (unittest.TestCase):
from Crypto.Hash import MD5
key = util.generate_key_bytes(MD5, x1234, 'happy birthday', 30)
exp = unhexlify(b('61E1F272F4C1C4561586BD322498C0E924672780F47BB37DDA7D54019E64'))
- self.assertEquals(exp, key)
+ self.assertEqual(exp, key)
def test_2_load_rsa(self):
key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
- self.assertEquals('ssh-rsa', key.get_name())
+ self.assertEqual('ssh-rsa', key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(':', ''))
my_rsa = hexlify(key.get_fingerprint())
- self.assertEquals(exp_rsa, my_rsa)
- self.assertEquals(PUB_RSA.split()[1], key.get_base64())
- self.assertEquals(1024, key.get_bits())
+ self.assertEqual(exp_rsa, my_rsa)
+ self.assertEqual(PUB_RSA.split()[1], key.get_base64())
+ self.assertEqual(1024, key.get_bits())
s = StringIO()
key.write_private_key(s)
- self.assertEquals(RSA_PRIVATE_OUT, s.getvalue())
+ self.assertEqual(RSA_PRIVATE_OUT, s.getvalue())
s.seek(0)
key2 = RSAKey.from_private_key(s)
- self.assertEquals(key, key2)
+ self.assertEqual(key, key2)
def test_3_load_rsa_password(self):
key = RSAKey.from_private_key_file(test_path('test_rsa_password.key'), 'television')
- self.assertEquals('ssh-rsa', key.get_name())
+ self.assertEqual('ssh-rsa', key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(':', ''))
my_rsa = hexlify(key.get_fingerprint())
- self.assertEquals(exp_rsa, my_rsa)
- self.assertEquals(PUB_RSA.split()[1], key.get_base64())
- self.assertEquals(1024, key.get_bits())
+ self.assertEqual(exp_rsa, my_rsa)
+ self.assertEqual(PUB_RSA.split()[1], key.get_base64())
+ self.assertEqual(1024, key.get_bits())
def test_4_load_dss(self):
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
- self.assertEquals('ssh-dss', key.get_name())
+ self.assertEqual('ssh-dss', key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(':', ''))
my_dss = hexlify(key.get_fingerprint())
- self.assertEquals(exp_dss, my_dss)
- self.assertEquals(PUB_DSS.split()[1], key.get_base64())
- self.assertEquals(1024, key.get_bits())
+ self.assertEqual(exp_dss, my_dss)
+ self.assertEqual(PUB_DSS.split()[1], key.get_base64())
+ self.assertEqual(1024, key.get_bits())
s = StringIO()
key.write_private_key(s)
- self.assertEquals(DSS_PRIVATE_OUT, s.getvalue())
+ self.assertEqual(DSS_PRIVATE_OUT, s.getvalue())
s.seek(0)
key2 = DSSKey.from_private_key(s)
- self.assertEquals(key, key2)
+ self.assertEqual(key, key2)
def test_5_load_dss_password(self):
key = DSSKey.from_private_key_file(test_path('test_dss_password.key'), 'television')
- self.assertEquals('ssh-dss', key.get_name())
+ self.assertEqual('ssh-dss', key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(':', ''))
my_dss = hexlify(key.get_fingerprint())
- self.assertEquals(exp_dss, my_dss)
- self.assertEquals(PUB_DSS.split()[1], key.get_base64())
- self.assertEquals(1024, key.get_bits())
+ self.assertEqual(exp_dss, my_dss)
+ self.assertEqual(PUB_DSS.split()[1], key.get_base64())
+ self.assertEqual(1024, key.get_bits())
def test_6_compare_rsa(self):
# verify that the private & public keys compare equal
key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
- self.assertEquals(key, key)
+ self.assertEqual(key, key)
pub = RSAKey(data=key.asbytes())
- self.assert_(key.can_sign())
- self.assert_(not pub.can_sign())
- self.assertEquals(key, pub)
+ self.assertTrue(key.can_sign())
+ self.assertTrue(not pub.can_sign())
+ self.assertEqual(key, pub)
def test_7_compare_dss(self):
# verify that the private & public keys compare equal
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
- self.assertEquals(key, key)
+ self.assertEqual(key, key)
pub = DSSKey(data=key.asbytes())
- self.assert_(key.can_sign())
- self.assert_(not pub.can_sign())
- self.assertEquals(key, pub)
+ self.assertTrue(key.can_sign())
+ self.assertTrue(not pub.can_sign())
+ self.assertEqual(key, pub)
def test_8_sign_rsa(self):
# verify that the rsa private key can sign and verify
key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
msg = key.sign_ssh_data(rng, b('ice weasels'))
- self.assert_(type(msg) is Message)
+ self.assertTrue(type(msg) is Message)
msg.rewind()
- self.assertEquals('ssh-rsa', msg.get_text())
+ self.assertEqual('ssh-rsa', msg.get_text())
sig = bytes().join([byte_chr(int(x, 16)) for x in SIGNED_RSA.split(':')])
- self.assertEquals(sig, msg.get_binary())
+ self.assertEqual(sig, msg.get_binary())
msg.rewind()
pub = RSAKey(data=key.asbytes())
- self.assert_(pub.verify_ssh_sig(b('ice weasels'), msg))
+ self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg))
def test_9_sign_dss(self):
# verify that the dss private key can sign and verify
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
msg = key.sign_ssh_data(rng, b('ice weasels'))
- self.assert_(type(msg) is Message)
+ self.assertTrue(type(msg) is Message)
msg.rewind()
- self.assertEquals('ssh-dss', msg.get_text())
+ self.assertEqual('ssh-dss', msg.get_text())
# can't do the same test as we do for RSA, because DSS signatures
# are usually different each time. but we can test verification
# anyway so it's ok.
- self.assertEquals(40, len(msg.get_binary()))
+ self.assertEqual(40, len(msg.get_binary()))
msg.rewind()
pub = DSSKey(data=key.asbytes())
- self.assert_(pub.verify_ssh_sig(b('ice weasels'), msg))
+ self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg))
def test_A_generate_rsa(self):
key = RSAKey.generate(1024)
msg = key.sign_ssh_data(rng, b('jerri blank'))
msg.rewind()
- self.assert_(key.verify_ssh_sig(b('jerri blank'), msg))
+ self.assertTrue(key.verify_ssh_sig(b('jerri blank'), msg))
def test_B_generate_dss(self):
key = DSSKey.generate(1024)
msg = key.sign_ssh_data(rng, b('jerri blank'))
msg.rewind()
- self.assert_(key.verify_ssh_sig(b('jerri blank'), msg))
+ self.assertTrue(key.verify_ssh_sig(b('jerri blank'), msg))
def test_10_load_ecdsa(self):
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
- self.assertEquals('ecdsa-sha2-nistp256', key.get_name())
+ self.assertEqual('ecdsa-sha2-nistp256', key.get_name())
exp_ecdsa = b(FINGER_ECDSA.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
- self.assertEquals(exp_ecdsa, my_ecdsa)
- self.assertEquals(PUB_ECDSA.split()[1], key.get_base64())
- self.assertEquals(256, key.get_bits())
+ self.assertEqual(exp_ecdsa, my_ecdsa)
+ self.assertEqual(PUB_ECDSA.split()[1], key.get_base64())
+ self.assertEqual(256, key.get_bits())
s = StringIO()
key.write_private_key(s)
- self.assertEquals(ECDSA_PRIVATE_OUT, s.getvalue())
+ self.assertEqual(ECDSA_PRIVATE_OUT, s.getvalue())
s.seek(0)
key2 = ECDSAKey.from_private_key(s)
- self.assertEquals(key, key2)
+ self.assertEqual(key, key2)
def test_11_load_ecdsa_password(self):
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password.key'), b('television'))
- self.assertEquals('ecdsa-sha2-nistp256', key.get_name())
+ self.assertEqual('ecdsa-sha2-nistp256', key.get_name())
exp_ecdsa = b(FINGER_ECDSA.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
- self.assertEquals(exp_ecdsa, my_ecdsa)
- self.assertEquals(PUB_ECDSA.split()[1], key.get_base64())
- self.assertEquals(256, key.get_bits())
+ self.assertEqual(exp_ecdsa, my_ecdsa)
+ self.assertEqual(PUB_ECDSA.split()[1], key.get_base64())
+ self.assertEqual(256, key.get_bits())
def test_12_compare_ecdsa(self):
# verify that the private & public keys compare equal
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
- self.assertEquals(key, key)
+ self.assertEqual(key, key)
pub = ECDSAKey(data=key.asbytes())
- self.assert_(key.can_sign())
- self.assert_(not pub.can_sign())
- self.assertEquals(key, pub)
+ self.assertTrue(key.can_sign())
+ self.assertTrue(not pub.can_sign())
+ self.assertEqual(key, pub)
def test_13_sign_ecdsa(self):
# verify that the rsa private key can sign and verify
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
msg = key.sign_ssh_data(rng, b('ice weasels'))
- self.assert_(type(msg) is Message)
+ self.assertTrue(type(msg) is Message)
msg.rewind()
- self.assertEquals('ecdsa-sha2-nistp256', msg.get_text())
+ self.assertEqual('ecdsa-sha2-nistp256', msg.get_text())
# ECDSA signatures, like DSS signatures, tend to be different
# each time, so we can't compare against a "known correct"
# signature.
@@ -250,4 +250,4 @@ class KeyTest (unittest.TestCase):
msg.rewind()
pub = ECDSAKey(data=key.asbytes())
- self.assert_(pub.verify_ssh_sig(b('ice weasels'), msg))
+ self.assertTrue(pub.verify_ssh_sig(b('ice weasels'), msg))
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index c17defaa..b84b3fd6 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -162,8 +162,8 @@ class SFTPTest (unittest.TestCase):
f = sftp.open(FOLDER + '/test', 'w')
try:
self.assertEqual(f.stat().st_size, 0)
- f.close()
finally:
+ f.close()
sftp.remove(FOLDER + '/test')
def test_2_close(self):
@@ -219,8 +219,8 @@ class SFTPTest (unittest.TestCase):
self.assertEqual(f.stat().st_size, 37)
f.seek(-26, f.SEEK_CUR)
self.assertEqual(f.readline(), 'second line\n')
- f.close()
finally:
+ f.close()
sftp.remove(FOLDER + '/append.txt')
def test_5_rename(self):
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index a53a6c3d..dc9cba93 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -262,6 +262,7 @@ class BigSFTPTest (unittest.TestCase):
for i in range(10):
f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
f.prefetch()
+ f.close()
f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
f.prefetch()
for n in range(1024):
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 22a02a8e..397b00ca 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -132,28 +132,28 @@ class TransportTest(ParamikoTest):
event = threading.Event()
self.server = NullServer()
- self.assert_(not event.isSet())
+ self.assertTrue(not event.isSet())
self.ts.start_server(event, self.server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
+ self.assertTrue(event.isSet())
+ self.assertTrue(self.ts.is_active())
def test_1_security_options(self):
o = self.tc.get_security_options()
- self.assertEquals(type(o), SecurityOptions)
- self.assert_(('aes256-cbc', 'blowfish-cbc') != o.ciphers)
+ self.assertEqual(type(o), SecurityOptions)
+ self.assertTrue(('aes256-cbc', 'blowfish-cbc') != o.ciphers)
o.ciphers = ('aes256-cbc', 'blowfish-cbc')
- self.assertEquals(('aes256-cbc', 'blowfish-cbc'), o.ciphers)
+ self.assertEqual(('aes256-cbc', 'blowfish-cbc'), o.ciphers)
try:
o.ciphers = ('aes256-cbc', 'made-up-cipher')
- self.assert_(False)
+ self.assertTrue(False)
except ValueError:
pass
try:
o.ciphers = 23
- self.assert_(False)
+ self.assertTrue(False)
except TypeError:
pass
@@ -162,7 +162,7 @@ class TransportTest(ParamikoTest):
self.tc.H = unhexlify(b('0C8307CDE6856FF30BA93684EB0F04C2520E9ED3'))
self.tc.session_id = self.tc.H
key = self.tc._compute_key('C', 32)
- self.assertEquals(b('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995'),
+ self.assertEqual(b('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995'),
hexlify(key).upper())
def test_3_simple(self):
@@ -176,21 +176,21 @@ class TransportTest(ParamikoTest):
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
- self.assert_(not event.isSet())
- self.assertEquals(None, self.tc.get_username())
- self.assertEquals(None, self.ts.get_username())
- self.assertEquals(False, self.tc.is_authenticated())
- self.assertEquals(False, self.ts.is_authenticated())
+ self.assertTrue(not event.isSet())
+ self.assertEqual(None, self.tc.get_username())
+ self.assertEqual(None, self.ts.get_username())
+ self.assertEqual(False, self.tc.is_authenticated())
+ self.assertEqual(False, self.ts.is_authenticated())
self.ts.start_server(event, server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
- self.assertEquals('slowdive', self.tc.get_username())
- self.assertEquals('slowdive', self.ts.get_username())
- self.assertEquals(True, self.tc.is_authenticated())
- self.assertEquals(True, self.ts.is_authenticated())
+ self.assertTrue(event.isSet())
+ self.assertTrue(self.ts.is_active())
+ self.assertEqual('slowdive', self.tc.get_username())
+ self.assertEqual('slowdive', self.ts.get_username())
+ self.assertEqual(True, self.tc.is_authenticated())
+ self.assertEqual(True, self.ts.is_authenticated())
def test_3a_long_banner(self):
"""
@@ -201,14 +201,14 @@ class TransportTest(ParamikoTest):
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
- self.assert_(not event.isSet())
+ self.assertTrue(not event.isSet())
self.socks.send(LONG_BANNER)
self.ts.start_server(event, server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
+ self.assertTrue(event.isSet())
+ self.assertTrue(self.ts.is_active())
def test_4_special(self):
"""
@@ -219,10 +219,10 @@ class TransportTest(ParamikoTest):
options.ciphers = ('aes256-cbc',)
options.digests = ('hmac-md5-96',)
self.setup_test_server(client_options=force_algorithms)
- self.assertEquals('aes256-cbc', self.tc.local_cipher)
- self.assertEquals('aes256-cbc', self.tc.remote_cipher)
- self.assertEquals(12, self.tc.packetizer.get_mac_size_out())
- self.assertEquals(12, self.tc.packetizer.get_mac_size_in())
+ self.assertEqual('aes256-cbc', self.tc.local_cipher)
+ self.assertEqual('aes256-cbc', self.tc.remote_cipher)
+ self.assertEqual(12, self.tc.packetizer.get_mac_size_out())
+ self.assertEqual(12, self.tc.packetizer.get_mac_size_in())
self.tc.send_ignore(1024)
self.tc.renegotiate_keys()
@@ -233,10 +233,10 @@ class TransportTest(ParamikoTest):
verify that the keepalive will be sent.
"""
self.setup_test_server()
- self.assertEquals(None, getattr(self.server, '_global_request', None))
+ self.assertEqual(None, getattr(self.server, '_global_request', None))
self.tc.set_keepalive(1)
time.sleep(2)
- self.assertEquals('keepalive@lag.net', self.server._global_request)
+ self.assertEqual('keepalive@lag.net', self.server._global_request)
def test_6_exec_command(self):
"""
@@ -248,7 +248,7 @@ class TransportTest(ParamikoTest):
schan = self.ts.accept(1.0)
try:
chan.exec_command('no')
- self.assert_(False)
+ self.assertTrue(False)
except SSHException:
pass
@@ -260,11 +260,11 @@ class TransportTest(ParamikoTest):
schan.close()
f = chan.makefile()
- self.assertEquals('Hello there.\n', f.readline())
- self.assertEquals('', f.readline())
+ self.assertEqual('Hello there.\n', f.readline())
+ self.assertEqual('', f.readline())
f = chan.makefile_stderr()
- self.assertEquals('This is on stderr.\n', f.readline())
- self.assertEquals('', f.readline())
+ self.assertEqual('This is on stderr.\n', f.readline())
+ self.assertEqual('', f.readline())
# now try it with combined stdout/stderr
chan = self.tc.open_session()
@@ -276,9 +276,9 @@ class TransportTest(ParamikoTest):
chan.set_combine_stderr(True)
f = chan.makefile()
- self.assertEquals('Hello there.\n', f.readline())
- self.assertEquals('This is on stderr.\n', f.readline())
- self.assertEquals('', f.readline())
+ self.assertEqual('Hello there.\n', f.readline())
+ self.assertEqual('This is on stderr.\n', f.readline())
+ self.assertEqual('', f.readline())
def test_7_invoke_shell(self):
"""
@@ -290,9 +290,9 @@ class TransportTest(ParamikoTest):
schan = self.ts.accept(1.0)
chan.send('communist j. cat\n')
f = schan.makefile()
- self.assertEquals('communist j. cat\n', f.readline())
+ self.assertEqual('communist j. cat\n', f.readline())
chan.close()
- self.assertEquals('', f.readline())
+ self.assertEqual('', f.readline())
def test_8_channel_exception(self):
"""
@@ -304,7 +304,7 @@ class TransportTest(ParamikoTest):
self.fail('expected exception')
except ChannelException:
x = sys.exc_info()[1]
- self.assert_(x.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED)
+ self.assertTrue(x.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED)
def test_9_exit_status(self):
"""
@@ -316,7 +316,7 @@ class TransportTest(ParamikoTest):
schan = self.ts.accept(1.0)
chan.exec_command('yes')
schan.send('Hello there.\n')
- self.assert_(not chan.exit_status_ready())
+ self.assertTrue(not chan.exit_status_ready())
# trigger an EOF
schan.shutdown_read()
schan.shutdown_write()
@@ -324,15 +324,15 @@ class TransportTest(ParamikoTest):
schan.close()
f = chan.makefile()
- self.assertEquals('Hello there.\n', f.readline())
- self.assertEquals('', f.readline())
+ self.assertEqual('Hello there.\n', f.readline())
+ self.assertEqual('', f.readline())
count = 0
while not chan.exit_status_ready():
time.sleep(0.1)
count += 1
if count > 50:
raise Exception("timeout")
- self.assertEquals(23, chan.recv_exit_status())
+ self.assertEqual(23, chan.recv_exit_status())
chan.close()
def test_A_select(self):
@@ -346,9 +346,9 @@ class TransportTest(ParamikoTest):
# nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
- self.assertEquals([], r)
- self.assertEquals([], w)
- self.assertEquals([], e)
+ self.assertEqual([], r)
+ self.assertEqual([], w)
+ self.assertEqual([], e)
schan.send('hello\n')
@@ -358,17 +358,17 @@ class TransportTest(ParamikoTest):
if chan in r:
break
time.sleep(0.1)
- self.assertEquals([chan], r)
- self.assertEquals([], w)
- self.assertEquals([], e)
+ self.assertEqual([chan], r)
+ self.assertEqual([], w)
+ self.assertEqual([], e)
- self.assertEquals(b('hello\n'), chan.recv(6))
+ self.assertEqual(b('hello\n'), chan.recv(6))
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
- self.assertEquals([], r)
- self.assertEquals([], w)
- self.assertEquals([], e)
+ self.assertEqual([], r)
+ self.assertEqual([], w)
+ self.assertEqual([], e)
schan.close()
@@ -378,17 +378,17 @@ class TransportTest(ParamikoTest):
if chan in r:
break
time.sleep(0.1)
- self.assertEquals([chan], r)
- self.assertEquals([], w)
- self.assertEquals([], e)
- self.assertEquals(bytes(), chan.recv(16))
+ self.assertEqual([chan], r)
+ self.assertEqual([], w)
+ self.assertEqual([], e)
+ self.assertEqual(bytes(), chan.recv(16))
# make sure the pipe is still open for now...
p = chan._pipe
- self.assertEquals(False, p._closed)
+ self.assertEqual(False, p._closed)
chan.close()
# ...and now is closed.
- self.assertEquals(True, p._closed)
+ self.assertEqual(True, p._closed)
def test_B_renegotiate(self):
"""
@@ -400,7 +400,7 @@ class TransportTest(ParamikoTest):
chan.exec_command('yes')
schan = self.ts.accept(1.0)
- self.assertEquals(self.tc.H, self.tc.session_id)
+ self.assertEqual(self.tc.H, self.tc.session_id)
for i in range(20):
chan.send('x' * 1024)
chan.close()
@@ -410,7 +410,7 @@ class TransportTest(ParamikoTest):
if self.tc.H != self.tc.session_id:
break
time.sleep(0.1)
- self.assertNotEquals(self.tc.H, self.tc.session_id)
+ self.assertNotEqual(self.tc.H, self.tc.session_id)
schan.close()
@@ -429,8 +429,8 @@ class TransportTest(ParamikoTest):
chan.send('x' * 1024)
bytes2 = self.tc.packetizer._Packetizer__sent_bytes
# tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :)
- self.assert_(bytes2 - bytes < 1024)
- self.assertEquals(52, bytes2 - bytes)
+ self.assertTrue(bytes2 - bytes < 1024)
+ self.assertEqual(52, bytes2 - bytes)
chan.close()
schan.close()
@@ -450,20 +450,20 @@ class TransportTest(ParamikoTest):
requested.append((addr, port))
self.tc._queue_incoming_channel(c)
- self.assertEquals(None, getattr(self.server, '_x11_screen_number', None))
+ self.assertEqual(None, getattr(self.server, '_x11_screen_number', None))
cookie = chan.request_x11(0, single_connection=True, handler=handler)
- self.assertEquals(0, self.server._x11_screen_number)
- self.assertEquals('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol)
- self.assertEquals(cookie, self.server._x11_auth_cookie)
- self.assertEquals(True, self.server._x11_single_connection)
+ self.assertEqual(0, self.server._x11_screen_number)
+ self.assertEqual('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol)
+ self.assertEqual(cookie, self.server._x11_auth_cookie)
+ self.assertEqual(True, self.server._x11_single_connection)
x11_server = self.ts.open_x11_channel(('localhost', 6093))
x11_client = self.tc.accept()
- self.assertEquals('localhost', requested[0][0])
- self.assertEquals(6093, requested[0][1])
+ self.assertEqual('localhost', requested[0][0])
+ self.assertEqual(6093, requested[0][1])
x11_server.send('hello')
- self.assertEquals(b('hello'), x11_client.recv(5))
+ self.assertEqual(b('hello'), x11_client.recv(5))
x11_server.close()
x11_client.close()
@@ -487,7 +487,7 @@ class TransportTest(ParamikoTest):
self.tc._queue_incoming_channel(c)
port = self.tc.request_port_forward('127.0.0.1', 0, handler)
- self.assertEquals(port, self.server._listen.getsockname()[1])
+ self.assertEqual(port, self.server._listen.getsockname()[1])
cs = socket.socket()
cs.connect(('127.0.0.1', port))
@@ -496,7 +496,7 @@ class TransportTest(ParamikoTest):
cch = self.tc.accept()
sch.send('hello')
- self.assertEquals(b('hello'), cch.recv(5))
+ self.assertEqual(b('hello'), cch.recv(5))
sch.close()
cch.close()
ss.close()
@@ -533,7 +533,7 @@ class TransportTest(ParamikoTest):
sch.send(cch.recv(8192))
sch.close()
- self.assertEquals(b('Hello!\n'), cs.recv(7))
+ self.assertEqual(b('Hello!\n'), cs.recv(7))
cs.close()
def test_G_stderr_select(self):
@@ -548,9 +548,9 @@ class TransportTest(ParamikoTest):
# nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
- self.assertEquals([], r)
- self.assertEquals([], w)
- self.assertEquals([], e)
+ self.assertEqual([], r)
+ self.assertEqual([], w)
+ self.assertEqual([], e)
schan.send_stderr('hello\n')
@@ -560,17 +560,17 @@ class TransportTest(ParamikoTest):
if chan in r:
break
time.sleep(0.1)
- self.assertEquals([chan], r)
- self.assertEquals([], w)
- self.assertEquals([], e)
+ self.assertEqual([chan], r)
+ self.assertEqual([], w)
+ self.assertEqual([], e)
- self.assertEquals(b('hello\n'), chan.recv_stderr(6))
+ self.assertEqual(b('hello\n'), chan.recv_stderr(6))
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
- self.assertEquals([], r)
- self.assertEquals([], w)
- self.assertEquals([], e)
+ self.assertEqual([], r)
+ self.assertEqual([], w)
+ self.assertEqual([], e)
schan.close()
chan.close()
@@ -584,7 +584,7 @@ class TransportTest(ParamikoTest):
chan.invoke_shell()
schan = self.ts.accept(1.0)
- self.assertEquals(chan.send_ready(), True)
+ self.assertEqual(chan.send_ready(), True)
total = 0
K = '*' * 1024
while total < 1024 * 1024:
@@ -592,11 +592,11 @@ class TransportTest(ParamikoTest):
total += len(K)
if not chan.send_ready():
break
- self.assert_(total < 1024 * 1024)
+ self.assertTrue(total < 1024 * 1024)
schan.close()
chan.close()
- self.assertEquals(chan.send_ready(), True)
+ self.assertEqual(chan.send_ready(), True)
def test_I_rekey_deadlock(self):
"""
diff --git a/tests/test_util.py b/tests/test_util.py
index 84d5e88e..dba52236 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -103,7 +103,7 @@ class UtilTest(ParamikoTest):
global test_config_file
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- self.assertEquals(config._config,
+ self.assertEqual(config._config,
[{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}},
{'host': ['*.example.com'], 'config': {'user': 'bjork', 'port': '3333'}},
{'host': ['*'], 'config': {'crazy': 'something dumb '}},
@@ -131,7 +131,7 @@ class UtilTest(ParamikoTest):
hostname=host,
identityfile=[os.path.expanduser("~/.ssh/id_rsa")]
)
- self.assertEquals(
+ self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
values
)
@@ -139,7 +139,7 @@ class UtilTest(ParamikoTest):
def test_4_generate_key_bytes(self):
x = paramiko.util.generate_key_bytes(SHA, b('ABCDEFGH'), 'This is my secret passphrase.', 64)
hex = ''.join(['%02x' % byte_ord(c) for c in x])
- self.assertEquals(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
+ self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
def test_5_host_keys(self):
f = open('hostfile.temp', 'w')
@@ -147,11 +147,11 @@ class UtilTest(ParamikoTest):
f.close()
try:
hostdict = paramiko.util.load_host_keys('hostfile.temp')
- self.assertEquals(2, len(hostdict))
- self.assertEquals(1, len(list(hostdict.values())[0]))
- self.assertEquals(1, len(list(hostdict.values())[1]))
+ self.assertEqual(2, len(hostdict))
+ self.assertEqual(1, len(list(hostdict.values())[0]))
+ self.assertEqual(1, len(list(hostdict.values())[1]))
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
- self.assertEquals(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
+ self.assertEqual(b('E6684DB30E109B67B70FF1DC5C7F1363'), fp)
finally:
os.unlink('hostfile.temp')
@@ -159,7 +159,7 @@ class UtilTest(ParamikoTest):
from paramiko.common import rng
# just verify that we can pull out 32 bytes and not get an exception.
x = rng.read(32)
- self.assertEquals(len(x), 32)
+ self.assertEqual(len(x), 32)
def test_7_host_config_expose_issue_33(self):
test_config_file = """
@@ -175,13 +175,13 @@ Host *
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
host = 'www13.example.com'
- self.assertEquals(
+ self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
{'hostname': host, 'port': '22'}
)
def test_8_eintr_retry(self):
- self.assertEquals('foo', paramiko.util.retry_on_signal(lambda: 'foo'))
+ self.assertEqual('foo', paramiko.util.retry_on_signal(lambda: 'foo'))
# Variables that are set by raises_intr
intr_errors_remaining = [3]
@@ -192,8 +192,8 @@ Host *
intr_errors_remaining[0] -= 1
raise IOError(errno.EINTR, 'file', 'interrupted system call')
self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None)
- self.assertEquals(0, intr_errors_remaining[0])
- self.assertEquals(4, call_count[0])
+ self.assertEqual(0, intr_errors_remaining[0])
+ self.assertEqual(4, call_count[0])
def raises_ioerror_not_eintr():
raise IOError(errno.ENOENT, 'file', 'file not found')
@@ -219,7 +219,7 @@ Host equals-delimited
f = StringIO(conf)
config = paramiko.util.parse_ssh_config(f)
for host in ('space-delimited', 'equals-delimited'):
- self.assertEquals(
+ self.assertEqual(
host_config(host, config)['proxycommand'],
'foo bar=biz baz'
)
@@ -245,7 +245,7 @@ Host *
('specific', "host specific port 37 lol"),
('portonly', "host portonly port 155"),
):
- self.assertEquals(
+ self.assertEqual(
host_config(host, config)['proxycommand'],
val
)
@@ -267,7 +267,7 @@ Host *
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
host = 'www13.example.com'
- self.assertEquals(
+ self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
{'hostname': host, 'port': '8080'}
)
@@ -295,7 +295,7 @@ ProxyCommand foo=bar:%h-%p
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- self.assertEquals(
+ self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
values
)
@@ -325,7 +325,7 @@ IdentityFile id_dsa22
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- self.assertEquals(
+ self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
values
)