summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.travis.yml42
-rw-r--r--README.rst2
-rw-r--r--dev-requirements.txt7
-rw-r--r--paramiko/__init__.py61
-rw-r--r--paramiko/_version.py2
-rw-r--r--paramiko/auth_handler.py84
-rw-r--r--paramiko/ber.py9
-rw-r--r--paramiko/channel.py37
-rw-r--r--paramiko/client.py65
-rw-r--r--paramiko/config.py4
-rw-r--r--paramiko/ecdsakey.py8
-rw-r--r--paramiko/file.py2
-rw-r--r--paramiko/hostkeys.py14
-rw-r--r--paramiko/kex_ecdh_nist.py4
-rw-r--r--paramiko/kex_gex.py16
-rw-r--r--paramiko/kex_group1.py3
-rw-r--r--paramiko/kex_gss.py41
-rw-r--r--paramiko/message.py4
-rw-r--r--paramiko/packet.py22
-rw-r--r--paramiko/pkey.py23
-rw-r--r--paramiko/primes.py14
-rw-r--r--paramiko/py3compat.py8
-rw-r--r--paramiko/server.py6
-rw-r--r--paramiko/sftp_attr.py13
-rw-r--r--paramiko/sftp_client.py63
-rw-r--r--paramiko/sftp_file.py19
-rw-r--r--paramiko/sftp_server.py10
-rw-r--r--paramiko/ssh_exception.py9
-rw-r--r--paramiko/ssh_gss.py6
-rw-r--r--paramiko/transport.py131
-rw-r--r--paramiko/util.py12
-rw-r--r--paramiko/win_pageant.py2
-rw-r--r--setup.cfg5
-rw-r--r--setup.py4
-rw-r--r--sites/shared_conf.py4
-rw-r--r--sites/www/changelog.rst24
-rw-r--r--sites/www/index.rst8
-rw-r--r--sites/www/installing.rst4
-rw-r--r--tasks.py14
-rw-r--r--tests/stub_sftp.py19
-rw-r--r--tests/test_auth.py23
-rw-r--r--tests/test_buffered_pipe.py9
-rw-r--r--tests/test_channelfile.py41
-rw-r--r--tests/test_client.py198
-rw-r--r--tests/test_file.py26
-rw-r--r--tests/test_gssapi.py6
-rw-r--r--tests/test_hostkeys.py12
-rw-r--r--tests/test_kex.py78
-rw-r--r--tests/test_kex_gss.py4
-rw-r--r--tests/test_message.py16
-rw-r--r--tests/test_packetizer.py10
-rw-r--r--tests/test_pkey.py71
-rw-r--r--tests/test_sftp.py126
-rw-r--r--tests/test_sftp_big.py25
-rw-r--r--tests/test_ssh_gss.py6
-rw-r--r--tests/test_transport.py111
-rw-r--r--tests/test_util.py23
-rw-r--r--tests/util.py2
58 files changed, 917 insertions, 695 deletions
diff --git a/.travis.yml b/.travis.yml
index f8e093c9..324c5f3f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,29 +1,30 @@
+dist: xenial
language: python
sudo: false
cache:
directories:
- $HOME/.cache/pip
python:
- - "2.6"
- "2.7"
- - "3.3"
- "3.4"
- "3.5"
- "3.6"
- - "3.7-dev"
- - "pypy-5.6.0"
+ - "3.7"
+ - "3.8-dev"
+ - "pypy"
+ - "pypy3"
matrix:
allow_failures:
- - python: "3.7-dev"
+ - python: "3.8-dev"
# Explicitly test against our oldest supported cryptography.io, in addition
# to whatever the latest default is.
include:
- python: 2.7
env: "OLDEST_CRYPTO=1.5"
- - python: 3.6
+ - python: 3.7
env: "OLDEST_CRYPTO=1.5"
install:
- # Ensure modern pip/etc on Python 3.3 workers (not sure WTF, but, eh)
+ # Ensure modern pip/etc to avoid some issues w/ older worker environs
- pip install pip==9.0.1 setuptools==36.6.0
# Grab a specific version of Cryptography if desired. Doing this before other
# installations ensures we don't have to do any downgrading/overriding.
@@ -31,25 +32,22 @@ install:
if [[ -n "$OLDEST_CRYPTO" ]]; then
pip install "cryptography==${OLDEST_CRYPTO}"
fi
+ # Self-install for setup.py-driven deps
+ - pip install -e .
# Dev (doc/test running) requirements
# TODO: use pipenv + whatever contexty-type stuff it has
- pip install codecov # For codecov specifically
- pip install -r dev-requirements.txt
- # Self-install for setup.py-driven deps
- - pip install -e .
-script: |
- # NOTE: the below hilarity should only exist in 2.0-2.3, 2.4+ should've gone
- # back to vague normalcy!
- if [[ $TRAVIS_PYTHON_VERSION == '2.6' || $TRAVIS_PYTHON_VERSION == '3.3' ]];
- then
- flake8
- coverage run --source=paramiko -m pytest
- else
- inv travis.blacken
- flake8
- inv coverage
- inv sites
- fi
+script:
+ # Fast syntax check failures for more rapid feedback to submitters
+ # (Travis-oriented metatask that version checks Python, installs, runs.)
+ - inv travis.blacken
+ # I have this in my git pre-push hook, but contributors probably don't
+ - flake8
+ # All (including slow) tests, w/ coverage!
+ - inv coverage
+ # Ensure documentation builds, both sites, maxxed nitpicking
+ - inv sites
notifications:
irc:
channels: "irc.freenode.org#paramiko"
diff --git a/README.rst b/README.rst
index 2f24a0a9..6e49bd68 100644
--- a/README.rst
+++ b/README.rst
@@ -22,7 +22,7 @@ What
----
"Paramiko" is a combination of the Esperanto words for "paranoid" and
-"friend". It's a module for Python 2.6+/3.3+ that implements the SSH2 protocol
+"friend". It's a module for Python 2.7/3.4+ that implements the SSH2 protocol
for secure (encrypted and authenticated) connections to remote machines. Unlike
SSL (aka TLS), SSH2 protocol does not require hierarchical certificates signed
by a powerful central authority. You may know SSH2 as the protocol that
diff --git a/dev-requirements.txt b/dev-requirements.txt
index c9fd4965..9a7be339 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -4,9 +4,11 @@ invocations>=1.2.0,<2.0
# NOTE: pytest-relaxed currently only works with pytest >=3, <3.3
pytest==4.6.3
pytest-relaxed==1.1.5
+# pytest-xdist for test dir watching and the inv guard task
+pytest-xdist==1.28.0
mock==2.0.0
# Linting!
-flake8==2.4.0
+flake8==3.6.0
# Coverage!
coverage==3.7.1
codecov==1.6.3
@@ -18,6 +20,3 @@ releases>=1.5,<2.0
semantic_version>=2.4,<2.5
wheel==0.24
twine==1.11.0
-# Test-matrix-oriented pins of production deps; should only exist on 2.0-2.3
-pycparser<2.19
-idna<2.8
diff --git a/paramiko/__init__.py b/paramiko/__init__.py
index 5780a9c7..b5b577d3 100644
--- a/paramiko/__init__.py
+++ b/paramiko/__init__.py
@@ -19,15 +19,6 @@
# flake8: noqa
import sys
from paramiko._version import __version__, __version_info__
-
-if sys.version_info < (2, 6):
- raise RuntimeError("You need Python 2.6+ for this module.")
-
-
-__author__ = "Jeff Forcier <jeff@bitprophet.org>"
-__license__ = "GNU Lesser General Public License (LGPL)"
-
-
from paramiko.transport import SecurityOptions, Transport
from paramiko.client import (
SSHClient,
@@ -38,7 +29,7 @@ from paramiko.client import (
)
from paramiko.auth_handler import AuthHandler
from paramiko.ssh_gss import GSSAuth, GSS_AUTH_AVAILABLE, GSS_EXCEPTIONS
-from paramiko.channel import Channel, ChannelFile
+from paramiko.channel import Channel, ChannelFile, ChannelStderrFile
from paramiko.ssh_exception import (
SSHException,
PasswordRequiredException,
@@ -94,42 +85,46 @@ from paramiko.sftp import (
from paramiko.common import io_sleep
+
+__author__ = "Jeff Forcier <jeff@bitprophet.org>"
+__license__ = "GNU Lesser General Public License (LGPL)"
+
__all__ = [
- "Transport",
- "SSHClient",
- "MissingHostKeyPolicy",
+ "Agent",
+ "AgentKey",
+ "AuthenticationException",
"AutoAddPolicy",
- "RejectPolicy",
- "WarningPolicy",
- "SecurityOptions",
- "SubsystemHandler",
+ "BadAuthenticationType",
+ "BadHostKeyException",
+ "BufferedFile",
"Channel",
- "PKey",
- "RSAKey",
+ "ChannelException",
"DSSKey",
+ "HostKeys",
"Message",
- "SSHException",
- "AuthenticationException",
+ "MissingHostKeyPolicy",
+ "PKey",
"PasswordRequiredException",
- "BadAuthenticationType",
- "ChannelException",
- "BadHostKeyException",
"ProxyCommand",
"ProxyCommandFailure",
+ "RSAKey",
+ "RejectPolicy",
"SFTP",
+ "SFTPAttributes",
+ "SFTPClient",
+ "SFTPError",
"SFTPFile",
"SFTPHandle",
- "SFTPClient",
"SFTPServer",
- "SFTPError",
- "SFTPAttributes",
"SFTPServerInterface",
- "ServerInterface",
- "BufferedFile",
- "Agent",
- "AgentKey",
- "HostKeys",
+ "SSHClient",
"SSHConfig",
- "util",
+ "SSHException",
+ "SecurityOptions",
+ "ServerInterface",
+ "SubsystemHandler",
+ "Transport",
+ "WarningPolicy",
"io_sleep",
+ "util",
]
diff --git a/paramiko/_version.py b/paramiko/_version.py
index fcdc1c31..2e797d40 100644
--- a/paramiko/_version.py
+++ b/paramiko/_version.py
@@ -1,2 +1,2 @@
-__version_info__ = (2, 3, 3)
+__version_info__ = (2, 4, 2)
__version__ = ".".join(map(str, __version_info__))
diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py
index bbfa1712..26605a16 100644
--- a/paramiko/auth_handler.py
+++ b/paramiko/auth_handler.py
@@ -46,7 +46,6 @@ from paramiko.common import (
MSG_USERAUTH_REQUEST,
MSG_USERAUTH_SUCCESS,
MSG_USERAUTH_FAILURE,
- cMSG_USERAUTH_BANNER,
MSG_USERAUTH_BANNER,
MSG_USERAUTH_INFO_REQUEST,
MSG_USERAUTH_INFO_RESPONSE,
@@ -59,6 +58,7 @@ from paramiko.common import (
MSG_USERAUTH_GSSAPI_ERRTOK,
MSG_USERAUTH_GSSAPI_MIC,
MSG_NAMES,
+ cMSG_USERAUTH_BANNER,
)
from paramiko.message import Message
from paramiko.py3compat import bytestring
@@ -95,6 +95,9 @@ class AuthHandler(object):
self.gss_host = None
self.gss_deleg_creds = True
+ def _log(self, *args):
+ return self.transport._log(*args)
+
def is_authenticated(self):
return self.authenticated
@@ -269,7 +272,7 @@ class AuthHandler(object):
def _parse_service_accept(self, m):
service = m.get_text()
if service == "ssh-userauth":
- self.transport._log(DEBUG, "userauth is OK")
+ self._log(DEBUG, "userauth is OK")
m = Message()
m.add_byte(cMSG_USERAUTH_REQUEST)
m.add_string(self.username)
@@ -347,7 +350,7 @@ class AuthHandler(object):
self.transport.send_message(m)
else:
raise SSHException(
- "Received Package: %s" % MSG_NAMES[ptype]
+ "Received Package: {}".format(MSG_NAMES[ptype])
)
m = Message()
m.add_byte(cMSG_USERAUTH_GSSAPI_MIC)
@@ -378,7 +381,7 @@ Error Message: {}
return
else:
raise SSHException(
- "Received Package: %s" % MSG_NAMES[ptype]
+ "Received Package: {}".format(MSG_NAMES[ptype])
)
elif (
self.auth_method == "gssapi-keyex"
@@ -392,23 +395,23 @@ Error Message: {}
pass
else:
raise SSHException(
- 'Unknown auth method "%s"' % self.auth_method
+ 'Unknown auth method "{}"'.format(self.auth_method)
)
self.transport._send_message(m)
else:
- self.transport._log(
- DEBUG, 'Service request "%s" accepted (?)' % service
+ self._log(
+ DEBUG, 'Service request "{}" accepted (?)'.format(service)
)
def _send_auth_result(self, username, method, result):
# okay, send result
m = Message()
if result == AUTH_SUCCESSFUL:
- self.transport._log(INFO, "Auth granted (%s)." % method)
+ self._log(INFO, "Auth granted ({}).".format(method))
m.add_byte(cMSG_USERAUTH_SUCCESS)
self.authenticated = True
else:
- self.transport._log(INFO, "Auth rejected (%s)." % method)
+ self._log(INFO, "Auth rejected ({}).".format(method))
m.add_byte(cMSG_USERAUTH_FAILURE)
m.add_string(
self.transport.server_object.get_allowed_auths(username)
@@ -452,10 +455,11 @@ Error Message: {}
username = m.get_text()
service = m.get_text()
method = m.get_text()
- self.transport._log(
+ self._log(
DEBUG,
- "Auth request (type=%s) service=%s, username=%s"
- % (method, service, username),
+ "Auth request (type={}) service={}, username={}".format(
+ method, service, username
+ ),
)
if service != "ssh-connection":
self._disconnect_service_not_available()
@@ -463,7 +467,7 @@ Error Message: {}
if (self.auth_username is not None) and (
self.auth_username != username
):
- self.transport._log(
+ self._log(
WARNING,
"Auth rejected because the client attempted to change username in mid-flight", # noqa
)
@@ -488,9 +492,7 @@ Error Message: {}
# always treated as failure, since we don't support changing
# passwords, but collect the list of valid auth types from
# the callback anyway
- self.transport._log(
- DEBUG, "Auth request to change passwords (rejected)"
- )
+ self._log(DEBUG, "Auth request to change passwords (rejected)")
newpassword = m.get_binary()
try:
newpassword = newpassword.decode("UTF-8", "replace")
@@ -508,13 +510,13 @@ Error Message: {}
try:
key = self.transport._key_info[keytype](Message(keyblob))
except SSHException as e:
- self.transport._log(
- INFO, "Auth rejected: public key: %s" % str(e)
- )
+ self._log(INFO, "Auth rejected: public key: {}".format(str(e)))
key = None
except Exception as e:
- msg = "Auth rejected: unsupported or mangled public key ({0}: {1})" # noqa
- self.transport._log(INFO, msg.format(e.__class__.__name__, e))
+ msg = (
+ "Auth rejected: unsupported or mangled public key ({}: {})"
+ ) # noqa
+ self._log(INFO, msg.format(e.__class__.__name__, e))
key = None
if key is None:
self._disconnect_no_more_auth()
@@ -537,9 +539,7 @@ Error Message: {}
sig = Message(m.get_binary())
blob = self._get_session_blob(key, service, username)
if not key.verify_ssh_sig(blob, sig):
- self.transport._log(
- INFO, "Auth rejected: invalid signature"
- )
+ self._log(INFO, "Auth rejected: invalid signature")
result = AUTH_FAILED
elif method == "keyboard-interactive":
submethods = m.get_string()
@@ -559,7 +559,7 @@ Error Message: {}
# We can't accept more than one OID, so if the SSH client sends
# more than one, disconnect.
if mechs > 1:
- self.transport._log(
+ self._log(
INFO,
"Disconnect: Received more than one GSS-API OID mechanism",
)
@@ -568,7 +568,7 @@ Error Message: {}
mech_ok = sshgss.ssh_check_mech(desired_mech)
# if we don't support the mechanism, disconnect.
if not mech_ok:
- self.transport._log(
+ self._log(
INFO,
"Disconnect: Received an invalid GSS-API OID mechanism",
)
@@ -615,8 +615,8 @@ Error Message: {}
self._send_auth_result(username, method, result)
def _parse_userauth_success(self, m):
- self.transport._log(
- INFO, "Authentication (%s) successful!" % self.auth_method
+ self._log(
+ INFO, "Authentication ({}) successful!".format(self.auth_method)
)
self.authenticated = True
self.transport._auth_trigger()
@@ -627,21 +627,23 @@ Error Message: {}
authlist = m.get_list()
partial = m.get_boolean()
if partial:
- self.transport._log(INFO, "Authentication continues...")
- self.transport._log(DEBUG, "Methods: " + str(authlist))
+ self._log(INFO, "Authentication continues...")
+ self._log(DEBUG, "Methods: " + str(authlist))
self.transport.saved_exception = PartialAuthentication(authlist)
elif self.auth_method not in authlist:
- self.transport._log(
- DEBUG,
- "Authentication type (%s) not permitted." % self.auth_method,
- )
- self.transport._log(DEBUG, "Allowed methods: " + str(authlist))
+ for msg in (
+ "Authentication type ({}) not permitted.".format(
+ self.auth_method
+ ),
+ "Allowed methods: {}".format(authlist),
+ ):
+ self._log(DEBUG, msg)
self.transport.saved_exception = BadAuthenticationType(
"Bad authentication type", authlist
)
else:
- self.transport._log(
- INFO, "Authentication (%s) failed." % self.auth_method
+ self._log(
+ INFO, "Authentication ({}) failed.".format(self.auth_method)
)
self.authenticated = False
self.username = None
@@ -651,7 +653,7 @@ Error Message: {}
def _parse_userauth_banner(self, m):
banner = m.get_string()
self.banner = banner
- self.transport._log(INFO, "Auth banner: %s" % banner)
+ self._log(INFO, "Auth banner: {}".format(banner))
# who cares.
def _parse_userauth_info_request(self, m):
@@ -695,10 +697,8 @@ Error Message: {}
def _handle_local_gss_failure(self, e):
self.transport.saved_exception = e
- self.transport._log(DEBUG, "GSSAPI failure: %s" % str(e))
- self.transport._log(
- INFO, "Authentication (%s) failed." % self.auth_method
- )
+ self._log(DEBUG, "GSSAPI failure: {}".format(e))
+ self._log(INFO, "Authentication ({}) failed.".format(self.auth_method))
self.authenticated = False
self.username = None
if self.auth_event is not None:
diff --git a/paramiko/ber.py b/paramiko/ber.py
index 0991b0dd..92d7121e 100644
--- a/paramiko/ber.py
+++ b/paramiko/ber.py
@@ -89,9 +89,8 @@ class BER(object):
return util.inflate_long(data)
else:
# 1: boolean (00 false, otherwise true)
- raise BERException(
- "Unknown ber encoding type %d (robey is lazy)" % ident
- )
+ msg = "Unknown ber encoding type {:d} (robey is lazy)"
+ raise BERException(msg.format(ident))
@staticmethod
def decode_sequence(data):
@@ -127,7 +126,9 @@ class BER(object):
elif (type(x) is list) or (type(x) is tuple):
self.encode_tlv(0x30, self.encode_sequence(x))
else:
- raise BERException("Unknown type for encoding: %s" % repr(type(x)))
+ raise BERException(
+ "Unknown type for encoding: {!r}".format(type(x))
+ )
@staticmethod
def encode_sequence(data):
diff --git a/paramiko/channel.py b/paramiko/channel.py
index 96381f01..f2aaf26a 100644
--- a/paramiko/channel.py
+++ b/paramiko/channel.py
@@ -145,7 +145,7 @@ class Channel(ClosingContextManager):
"""
Return a string representation of this object, for debugging.
"""
- out = "<paramiko.Channel %d" % self.chanid
+ out = "<paramiko.Channel {}".format(self.chanid)
if self.closed:
out += " (closed)"
elif self.active:
@@ -153,9 +153,9 @@ class Channel(ClosingContextManager):
out += " (EOF received)"
if self.eof_sent:
out += " (EOF sent)"
- out += " (open) window=%d" % self.out_window_size
+ out += " (open) window={}".format(self.out_window_size)
if len(self.in_buffer) > 0:
- out += " in-buffer=%d" % (len(self.in_buffer),)
+ out += " in-buffer={}".format(len(self.in_buffer))
out += " -> " + repr(self.transport)
out += ">"
return out
@@ -331,7 +331,7 @@ class Channel(ClosingContextManager):
try:
self.set_environment_variable(name, value)
except SSHException as e:
- err = 'Failed to set environment variable "{0}".'
+ err = 'Failed to set environment variable "{}".'
raise SSHException(err.format(name), e)
@open_only
@@ -991,7 +991,7 @@ class Channel(ClosingContextManager):
# a window update
self.in_window_threshold = window_size // 10
self.in_window_sofar = 0
- self._log(DEBUG, "Max packet in: %d bytes" % max_packet_size)
+ self._log(DEBUG, "Max packet in: {} bytes".format(max_packet_size))
def _set_remote_channel(self, chanid, window_size, max_packet_size):
self.remote_chanid = chanid
@@ -1000,10 +1000,12 @@ class Channel(ClosingContextManager):
max_packet_size
)
self.active = 1
- self._log(DEBUG, "Max packet out: %d bytes" % self.out_max_packet_size)
+ self._log(
+ DEBUG, "Max packet out: {} bytes".format(self.out_max_packet_size)
+ )
def _request_success(self, m):
- self._log(DEBUG, "Sesch channel %d request ok" % self.chanid)
+ self._log(DEBUG, "Sesch channel {} request ok".format(self.chanid))
self.event_ready = True
self.event.set()
return
@@ -1031,7 +1033,7 @@ class Channel(ClosingContextManager):
s = m.get_binary()
if code != 1:
self._log(
- ERROR, "unknown extended_data type %d; discarding" % code
+ ERROR, "unknown extended_data type {}; discarding".format(code)
)
return
if self.combine_stderr:
@@ -1044,7 +1046,7 @@ class Channel(ClosingContextManager):
self.lock.acquire()
try:
if self.ultra_debug:
- self._log(DEBUG, "window up %d" % nbytes)
+ self._log(DEBUG, "window up {}".format(nbytes))
self.out_window_size += nbytes
self.out_buffer_cv.notifyAll()
finally:
@@ -1131,7 +1133,7 @@ class Channel(ClosingContextManager):
else:
ok = server.check_channel_forward_agent_request(self)
else:
- self._log(DEBUG, 'Unhandled channel request "%s"' % key)
+ self._log(DEBUG, 'Unhandled channel request "{}"'.format(key))
ok = False
if want_reply:
m = Message()
@@ -1153,7 +1155,7 @@ class Channel(ClosingContextManager):
self._pipe.set_forever()
finally:
self.lock.release()
- self._log(DEBUG, "EOF received (%s)", self._name)
+ self._log(DEBUG, "EOF received ({})".format(self._name))
def _handle_close(self, m):
self.lock.acquire()
@@ -1225,7 +1227,7 @@ class Channel(ClosingContextManager):
m.add_byte(cMSG_CHANNEL_EOF)
m.add_int(self.remote_chanid)
self.eof_sent = True
- self._log(DEBUG, "EOF sent (%s)", self._name)
+ self._log(DEBUG, "EOF sent ({})".format(self._name))
return m
def _close_internal(self):
@@ -1259,12 +1261,14 @@ class Channel(ClosingContextManager):
if self.closed or self.eof_received or not self.active:
return 0
if self.ultra_debug:
- self._log(DEBUG, "addwindow %d" % n)
+ self._log(DEBUG, "addwindow {}".format(n))
self.in_window_sofar += n
if self.in_window_sofar <= self.in_window_threshold:
return 0
if self.ultra_debug:
- self._log(DEBUG, "addwindow send %d" % self.in_window_sofar)
+ self._log(
+ DEBUG, "addwindow send {}".format(self.in_window_sofar)
+ )
out = self.in_window_sofar
self.in_window_sofar = 0
return out
@@ -1307,7 +1311,7 @@ class Channel(ClosingContextManager):
size = self.out_max_packet_size - 64
self.out_window_size -= size
if self.ultra_debug:
- self._log(DEBUG, "window down to %d" % self.out_window_size)
+ self._log(DEBUG, "window down to {}".format(self.out_window_size))
return size
@@ -1344,9 +1348,6 @@ class ChannelFile(BufferedFile):
class ChannelStderrFile(ChannelFile):
- def __init__(self, channel, mode="r", bufsize=-1):
- ChannelFile.__init__(self, channel, mode, bufsize)
-
def _read(self, size):
return self.channel.recv_stderr(size)
diff --git a/paramiko/client.py b/paramiko/client.py
index 49a9a301..2538d582 100644
--- a/paramiko/client.py
+++ b/paramiko/client.py
@@ -146,7 +146,9 @@ class SSHClient(ClosingContextManager):
for hostname, keys in self._host_keys.items():
for keytype, key in keys.items():
f.write(
- "%s %s %s\n" % (hostname, keytype, key.get_base64())
+ "{} {} {}\n".format(
+ hostname, keytype, key.get_base64()
+ )
)
def get_host_keys(self):
@@ -233,6 +235,7 @@ class SSHClient(ClosingContextManager):
banner_timeout=None,
auth_timeout=None,
gss_trust_dns=True,
+ passphrase=None,
):
"""
Connect to an SSH server and authenticate to it. The server's host key
@@ -273,7 +276,10 @@ class SSHClient(ClosingContextManager):
the username to authenticate as (defaults to the current local
username)
:param str password:
- a password to use for authentication or for unlocking a private key
+ Used for password authentication; is also used for private key
+ decryption if ``passphrase`` is not given.
+ :param str passphrase:
+ Used for decrypting private keys.
:param .PKey pkey: an optional private key to use for authentication
:param str key_filename:
the filename, or list of filenames, of optional private key(s)
@@ -319,6 +325,8 @@ class SSHClient(ClosingContextManager):
``gss_deleg_creds`` and ``gss_host`` arguments.
.. versionchanged:: 2.3
Added the ``gss_trust_dns`` argument.
+ .. versionchanged:: 2.4
+ Added the ``passphrase`` argument.
"""
if not sock:
errors = {}
@@ -374,7 +382,7 @@ class SSHClient(ClosingContextManager):
if port == SSH_PORT:
server_hostkey_name = hostname
else:
- server_hostkey_name = "[%s]:%d" % (hostname, port)
+ server_hostkey_name = "[{}]:{}".format(hostname, port)
our_server_keys = None
our_server_keys = self._system_host_keys.get(server_hostkey_name)
@@ -426,6 +434,7 @@ class SSHClient(ClosingContextManager):
gss_kex,
gss_deleg_creds,
t.gss_host,
+ passphrase,
)
def close(self):
@@ -563,14 +572,14 @@ class SSHClient(ClosingContextManager):
# TODO: change this to 'Loading' instead of 'Trying' sometime; probably
# when #387 is released, since this is a critical log message users are
# likely testing/filtering for (bah.)
- msg = "Trying discovered key {0} in {1}".format(
+ msg = "Trying discovered key {} in {}".format(
hexlify(key.get_fingerprint()), key_path
)
self._log(DEBUG, msg)
# Attempt to load cert if it exists.
if os.path.isfile(cert_path):
key.load_certificate(cert_path)
- self._log(DEBUG, "Adding public certificate {0}".format(cert_path))
+ self._log(DEBUG, "Adding public certificate {}".format(cert_path))
return key
def _auth(
@@ -585,6 +594,7 @@ class SSHClient(ClosingContextManager):
gss_kex,
gss_deleg_creds,
gss_host,
+ passphrase,
):
"""
Try, in order:
@@ -595,13 +605,16 @@ class SSHClient(ClosingContextManager):
(if allowed).
- Plain username/password auth, if a password was given.
- (The password might be needed to unlock a private key, or for
- two-factor authentication [for which it is required].)
+ (The password might be needed to unlock a private key [if 'passphrase'
+ isn't also given], or for two-factor authentication [for which it is
+ required].)
"""
saved_exception = None
two_factor = False
allowed_types = set()
- two_factor_types = set(["keyboard-interactive", "password"])
+ two_factor_types = {"keyboard-interactive", "password"}
+ if passphrase is None and password is not None:
+ passphrase = password
# If GSS-API support and GSS-PI Key Exchange was performed, we attempt
# authentication with gssapi-keyex.
@@ -628,7 +641,9 @@ class SSHClient(ClosingContextManager):
try:
self._log(
DEBUG,
- "Trying SSH key %s" % hexlify(pkey.get_fingerprint()),
+ "Trying SSH key {}".format(
+ hexlify(pkey.get_fingerprint())
+ ),
)
allowed_types = set(
self._transport.auth_publickey(username, pkey)
@@ -644,7 +659,7 @@ class SSHClient(ClosingContextManager):
for pkey_class in (RSAKey, DSSKey, ECDSAKey, Ed25519Key):
try:
key = self._key_from_filepath(
- key_filename, pkey_class, password
+ key_filename, pkey_class, passphrase
)
allowed_types = set(
self._transport.auth_publickey(username, key)
@@ -662,11 +677,8 @@ class SSHClient(ClosingContextManager):
for key in self._agent.get_keys():
try:
- self._log(
- DEBUG,
- "Trying SSH agent key %s"
- % hexlify(key.get_fingerprint()),
- )
+ id_ = hexlify(key.get_fingerprint())
+ self._log(DEBUG, "Trying SSH agent key {}".format(id_))
# for 2-factor auth a successfully auth'd key password
# will return an allowed 2fac auth method
allowed_types = set(
@@ -691,7 +703,7 @@ class SSHClient(ClosingContextManager):
# ~/ssh/ is for windows
for directory in [".ssh", "ssh"]:
full_path = os.path.expanduser(
- "~/%s/id_%s" % (directory, name)
+ "~/{}/id_{}".format(directory, name)
)
if os.path.isfile(full_path):
# TODO: only do this append if below did not run
@@ -705,7 +717,7 @@ class SSHClient(ClosingContextManager):
for pkey_class, filename in keyfiles:
try:
key = self._key_from_filepath(
- filename, pkey_class, password
+ filename, pkey_class, passphrase
)
# for 2-factor auth a successfully auth'd key will result
# in ['password']
@@ -774,8 +786,9 @@ class AutoAddPolicy(MissingHostKeyPolicy):
client.save_host_keys(client._host_keys_filename)
client._log(
DEBUG,
- "Adding %s host key for %s: %s"
- % (key.get_name(), hostname, hexlify(key.get_fingerprint())),
+ "Adding {} host key for {}: {}".format(
+ key.get_name(), hostname, hexlify(key.get_fingerprint())
+ ),
)
@@ -788,10 +801,13 @@ class RejectPolicy(MissingHostKeyPolicy):
def missing_host_key(self, client, hostname, key):
client._log(
DEBUG,
- "Rejecting %s host key for %s: %s"
- % (key.get_name(), hostname, hexlify(key.get_fingerprint())),
+ "Rejecting {} host key for {}: {}".format(
+ key.get_name(), hostname, hexlify(key.get_fingerprint())
+ ),
+ )
+ raise SSHException(
+ "Server {!r} not found in known_hosts".format(hostname)
)
- raise SSHException("Server %r not found in known_hosts" % hostname)
class WarningPolicy(MissingHostKeyPolicy):
@@ -802,6 +818,7 @@ class WarningPolicy(MissingHostKeyPolicy):
def missing_host_key(self, client, hostname, key):
warnings.warn(
- "Unknown %s host key for %s: %s"
- % (key.get_name(), hostname, hexlify(key.get_fingerprint()))
+ "Unknown {} host key for {}: {}".format(
+ key.get_name(), hostname, hexlify(key.get_fingerprint())
+ )
)
diff --git a/paramiko/config.py b/paramiko/config.py
index f7941e0d..21c9dab8 100644
--- a/paramiko/config.py
+++ b/paramiko/config.py
@@ -65,7 +65,7 @@ class SSHConfig(object):
match = re.match(self.SETTINGS_REGEX, line)
if not match:
- raise Exception("Unparsable line %s" % line)
+ raise Exception("Unparsable line {}".format(line))
key = match.group(1).lower()
value = match.group(2)
@@ -235,7 +235,7 @@ class SSHConfig(object):
try:
return shlex.split(host)
except ValueError:
- raise Exception("Unparsable host %s" % host)
+ raise Exception("Unparsable host {}".format(host))
class LazyFqdn(object):
diff --git a/paramiko/ecdsakey.py b/paramiko/ecdsakey.py
index ab9ec15f..b73a969e 100644
--- a/paramiko/ecdsakey.py
+++ b/paramiko/ecdsakey.py
@@ -147,14 +147,16 @@ class ECDSAKey(PKey):
)
key_types = self._ECDSA_CURVES.get_key_format_identifier_list()
cert_types = [
- "{0}-cert-v01@openssh.com".format(x) for x in key_types
+ "{}-cert-v01@openssh.com".format(x) for x in key_types
]
self._check_type_and_load_cert(
msg=msg, key_type=key_types, cert_type=cert_types
)
curvename = msg.get_text()
if curvename != self.ecdsa_curve.nist_name:
- raise SSHException("Can't handle curve of type %s" % curvename)
+ raise SSHException(
+ "Can't handle curve of type {}".format(curvename)
+ )
pointinfo = msg.get_binary()
try:
@@ -264,7 +266,7 @@ class ECDSAKey(PKey):
if bits is not None:
curve = cls._ECDSA_CURVES.get_by_key_length(bits)
if curve is None:
- raise ValueError("Unsupported key length: %d" % bits)
+ raise ValueError("Unsupported key length: {:d}".format(bits))
curve = curve.curve_class()
private_key = ec.generate_private_key(curve, backend=default_backend())
diff --git a/paramiko/file.py b/paramiko/file.py
index a6933e02..9e9f6eb8 100644
--- a/paramiko/file.py
+++ b/paramiko/file.py
@@ -341,7 +341,7 @@ class BufferedFile(ClosingContextManager):
after rounding up to an internal buffer size) are read.
:param int sizehint: desired maximum number of bytes to read.
- :returns: `list` of lines read from the file.
+ :returns: list of lines read from the file.
"""
lines = []
byte_count = 0
diff --git a/paramiko/hostkeys.py b/paramiko/hostkeys.py
index 3aac1341..d0660cc8 100644
--- a/paramiko/hostkeys.py
+++ b/paramiko/hostkeys.py
@@ -306,7 +306,7 @@ class HostKeys(MutableMapping):
salt = decodebytes(b(salt))
assert len(salt) == sha1().digest_size
hmac = HMAC(salt, b(hostname), sha1).digest()
- hostkey = "|1|%s|%s" % (u(encodebytes(salt)), u(encodebytes(hmac)))
+ hostkey = "|1|{}|{}".format(u(encodebytes(salt)), u(encodebytes(hmac)))
return hostkey.replace("\n", "")
@@ -344,10 +344,8 @@ class HostKeyEntry:
fields = line.split(" ")
if len(fields) < 3:
# Bad number of fields
- log.info(
- "Not enough fields found in known_hosts in line %s (%r)"
- % (lineno, line)
- )
+ msg = "Not enough fields found in known_hosts in line {} ({!r})"
+ log.info(msg.format(lineno, line))
return None
fields = fields[:3]
@@ -367,7 +365,7 @@ class HostKeyEntry:
elif keytype == "ssh-ed25519":
key = Ed25519Key(data=decodebytes(key))
else:
- log.info("Unable to handle key of type %s" % (keytype,))
+ log.info("Unable to handle key of type {}".format(keytype))
return None
except binascii.Error as e:
@@ -382,7 +380,7 @@ class HostKeyEntry:
included.
"""
if self.valid:
- return "%s %s %s\n" % (
+ return "{} {} {}\n".format(
",".join(self.hostnames),
self.key.get_name(),
self.key.get_base64(),
@@ -390,4 +388,4 @@ class HostKeyEntry:
return None
def __repr__(self):
- return "<HostKeyEntry %r: %r>" % (self.hostnames, self.key)
+ return "<HostKeyEntry {!r}: {!r}>".format(self.hostnames, self.key)
diff --git a/paramiko/kex_ecdh_nist.py b/paramiko/kex_ecdh_nist.py
index 496805ab..1d87442a 100644
--- a/paramiko/kex_ecdh_nist.py
+++ b/paramiko/kex_ecdh_nist.py
@@ -45,7 +45,9 @@ class KexNistp256:
return self._parse_kexecdh_init(m)
elif not self.transport.server_mode and (ptype == _MSG_KEXECDH_REPLY):
return self._parse_kexecdh_reply(m)
- raise SSHException("KexECDH asked to handle packet type %d" % ptype)
+ raise SSHException(
+ "KexECDH asked to handle packet type {:d}".format(ptype)
+ )
def _generate_key_pair(self):
self.P = ec.generate_private_key(self.curve, default_backend())
diff --git a/paramiko/kex_gex.py b/paramiko/kex_gex.py
index 371db488..fb8f01fd 100644
--- a/paramiko/kex_gex.py
+++ b/paramiko/kex_gex.py
@@ -101,9 +101,8 @@ class KexGex(object):
return self._parse_kexdh_gex_reply(m)
elif ptype == _MSG_KEXDH_GEX_REQUEST_OLD:
return self._parse_kexdh_gex_request_old(m)
- raise SSHException(
- "KexGex %s asked to handle packet type %d" % self.name, ptype
- )
+ msg = "KexGex {} asked to handle packet type {:d}"
+ raise SSHException(msg.format(self.name, ptype))
# ...internals...
@@ -151,8 +150,9 @@ class KexGex(object):
raise SSHException("Can't do server-side gex with no modulus pack")
self.transport._log(
DEBUG,
- "Picking p (%d <= %d <= %d bits)"
- % (minbits, preferredbits, maxbits),
+ "Picking p ({} <= {} <= {} bits)".format(
+ minbits, preferredbits, maxbits
+ ),
)
self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits)
m = Message()
@@ -176,7 +176,7 @@ class KexGex(object):
if pack is None:
raise SSHException("Can't do server-side gex with no modulus pack")
self.transport._log(
- DEBUG, "Picking p (~ %d bits)" % (self.preferred_bits,)
+ DEBUG, "Picking p (~ {} bits)".format(self.preferred_bits)
)
self.g, self.p = pack.get_modulus(
self.min_bits, self.preferred_bits, self.max_bits
@@ -197,9 +197,9 @@ class KexGex(object):
if (bitlen < 1024) or (bitlen > 8192):
raise SSHException(
"Server-generated gex p (don't ask) is out of range "
- "(%d bits)" % bitlen
+ "({} bits)".format(bitlen)
)
- self.transport._log(DEBUG, "Got server p (%d bits)" % bitlen)
+ self.transport._log(DEBUG, "Got server p ({} bits)".format(bitlen))
self._generate_x()
# now compute e = g^x mod p
self.e = pow(self.g, self.x, self.p)
diff --git a/paramiko/kex_group1.py b/paramiko/kex_group1.py
index e6cbc4c5..904835d7 100644
--- a/paramiko/kex_group1.py
+++ b/paramiko/kex_group1.py
@@ -73,7 +73,8 @@ class KexGroup1(object):
return self._parse_kexdh_init(m)
elif not self.transport.server_mode and (ptype == _MSG_KEXDH_REPLY):
return self._parse_kexdh_reply(m)
- raise SSHException("KexGroup1 asked to handle packet type %d" % ptype)
+ msg = "KexGroup1 asked to handle packet type {:d}"
+ raise SSHException(msg.format(ptype))
# ...internals...
diff --git a/paramiko/kex_gss.py b/paramiko/kex_gss.py
index d76bb2dd..f83a2dc4 100644
--- a/paramiko/kex_gss.py
+++ b/paramiko/kex_gss.py
@@ -131,9 +131,8 @@ class KexGSSGroup1(object):
return self._parse_kexgss_complete(m)
elif ptype == MSG_KEXGSS_ERROR:
return self._parse_kexgss_error(m)
- raise SSHException(
- "GSS KexGroup1 asked to handle packet type %d" % ptype
- )
+ msg = "GSS KexGroup1 asked to handle packet type {:d}"
+ raise SSHException(msg.format(ptype))
# ## internals...
@@ -306,9 +305,14 @@ class KexGSSGroup1(object):
err_msg = m.get_string()
m.get_string() # we don't care about the language!
raise SSHException(
- "GSS-API Error:\nMajor Status: %s\nMinor Status: %s\
- \nError Message: %s\n"
- ) % (str(maj_status), str(min_status), err_msg)
+ """GSS-API Error:
+Major Status: {}
+Minor Status: {}
+Error Message: {}
+""".format(
+ maj_status, min_status, err_msg
+ )
+ )
class KexGSSGroup14(KexGSSGroup1):
@@ -386,7 +390,8 @@ class KexGSSGex(object):
return self._parse_kexgss_complete(m)
elif ptype == MSG_KEXGSS_ERROR:
return self._parse_kexgss_error(m)
- raise SSHException("KexGex asked to handle packet type %d" % ptype)
+ msg = "KexGex asked to handle packet type {:d}"
+ raise SSHException(msg.format(ptype))
# ## internals...
@@ -440,8 +445,9 @@ class KexGSSGex(object):
raise SSHException("Can't do server-side gex with no modulus pack")
self.transport._log(
DEBUG, # noqa
- "Picking p (%d <= %d <= %d bits)"
- % (minbits, preferredbits, maxbits),
+ "Picking p ({} <= {} <= {} bits)".format(
+ minbits, preferredbits, maxbits
+ ),
)
self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits)
m = Message()
@@ -464,9 +470,11 @@ class KexGSSGex(object):
if (bitlen < 1024) or (bitlen > 8192):
raise SSHException(
"Server-generated gex p (don't ask) is out of range "
- "(%d bits)" % bitlen
+ "({} bits)".format(bitlen)
)
- self.transport._log(DEBUG, "Got server p (%d bits)" % bitlen) # noqa
+ self.transport._log(
+ DEBUG, "Got server p ({} bits)".format(bitlen)
+ ) # noqa
self._generate_x()
# now compute e = g^x mod p
self.e = pow(self.g, self.x, self.p)
@@ -645,9 +653,14 @@ class KexGSSGex(object):
err_msg = m.get_string()
m.get_string() # we don't care about the language (lang_tag)!
raise SSHException(
- "GSS-API Error:\nMajor Status: %s\nMinor Status: %s\
- \nError Message: %s\n"
- ) % (str(maj_status), str(min_status), err_msg)
+ """GSS-API Error:
+Major Status: {}
+Minor Status: {}
+Error Message: {}
+""".format(
+ maj_status, min_status, err_msg
+ )
+ )
class NullHostKey(object):
diff --git a/paramiko/message.py b/paramiko/message.py
index 869ac6c6..dead3508 100644
--- a/paramiko/message.py
+++ b/paramiko/message.py
@@ -187,7 +187,7 @@ class Message(object):
def get_list(self):
"""
- Fetch a `list` of `strings <str>` from the stream.
+ Fetch a list of `strings <str>` from the stream.
These are trivially encoded as comma-separated values in a string.
"""
@@ -281,7 +281,7 @@ class Message(object):
a single string of values separated by commas. (Yes, really, that's
how SSH2 does it.)
- :param list l: list of strings to add
+ :param l: list of strings to add
"""
self.add_string(",".join(l))
return self
diff --git a/paramiko/packet.py b/paramiko/packet.py
index b538f3ba..cb46835e 100644
--- a/paramiko/packet.py
+++ b/paramiko/packet.py
@@ -382,7 +382,7 @@ class Packetizer(object):
if cmd in MSG_NAMES:
cmd_name = MSG_NAMES[cmd]
else:
- cmd_name = "$%x" % cmd
+ cmd_name = "${:x}".format(cmd)
orig_len = len(data)
self.__write_lock.acquire()
try:
@@ -392,7 +392,7 @@ class Packetizer(object):
if self.__dump_packets:
self._log(
DEBUG,
- "Write packet <%s>, length %d" % (cmd_name, orig_len),
+ "Write packet <{}>, length {}".format(cmd_name, orig_len),
)
self._log(DEBUG, util.format_binary(packet, "OUT: "))
if self.__block_engine_out is not None:
@@ -420,10 +420,9 @@ class Packetizer(object):
)
if sent_too_much and not self.__need_rekey:
# only ask once for rekeying
+ msg = "Rekeying (hit {} packets, {} bytes sent)"
self._log(
- DEBUG,
- "Rekeying (hit %d packets, %d bytes sent)"
- % (self.__sent_packets, self.__sent_bytes),
+ DEBUG, msg.format(self.__sent_packets, self.__sent_bytes)
)
self.__received_bytes_overflow = 0
self.__received_packets_overflow = 0
@@ -476,7 +475,9 @@ class Packetizer(object):
if self.__dump_packets:
self._log(
DEBUG,
- "Got payload (%d bytes, %d padding)" % (packet_size, padding),
+ "Got payload ({} bytes, {} padding)".format(
+ packet_size, padding
+ ),
)
if self.__compress_engine_in is not None:
@@ -508,10 +509,10 @@ class Packetizer(object):
self.__received_bytes >= self.REKEY_BYTES
):
# only ask once for rekeying
+ err = "Rekeying (hit {} packets, {} bytes received)"
self._log(
DEBUG,
- "Rekeying (hit %d packets, %d bytes received)"
- % (self.__received_packets, self.__received_bytes),
+ err.format(self.__received_packets, self.__received_bytes),
)
self.__received_bytes_overflow = 0
self.__received_packets_overflow = 0
@@ -521,10 +522,11 @@ class Packetizer(object):
if cmd in MSG_NAMES:
cmd_name = MSG_NAMES[cmd]
else:
- cmd_name = "$%x" % cmd
+ cmd_name = "${:x}".format(cmd)
if self.__dump_packets:
self._log(
- DEBUG, "Read packet <%s>, length %d" % (cmd_name, len(payload))
+ DEBUG,
+ "Read packet <{}>, length {}".format(cmd_name, len(payload)),
)
return cmd, msg
diff --git a/paramiko/pkey.py b/paramiko/pkey.py
index 50e989f6..e37f7751 100644
--- a/paramiko/pkey.py
+++ b/paramiko/pkey.py
@@ -310,9 +310,10 @@ class PKey(object):
# unencryped: done
return data
# encrypted keyfile: will need a password
- if headers["proc-type"] != "4,ENCRYPTED":
+ proc_type = headers["proc-type"]
+ if proc_type != "4,ENCRYPTED":
raise SSHException(
- 'Unknown private key structure "%s"' % headers["proc-type"]
+ 'Unknown private key structure "{}"'.format(proc_type)
)
try:
encryption_type, saltstr = headers["dek-info"].split(",")
@@ -320,7 +321,7 @@ class PKey(object):
raise SSHException("Can't parse DEK-info in private key file")
if encryption_type not in self._CIPHER_TABLE:
raise SSHException(
- 'Unknown private key cipher "%s"' % encryption_type
+ 'Unknown private key cipher "{}"'.format(encryption_type)
)
# if no password was passed in,
# raise an exception pointing out that we need one
@@ -411,7 +412,7 @@ class PKey(object):
# (requires going back into per-type subclasses.)
msg.get_string()
else:
- err = "Invalid key (class: {0}, data type: {1}"
+ err = "Invalid key (class: {}, data type: {}"
raise SSHException(err.format(self.__class__.__name__, type_))
def load_certificate(self, value):
@@ -441,7 +442,7 @@ class PKey(object):
constructor = "from_string"
blob = getattr(PublicBlob, constructor)(value)
if not blob.key_type.startswith(self.get_name()):
- err = "PublicBlob type {0} incompatible with key type {1}"
+ err = "PublicBlob type {} incompatible with key type {}"
raise ValueError(err.format(blob.key_type, self.get_name()))
self.public_blob = blob
@@ -493,7 +494,7 @@ class PublicBlob(object):
"""
fields = string.split(None, 2)
if len(fields) < 2:
- msg = "Not enough fields for public blob: {0}"
+ msg = "Not enough fields for public blob: {}"
raise ValueError(msg.format(fields))
key_type = fields[0]
key_blob = decodebytes(b(fields[1]))
@@ -506,8 +507,10 @@ class PublicBlob(object):
m = Message(key_blob)
blob_type = m.get_text()
if blob_type != key_type:
- msg = "Invalid PublicBlob contents: key type={0!r}, but blob type={1!r}" # noqa
- raise ValueError(msg.format(key_type, blob_type))
+ deets = "key type={!r}, but blob type={!r}".format(
+ key_type, blob_type
+ )
+ raise ValueError("Invalid PublicBlob contents: {}".format(deets))
# All good? All good.
return cls(type_=key_type, blob=key_blob, comment=comment)
@@ -523,9 +526,9 @@ class PublicBlob(object):
return cls(type_=type_, blob=message.asbytes())
def __str__(self):
- ret = "{0} public key/certificate".format(self.key_type)
+ ret = "{} public key/certificate".format(self.key_type)
if self.comment:
- ret += "- {0}".format(self.comment)
+ ret += "- {}".format(self.comment)
return ret
def __eq__(self, other):
diff --git a/paramiko/primes.py b/paramiko/primes.py
index 22e536b8..8dff7683 100644
--- a/paramiko/primes.py
+++ b/paramiko/primes.py
@@ -61,9 +61,15 @@ class ModulusPack(object):
self.discarded = []
def _parse_modulus(self, line):
- timestamp, mod_type, tests, tries, size, generator, modulus = (
- line.split()
- )
+ (
+ timestamp,
+ mod_type,
+ tests,
+ tries,
+ size,
+ generator,
+ modulus,
+ ) = line.split()
mod_type = int(mod_type)
tests = int(tests)
tries = int(tries)
@@ -93,7 +99,7 @@ class ModulusPack(object):
bl = util.bit_length(modulus)
if (bl != size) and (bl != size + 1):
self.discarded.append(
- (modulus, "incorrectly reported bit length %d" % size)
+ (modulus, "incorrectly reported bit length {}".format(size))
)
return
if bl not in self.pack:
diff --git a/paramiko/py3compat.py b/paramiko/py3compat.py
index 314c557a..cbe20ca6 100644
--- a/paramiko/py3compat.py
+++ b/paramiko/py3compat.py
@@ -62,7 +62,7 @@ if PY2:
elif isinstance(s, buffer): # NOQA
return s
else:
- raise TypeError("Expected unicode or bytes, got %r" % s)
+ raise TypeError("Expected unicode or bytes, got {!r}".format(s))
def u(s, encoding="utf8"): # NOQA
"""cast bytes or unicode to unicode"""
@@ -73,7 +73,7 @@ if PY2:
elif isinstance(s, buffer): # NOQA
return s.decode(encoding)
else:
- raise TypeError("Expected unicode or bytes, got %r" % s)
+ raise TypeError("Expected unicode or bytes, got {!r}".format(s))
def b2s(s):
return s
@@ -148,7 +148,7 @@ else:
elif isinstance(s, str):
return s.encode(encoding)
else:
- raise TypeError("Expected unicode or bytes, got %r" % s)
+ raise TypeError("Expected unicode or bytes, got {!r}".format(s))
def u(s, encoding="utf8"):
"""cast bytes or unicode to unicode"""
@@ -157,7 +157,7 @@ else:
elif isinstance(s, str):
return s
else:
- raise TypeError("Expected unicode or bytes, got %r" % s)
+ raise TypeError("Expected unicode or bytes, got {!r}".format(s))
def b2s(s):
return s.decode() if isinstance(s, bytes) else s
diff --git a/paramiko/server.py b/paramiko/server.py
index 3eb1a170..2fe9cc19 100644
--- a/paramiko/server.py
+++ b/paramiko/server.py
@@ -226,7 +226,7 @@ class ServerInterface(object):
The default implementation always returns ``AUTH_FAILED``.
- :param list responses: list of `str` responses from the client
+ :param responses: list of `str` responses from the client
:return:
``AUTH_FAILED`` if the authentication fails; ``AUTH_SUCCESSFUL`` if
it succeeds; ``AUTH_PARTIALLY_SUCCESSFUL`` if the interactive auth
@@ -678,13 +678,13 @@ class SubsystemHandler(threading.Thread):
def _run(self):
try:
self.__transport._log(
- DEBUG, "Starting handler for subsystem %s" % self.__name
+ DEBUG, "Starting handler for subsystem {}".format(self.__name)
)
self.start_subsystem(self.__name, self.__transport, self.__channel)
except Exception as e:
self.__transport._log(
ERROR,
- 'Exception in subsystem handler for "{0}": {1}'.format(
+ 'Exception in subsystem handler for "{}": {}'.format(
self.__name, e
),
)
diff --git a/paramiko/sftp_attr.py b/paramiko/sftp_attr.py
index 5ef8ff14..f16ac746 100644
--- a/paramiko/sftp_attr.py
+++ b/paramiko/sftp_attr.py
@@ -82,7 +82,7 @@ class SFTPAttributes(object):
return attr
def __repr__(self):
- return "<SFTPAttributes: %s>" % self._debug_str()
+ return "<SFTPAttributes: {}>".format(self._debug_str())
# ...internals...
@classmethod
@@ -146,15 +146,15 @@ class SFTPAttributes(object):
def _debug_str(self):
out = "[ "
if self.st_size is not None:
- out += "size=%d " % self.st_size
+ out += "size={} ".format(self.st_size)
if (self.st_uid is not None) and (self.st_gid is not None):
- out += "uid=%d gid=%d " % (self.st_uid, self.st_gid)
+ out += "uid={} gid={} ".format(self.st_uid, self.st_gid)
if self.st_mode is not None:
out += "mode=" + oct(self.st_mode) + " "
if (self.st_atime is not None) and (self.st_mtime is not None):
- out += "atime=%d mtime=%d " % (self.st_atime, self.st_mtime)
+ out += "atime={} mtime={} ".format(self.st_atime, self.st_mtime)
for k, v in self.attr.items():
- out += '"%s"=%r ' % (str(k), v)
+ out += '"{}"={!r} '.format(str(k), v)
out += "]"
return out
@@ -227,6 +227,9 @@ class SFTPAttributes(object):
if size is None:
size = 0
+ # TODO: not sure this actually worked as expected beforehand, leaving
+ # it untouched for the time being, re: .format() upgrade, until someone
+ # has time to doublecheck
return "%s 1 %-8d %-8d %8d %-12s %s" % (
ks,
uid,
diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py
index 425aa87d..1a9147fc 100644
--- a/paramiko/sftp_client.py
+++ b/paramiko/sftp_client.py
@@ -131,7 +131,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
except EOFError:
raise SSHException("EOF during negotiation")
self._log(
- INFO, "Opened sftp connection (server version %d)" % server_version
+ INFO,
+ "Opened sftp connection (server version {})".format(
+ server_version
+ ),
)
@classmethod
@@ -171,6 +174,8 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
for m in msg:
self._log(level, m, *args)
else:
+ # NOTE: these bits MUST continue using %-style format junk because
+ # logging.Logger.log() explicitly requires it. Grump.
# escape '%' in msg (they could come from file or directory names)
# before logging
msg = msg.replace("%", "%%")
@@ -230,7 +235,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
.. versionadded:: 1.2
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "listdir(%r)" % path)
+ self._log(DEBUG, "listdir({!r})".format(path))
t, msg = self._request(CMD_OPENDIR, path)
if t != CMD_HANDLE:
raise SFTPError("Expected handle")
@@ -269,7 +274,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
.. versionadded:: 1.15
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "listdir(%r)" % path)
+ self._log(DEBUG, "listdir({!r})".format(path))
t, msg = self._request(CMD_OPENDIR, path)
if t != CMD_HANDLE:
@@ -351,7 +356,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:raises: ``IOError`` -- if the file could not be opened.
"""
filename = self._adjust_cwd(filename)
- self._log(DEBUG, "open(%r, %r)" % (filename, mode))
+ self._log(DEBUG, "open({!r}, {!r})".format(filename, mode))
imode = 0
if ("r" in mode) or ("+" in mode):
imode |= SFTP_FLAG_READ
@@ -369,7 +374,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
raise SFTPError("Expected handle")
handle = msg.get_binary()
self._log(
- DEBUG, "open(%r, %r) -> %s" % (filename, mode, u(hexlify(handle)))
+ DEBUG,
+ "open({!r}, {!r}) -> {}".format(
+ filename, mode, u(hexlify(handle))
+ ),
)
return SFTPFile(self, handle, mode, bufsize)
@@ -386,7 +394,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:raises: ``IOError`` -- if the path refers to a folder (directory)
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "remove(%r)" % path)
+ self._log(DEBUG, "remove({!r})".format(path))
self._request(CMD_REMOVE, path)
unlink = remove
@@ -411,7 +419,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
"""
oldpath = self._adjust_cwd(oldpath)
newpath = self._adjust_cwd(newpath)
- self._log(DEBUG, "rename(%r, %r)" % (oldpath, newpath))
+ self._log(DEBUG, "rename({!r}, {!r})".format(oldpath, newpath))
self._request(CMD_RENAME, oldpath, newpath)
def posix_rename(self, oldpath, newpath):
@@ -431,7 +439,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
"""
oldpath = self._adjust_cwd(oldpath)
newpath = self._adjust_cwd(newpath)
- self._log(DEBUG, "posix_rename(%r, %r)" % (oldpath, newpath))
+ self._log(DEBUG, "posix_rename({!r}, {!r})".format(oldpath, newpath))
self._request(
CMD_EXTENDED, "posix-rename@openssh.com", oldpath, newpath
)
@@ -446,7 +454,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param int mode: permissions (posix-style) for the newly-created folder
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "mkdir(%r, %r)" % (path, mode))
+ self._log(DEBUG, "mkdir({!r}, {!r})".format(path, mode))
attr = SFTPAttributes()
attr.st_mode = mode
self._request(CMD_MKDIR, path, attr)
@@ -458,7 +466,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param str path: name of the folder to remove
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "rmdir(%r)" % path)
+ self._log(DEBUG, "rmdir({!r})".format(path))
self._request(CMD_RMDIR, path)
def stat(self, path):
@@ -481,7 +489,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
file
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "stat(%r)" % path)
+ self._log(DEBUG, "stat({!r})".format(path))
t, msg = self._request(CMD_STAT, path)
if t != CMD_ATTRS:
raise SFTPError("Expected attributes")
@@ -499,7 +507,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
file
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "lstat(%r)" % path)
+ self._log(DEBUG, "lstat({!r})".format(path))
t, msg = self._request(CMD_LSTAT, path)
if t != CMD_ATTRS:
raise SFTPError("Expected attributes")
@@ -513,7 +521,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param str dest: path of the newly created symlink
"""
dest = self._adjust_cwd(dest)
- self._log(DEBUG, "symlink(%r, %r)" % (source, dest))
+ self._log(DEBUG, "symlink({!r}, {!r})".format(source, dest))
source = bytestring(source)
self._request(CMD_SYMLINK, source, dest)
@@ -527,7 +535,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param int mode: new permissions
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "chmod(%r, %r)" % (path, mode))
+ self._log(DEBUG, "chmod({!r}, {!r})".format(path, mode))
attr = SFTPAttributes()
attr.st_mode = mode
self._request(CMD_SETSTAT, path, attr)
@@ -544,7 +552,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param int gid: new group id
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "chown(%r, %r, %r)" % (path, uid, gid))
+ self._log(DEBUG, "chown({!r}, {!r}, {!r})".format(path, uid, gid))
attr = SFTPAttributes()
attr.st_uid, attr.st_gid = uid, gid
self._request(CMD_SETSTAT, path, attr)
@@ -566,7 +574,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
path = self._adjust_cwd(path)
if times is None:
times = (time.time(), time.time())
- self._log(DEBUG, "utime(%r, %r)" % (path, times))
+ self._log(DEBUG, "utime({!r}, {!r})".format(path, times))
attr = SFTPAttributes()
attr.st_atime, attr.st_mtime = times
self._request(CMD_SETSTAT, path, attr)
@@ -581,7 +589,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param int size: the new size of the file
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "truncate(%r, %r)" % (path, size))
+ self._log(DEBUG, "truncate({!r}, {!r})".format(path, size))
attr = SFTPAttributes()
attr.st_size = size
self._request(CMD_SETSTAT, path, attr)
@@ -596,7 +604,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:return: target path, as a `str`
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "readlink(%r)" % path)
+ self._log(DEBUG, "readlink({!r})".format(path))
t, msg = self._request(CMD_READLINK, path)
if t != CMD_NAME:
raise SFTPError("Expected name response")
@@ -604,7 +612,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
if count == 0:
return None
if count != 1:
- raise SFTPError("Readlink returned %d results" % count)
+ raise SFTPError("Readlink returned {} results".format(count))
return _to_unicode(msg.get_string())
def normalize(self, path):
@@ -620,13 +628,13 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:raises: ``IOError`` -- if the path can't be resolved on the server
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, "normalize(%r)" % path)
+ self._log(DEBUG, "normalize({!r})".format(path))
t, msg = self._request(CMD_REALPATH, path)
if t != CMD_NAME:
raise SFTPError("Expected name response")
count = msg.get_int()
if count != 1:
- raise SFTPError("Realpath returned %d results" % count)
+ raise SFTPError("Realpath returned {} results".format(count))
return msg.get_text()
def chdir(self, path=None):
@@ -649,9 +657,8 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
self._cwd = None
return
if not stat.S_ISDIR(self.stat(path).st_mode):
- raise SFTPError(
- errno.ENOTDIR, "%s: %s" % (os.strerror(errno.ENOTDIR), path)
- )
+ code = errno.ENOTDIR
+ raise SFTPError(code, "{}: {}".format(os.strerror(code), path))
self._cwd = b(self.normalize(path))
def getcwd(self):
@@ -713,7 +720,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
s = self.stat(remotepath)
if s.st_size != size:
raise IOError(
- "size mismatch in put! %d != %d" % (s.st_size, size)
+ "size mismatch in put! {} != {}".format(s.st_size, size)
)
else:
s = SFTPAttributes()
@@ -796,7 +803,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
s = os.stat(localpath)
if s.st_size != size:
raise IOError(
- "size mismatch in get! %d != %d" % (s.st_size, size)
+ "size mismatch in get! {} != {}".format(s.st_size, size)
)
# ...internals...
@@ -835,7 +842,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
try:
t, data = self._read_packet()
except EOFError as e:
- raise SSHException("Server connection dropped: %s" % str(e))
+ raise SSHException("Server connection dropped: {}".format(e))
msg = Message(data)
num = msg.get_int()
self._lock.acquire()
@@ -843,7 +850,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
if num not in self._expecting:
# might be response for a file that was closed before
# responses came back
- self._log(DEBUG, "Unexpected response #%d" % (num,))
+ self._log(DEBUG, "Unexpected response #{}".format(num))
if waitfor is None:
# just doing a single check
break
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 08003e43..0104d857 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -91,7 +91,7 @@ class SFTPFile(BufferedFile):
# __del__.)
if self._closed:
return
- self.sftp._log(DEBUG, "close(%s)" % u(hexlify(self.handle)))
+ self.sftp._log(DEBUG, "close({})".format(u(hexlify(self.handle))))
if self.pipelined:
self.sftp._finish_responses(self)
BufferedFile.close(self)
@@ -293,7 +293,9 @@ class SFTPFile(BufferedFile):
:param int mode: new permissions
"""
- self.sftp._log(DEBUG, "chmod(%s, %r)" % (hexlify(self.handle), mode))
+ self.sftp._log(
+ DEBUG, "chmod({}, {!r})".format(hexlify(self.handle), mode)
+ )
attr = SFTPAttributes()
attr.st_mode = mode
self.sftp._request(CMD_FSETSTAT, self.handle, attr)
@@ -309,7 +311,8 @@ class SFTPFile(BufferedFile):
:param int gid: new group id
"""
self.sftp._log(
- DEBUG, "chown(%s, %r, %r)" % (hexlify(self.handle), uid, gid)
+ DEBUG,
+ "chown({}, {!r}, {!r})".format(hexlify(self.handle), uid, gid),
)
attr = SFTPAttributes()
attr.st_uid, attr.st_gid = uid, gid
@@ -330,7 +333,9 @@ class SFTPFile(BufferedFile):
"""
if times is None:
times = (time.time(), time.time())
- self.sftp._log(DEBUG, "utime(%s, %r)" % (hexlify(self.handle), times))
+ self.sftp._log(
+ DEBUG, "utime({}, {!r})".format(hexlify(self.handle), times)
+ )
attr = SFTPAttributes()
attr.st_atime, attr.st_mtime = times
self.sftp._request(CMD_FSETSTAT, self.handle, attr)
@@ -344,7 +349,7 @@ class SFTPFile(BufferedFile):
:param size: the new size of the file
"""
self.sftp._log(
- DEBUG, "truncate(%s, %r)" % (hexlify(self.handle), size)
+ DEBUG, "truncate({}, {!r})".format(hexlify(self.handle), size)
)
attr = SFTPAttributes()
attr.st_size = size
@@ -484,7 +489,9 @@ class SFTPFile(BufferedFile):
.. versionadded:: 1.5.4
"""
- self.sftp._log(DEBUG, "readv(%s, %r)" % (hexlify(self.handle), chunks))
+ self.sftp._log(
+ DEBUG, "readv({}, {!r})".format(hexlify(self.handle), chunks)
+ )
read_chunks = []
for offset, size in chunks:
diff --git a/paramiko/sftp_server.py b/paramiko/sftp_server.py
index 5c23ea2b..8265df96 100644
--- a/paramiko/sftp_server.py
+++ b/paramiko/sftp_server.py
@@ -138,7 +138,7 @@ class SFTPServer(BaseSFTP, SubsystemHandler):
def start_subsystem(self, name, transport, channel):
self.sock = channel
- self._log(DEBUG, "Started sftp server on channel %s" % repr(channel))
+ self._log(DEBUG, "Started sftp server on channel {!r}".format(channel))
self._send_server_version()
self.server.session_started()
while True:
@@ -238,9 +238,7 @@ class SFTPServer(BaseSFTP, SubsystemHandler):
item._pack(msg)
else:
raise Exception(
- "unknown type for {0!r} type {1!r}".format(
- item, type(item)
- )
+ "unknown type for {!r} type {!r}".format(item, type(item))
)
self._send_packet(t, msg)
@@ -249,7 +247,7 @@ class SFTPServer(BaseSFTP, SubsystemHandler):
# must be error code
self._send_status(request_number, handle)
return
- handle._set_name(b("hx%d" % self.next_handle))
+ handle._set_name(b("hx{:d}".format(self.next_handle)))
self.next_handle += 1
if folder:
self.folder_table[handle._get_name()] = handle
@@ -378,7 +376,7 @@ class SFTPServer(BaseSFTP, SubsystemHandler):
return flags
def _process(self, t, request_number, msg):
- self._log(DEBUG, "Request: %s" % CMD_NAMES[t])
+ self._log(DEBUG, "Request: {}".format(CMD_NAMES[t]))
if t == CMD_OPEN:
path = msg.get_text()
flags = self._convert_pflags(msg.get_int())
diff --git a/paramiko/ssh_exception.py b/paramiko/ssh_exception.py
index 4865288f..52bb23be 100644
--- a/paramiko/ssh_exception.py
+++ b/paramiko/ssh_exception.py
@@ -67,7 +67,7 @@ class BadAuthenticationType(AuthenticationException):
self.args = (explanation, types)
def __str__(self):
- return "{0} (allowed_types={1!r})".format(
+ return "{} (allowed_types={!r})".format(
SSHException.__str__(self), self.allowed_types
)
@@ -115,7 +115,7 @@ class BadHostKeyException(SSHException):
def __init__(self, hostname, got_key, expected_key):
message = (
- "Host key for server {0} does not match: got {1}, expected {2}"
+ "Host key for server {} does not match: got {}, expected {}"
) # noqa
message = message.format(
hostname, got_key.get_base64(), expected_key.get_base64()
@@ -139,8 +139,9 @@ class ProxyCommandFailure(SSHException):
def __init__(self, command, error):
SSHException.__init__(
self,
- '"ProxyCommand (%s)" returned non-zero exit status: %s'
- % (command, error),
+ '"ProxyCommand ({})" returned non-zero exit status: {}'.format(
+ command, error
+ ),
)
self.error = error
# for unpickling
diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py
index 025d9928..eb8826e0 100644
--- a/paramiko/ssh_gss.py
+++ b/paramiko/ssh_gss.py
@@ -296,9 +296,7 @@ class _SSH_GSSAPI(_SSH_GSSAuth):
else:
token = self._gss_ctxt.step(recv_token)
except gssapi.GSSException:
- message = "{0} Target: {1}".format(
- sys.exc_info()[1], self._gss_host
- )
+ message = "{} Target: {}".format(sys.exc_info()[1], self._gss_host)
raise gssapi.GSSException(message)
self._gss_ctxt_status = self._gss_ctxt.established
return token
@@ -461,7 +459,7 @@ class _SSH_SSPI(_SSH_GSSAuth):
error, token = self._gss_ctxt.authorize(recv_token)
token = token[0].Buffer
except pywintypes.error as e:
- e.strerror += ", Target: {1}".format(e, self._gss_host)
+ e.strerror += ", Target: {}".format(e, self._gss_host)
raise
if error == 0:
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 22348f87..f72eebaf 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -139,7 +139,7 @@ class Transport(threading.Thread, ClosingContextManager):
_DECRYPT = object()
_PROTO_ID = "2.0"
- _CLIENT_ID = "paramiko_%s" % paramiko.__version__
+ _CLIENT_ID = "paramiko_{}".format(paramiko.__version__)
# These tuples of algorithm identifiers are in preference order; do not
# reorder without reason!
@@ -369,7 +369,7 @@ class Transport(threading.Thread, ClosingContextManager):
break
else:
raise SSHException(
- "Unable to connect to %s: %s" % (hostname, reason)
+ "Unable to connect to {}: {}".format(hostname, reason)
)
# okay, normal socket-ish flow here...
threading.Thread.__init__(self)
@@ -456,17 +456,20 @@ class Transport(threading.Thread, ClosingContextManager):
"""
Returns a string representation of this object, for debugging.
"""
- out = "<paramiko.Transport at %s" % hex(long(id(self)) & xffffffff)
+ id_ = hex(long(id(self)) & xffffffff)
+ out = "<paramiko.Transport at {}".format(id_)
if not self.active:
out += " (unconnected)"
else:
if self.local_cipher != "":
- out += " (cipher %s, %d bits)" % (
+ out += " (cipher {}, {:d} bits)".format(
self.local_cipher,
self._cipher_info[self.local_cipher]["key-size"] * 8,
)
if self.is_authenticated():
- out += " (active; %d open channel(s))" % len(self._channels)
+ out += " (active; {} open channel(s))".format(
+ len(self._channels)
+ )
elif self.initial_kex_done:
out += " (connected; awaiting auth)"
else:
@@ -1106,7 +1109,7 @@ class Transport(threading.Thread, ClosingContextManager):
m.add_boolean(wait)
if data is not None:
m.add(*data)
- self._log(DEBUG, 'Sending global request "%s"' % kind)
+ self._log(DEBUG, 'Sending global request "{}"'.format(kind))
self._send_user_message(m)
if not wait:
return None
@@ -1226,15 +1229,20 @@ class Transport(threading.Thread, ClosingContextManager):
self._log(DEBUG, "Bad host key from server")
self._log(
DEBUG,
- "Expected: %s: %s"
- % (hostkey.get_name(), repr(hostkey.asbytes())),
+ "Expected: {}: {}".format(
+ hostkey.get_name(), repr(hostkey.asbytes())
+ ),
)
self._log(
DEBUG,
- "Got : %s: %s" % (key.get_name(), repr(key.asbytes())),
+ "Got : {}: {}".format(
+ key.get_name(), repr(key.asbytes())
+ ),
)
raise SSHException("Bad host key from server")
- self._log(DEBUG, "Host key verified (%s)" % hostkey.get_name())
+ self._log(
+ DEBUG, "Host key verified ({})".format(hostkey.get_name())
+ )
if (pkey is not None) or (password is not None) or gss_auth or gss_kex:
if gss_auth:
@@ -1345,7 +1353,7 @@ class Transport(threading.Thread, ClosingContextManager):
:param str username: the username to authenticate as
:return:
- `list` of auth types permissible for the next stage of
+ list of auth types permissible for the next stage of
authentication (normally empty)
:raises:
@@ -1400,7 +1408,7 @@ class Transport(threading.Thread, ClosingContextManager):
``True`` if an attempt at an automated "interactive" password auth
should be made if the server doesn't support normal password auth
:return:
- `list` of auth types permissible for the next stage of
+ list of auth types permissible for the next stage of
authentication (normally empty)
:raises:
@@ -1473,7 +1481,7 @@ class Transport(threading.Thread, ClosingContextManager):
an event to trigger when the authentication attempt is complete
(whether it was successful or not)
:return:
- `list` of auth types permissible for the next stage of
+ list of auth types permissible for the next stage of
authentication (normally empty)
:raises:
@@ -1531,7 +1539,7 @@ class Transport(threading.Thread, ClosingContextManager):
:param callable handler: a handler for responding to server questions
:param str submethods: a string list of desired submethods (optional)
:return:
- `list` of auth types permissible for the next stage of
+ list of auth types permissible for the next stage of
authentication (normally empty).
:raises: `.BadAuthenticationType` -- if public-key authentication isn't
@@ -1583,7 +1591,6 @@ class Transport(threading.Thread, ClosingContextManager):
:param bool gss_deleg_creds: Delegate credentials or not
:return: list of auth types permissible for the next stage of
authentication (normally empty)
- :rtype: list
:raises: `.BadAuthenticationType` -- if gssapi-with-mic isn't
allowed by the server (and no event was passed in)
:raises:
@@ -1607,7 +1614,7 @@ class Transport(threading.Thread, ClosingContextManager):
:param str username: The username to authenticate as.
:returns:
- a `list` of auth types permissible for the next stage of
+ a list of auth types permissible for the next stage of
authentication (normally empty)
:raises: `.BadAuthenticationType` --
if GSS-API Key Exchange was not performed (and no event was passed
@@ -1805,7 +1812,9 @@ class Transport(threading.Thread, ClosingContextManager):
raise SSHException("Unknown host key type")
if not key.verify_ssh_sig(self.H, Message(sig)):
raise SSHException(
- "Signature verification (%s) failed." % self.host_key_type
+ "Signature verification ({}) failed.".format(
+ self.host_key_type
+ )
) # noqa
self.host_key = key
@@ -1819,9 +1828,8 @@ class Transport(threading.Thread, ClosingContextManager):
# Fallback to SHA1 for kex engines that fail to specify a hex
# algorithm, or for e.g. transport tests that don't run kexinit.
hash_algo = getattr(self.kex_engine, "hash_algo", None)
- hash_select_msg = "kex engine %s specified hash_algo %r" % (
- self.kex_engine.__class__.__name__,
- hash_algo,
+ hash_select_msg = "kex engine {} specified hash_algo {!r}".format(
+ self.kex_engine.__class__.__name__, hash_algo
)
if hash_algo is None:
hash_algo = sha1
@@ -1945,14 +1953,15 @@ class Transport(threading.Thread, ClosingContextManager):
_active_threads.append(self)
tid = hex(long(id(self)) & xffffffff)
if self.server_mode:
- self._log(DEBUG, "starting thread (server mode): %s" % tid)
+ self._log(DEBUG, "starting thread (server mode): {}".format(tid))
else:
- self._log(DEBUG, "starting thread (client mode): %s" % tid)
+ self._log(DEBUG, "starting thread (client mode): {}".format(tid))
try:
try:
self.packetizer.write_all(b(self.local_version + "\r\n"))
self._log(
- DEBUG, "Local version/idstring: %s" % self.local_version
+ DEBUG,
+ "Local version/idstring: {}".format(self.local_version),
) # noqa
self._check_banner()
# The above is actually very much part of the handshake, but
@@ -1984,8 +1993,9 @@ class Transport(threading.Thread, ClosingContextManager):
if len(self._expected_packet) > 0:
if ptype not in self._expected_packet:
raise SSHException(
- "Expecting packet from %r, got %d"
- % (self._expected_packet, ptype)
+ "Expecting packet from {!r}, got {:d}".format(
+ self._expected_packet, ptype
+ )
) # noqa
self._expected_packet = tuple()
if (ptype >= 30) and (ptype <= 41):
@@ -2006,15 +2016,17 @@ class Transport(threading.Thread, ClosingContextManager):
elif chanid in self.channels_seen:
self._log(
DEBUG,
- "Ignoring message for dead channel %d"
- % chanid,
- ) # noqa
+ "Ignoring message for dead channel {:d}".format( # noqa
+ chanid
+ ),
+ )
else:
self._log(
ERROR,
- "Channel request for unknown channel %d"
- % chanid,
- ) # noqa
+ "Channel request for unknown channel {:d}".format( # noqa
+ chanid
+ ),
+ )
break
elif (
self.auth_handler is not None
@@ -2030,7 +2042,7 @@ class Transport(threading.Thread, ClosingContextManager):
# itself literally MSG_UNIMPLEMENTED, in which case, we
# just shut up to avoid causing a useless loop).
name = MSG_NAMES[ptype]
- warning = "Oops, unhandled type {0} ({1!r})".format(
+ warning = "Oops, unhandled type {} ({!r})".format(
ptype, name
)
self._log(WARNING, warning)
@@ -2050,7 +2062,7 @@ class Transport(threading.Thread, ClosingContextManager):
except socket.error as e:
if type(e.args) is tuple:
if e.args:
- emsg = "%s (%d)" % (e.args[1], e.args[0])
+ emsg = "{} ({:d})".format(e.args[1], e.args[0])
else: # empty tuple, e.g. socket.timeout
emsg = str(e) or repr(e)
else:
@@ -2091,11 +2103,11 @@ class Transport(threading.Thread, ClosingContextManager):
# Log useful, non-duplicative line re: an agreed-upon algorithm.
# Old code implied algorithms could be asymmetrical (different for
# inbound vs outbound) so we preserve that possibility.
- msg = "{0} agreed: ".format(which)
+ msg = "{} agreed: ".format(which)
if local == remote:
msg += local
else:
- msg += "local={0}, remote={1}".format(local, remote)
+ msg += "local={}, remote={}".format(local, remote)
self._log(DEBUG, msg)
# protocol stages
@@ -2137,7 +2149,7 @@ class Transport(threading.Thread, ClosingContextManager):
raise SSHException('Indecipherable protocol version "' + buf + '"')
# save this server version string for later
self.remote_version = buf
- self._log(DEBUG, "Remote version/idstring: %s" % buf)
+ self._log(DEBUG, "Remote version/idstring: {}".format(buf))
# pull off any attached comment
# NOTE: comment used to be stored in a variable and then...never used.
# since 2003. ca 877cd974b8182d26fa76d566072917ea67b64e67
@@ -2151,9 +2163,9 @@ class Transport(threading.Thread, ClosingContextManager):
version = segs[1]
client = segs[2]
if version != "1.99" and version != "2.0":
- msg = "Incompatible version ({0} instead of 2.0)"
+ msg = "Incompatible version ({} instead of 2.0)"
raise SSHException(msg.format(version))
- msg = "Connected (version {0}, client {1})".format(version, client)
+ msg = "Connected (version {}, client {})".format(version, client)
self._log(INFO, msg)
def _send_kex_init(self):
@@ -2270,7 +2282,7 @@ class Transport(threading.Thread, ClosingContextManager):
"Incompatible ssh peer (no acceptable kex algorithm)"
) # noqa
self.kex_engine = self._kex_info[agreed_kex[0]](self)
- self._log(DEBUG, "Kex agreed: %s" % agreed_kex[0])
+ self._log(DEBUG, "Kex agreed: {}".format(agreed_kex[0]))
if self.server_mode:
available_server_keys = list(
@@ -2387,7 +2399,8 @@ class Transport(threading.Thread, ClosingContextManager):
len(agreed_local_compression) == 0
or len(agreed_remote_compression) == 0
):
- msg = "Incompatible ssh server (no acceptable compression) {0!r} {1!r} {2!r}" # noqa
+ msg = "Incompatible ssh server (no acceptable compression)"
+ msg += " {!r} {!r} {!r}"
raise SSHException(
msg.format(
agreed_local_compression,
@@ -2529,15 +2542,16 @@ class Transport(threading.Thread, ClosingContextManager):
def _parse_disconnect(self, m):
code = m.get_int()
desc = m.get_text()
- self._log(INFO, "Disconnect (code %d): %s" % (code, desc))
+ self._log(INFO, "Disconnect (code {:d}): {}".format(code, desc))
def _parse_global_request(self, m):
kind = m.get_text()
- self._log(DEBUG, 'Received global request "%s"' % kind)
+ self._log(DEBUG, 'Received global request "{}"'.format(kind))
want_reply = m.get_boolean()
if not self.server_mode:
self._log(
- DEBUG, 'Rejecting "%s" global request from server.' % kind
+ DEBUG,
+ 'Rejecting "{}" global request from server.'.format(kind),
)
ok = False
elif kind == "tcpip-forward":
@@ -2592,7 +2606,7 @@ class Transport(threading.Thread, ClosingContextManager):
chan._set_remote_channel(
server_chanid, server_window_size, server_max_packet_size
)
- self._log(DEBUG, "Secsh channel %d opened." % chanid)
+ self._log(DEBUG, "Secsh channel {:d} opened.".format(chanid))
if chanid in self.channel_events:
self.channel_events[chanid].set()
del self.channel_events[chanid]
@@ -2608,8 +2622,9 @@ class Transport(threading.Thread, ClosingContextManager):
reason_text = CONNECTION_FAILED_CODE.get(reason, "(unknown code)")
self._log(
ERROR,
- "Secsh channel %d open FAILED: %s: %s"
- % (chanid, reason_str, reason_text),
+ "Secsh channel {:d} open FAILED: {}: {}".format(
+ chanid, reason_str, reason_text
+ ),
)
self.lock.acquire()
try:
@@ -2644,8 +2659,9 @@ class Transport(threading.Thread, ClosingContextManager):
origin_port = m.get_int()
self._log(
DEBUG,
- "Incoming x11 connection from %s:%d"
- % (origin_addr, origin_port),
+ "Incoming x11 connection from {}:{:d}".format(
+ origin_addr, origin_port
+ ),
)
self.lock.acquire()
try:
@@ -2659,8 +2675,9 @@ class Transport(threading.Thread, ClosingContextManager):
origin_port = m.get_int()
self._log(
DEBUG,
- "Incoming tcp forwarded connection from %s:%d"
- % (origin_addr, origin_port),
+ "Incoming tcp forwarded connection from {}:{:d}".format(
+ origin_addr, origin_port
+ ),
)
self.lock.acquire()
try:
@@ -2669,7 +2686,8 @@ class Transport(threading.Thread, ClosingContextManager):
self.lock.release()
elif not self.server_mode:
self._log(
- DEBUG, 'Rejecting "%s" channel request from server.' % kind
+ DEBUG,
+ 'Rejecting "{}" channel request from server.'.format(kind),
)
reject = True
reason = OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
@@ -2696,7 +2714,8 @@ class Transport(threading.Thread, ClosingContextManager):
)
if reason != OPEN_SUCCEEDED:
self._log(
- DEBUG, 'Rejecting "%s" channel request from client.' % kind
+ DEBUG,
+ 'Rejecting "{}" channel request from client.'.format(kind),
)
reject = True
if reject:
@@ -2730,7 +2749,9 @@ class Transport(threading.Thread, ClosingContextManager):
m.add_int(self.default_window_size)
m.add_int(self.default_max_packet_size)
self._send_message(m)
- self._log(DEBUG, "Secsh channel %d (%s) opened.", my_chanid, kind)
+ self._log(
+ DEBUG, "Secsh channel {:d} ({}) opened.".format(my_chanid, kind)
+ )
if kind == "auth-agent@openssh.com":
self._forward_agent_handler(chan)
elif kind == "x11":
@@ -2747,7 +2768,7 @@ class Transport(threading.Thread, ClosingContextManager):
m.get_boolean() # always_display
msg = m.get_string()
m.get_string() # language
- self._log(DEBUG, "Debug msg: {0}".format(util.safe_string(msg)))
+ self._log(DEBUG, "Debug msg: {}".format(util.safe_string(msg)))
def _get_subsystem_handler(self, name):
try:
@@ -2803,7 +2824,7 @@ class SecurityOptions(object):
"""
Returns a string representation of this object, for debugging.
"""
- return "<paramiko.SecurityOptions for %s>" % repr(self._transport)
+ return "<paramiko.SecurityOptions for {!r}>".format(self._transport)
def _set(self, name, orig, x):
if type(x) is list:
diff --git a/paramiko/util.py b/paramiko/util.py
index 1fb73da4..29c52bfb 100644
--- a/paramiko/util.py
+++ b/paramiko/util.py
@@ -102,19 +102,21 @@ def format_binary(data, prefix=""):
def format_binary_line(data):
- left = " ".join(["%02X" % byte_ord(c) for c in data])
- right = "".join([(".%c.." % c)[(byte_ord(c) + 63) // 95] for c in data])
- return "%-50s %s" % (left, right)
+ left = " ".join(["{:02X}".format(byte_ord(c)) for c in data])
+ right = "".join(
+ [".{:c}..".format(byte_ord(c))[(byte_ord(c) + 63) // 95] for c in data]
+ )
+ return "{:50s} {}".format(left, right)
def safe_string(s):
- out = b("")
+ out = b""
for c in s:
i = byte_ord(c)
if 32 <= i <= 127:
out += byte_chr(i)
else:
- out += b("%%%02X" % i)
+ out += b("%{:02X}".format(i))
return out
diff --git a/paramiko/win_pageant.py b/paramiko/win_pageant.py
index d6afd5bd..a550b7f3 100644
--- a/paramiko/win_pageant.py
+++ b/paramiko/win_pageant.py
@@ -44,7 +44,7 @@ win32con_WM_COPYDATA = 74
def _get_pageant_window_object():
- return ctypes.windll.user32.FindWindowA(b("Pageant"), b("Pageant"))
+ return ctypes.windll.user32.FindWindowA(b"Pageant", b"Pageant")
def can_talk_to_agent():
diff --git a/setup.cfg b/setup.cfg
index 7ff383c3..b6b2eea3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -8,7 +8,7 @@ license_file = LICENSE
omit = paramiko/_winapi.py
[flake8]
-exclude = sites,.git,build,dist,demos,tests
+exclude = sites,.git,build,dist,demos
# NOTE: W503, E203 are concessions to black 18.0b5 and could be reinstated
# later if fixed on that end.
# NOTE: E722 seems to only have started popping up on move to flake8 3.6.0 from
@@ -22,3 +22,6 @@ max-line-length = 79
addopts = -p no:relaxed
# Loop on failure
looponfailroots = tests paramiko
+# Ignore some warnings we cannot easily handle.
+filterwarnings =
+ ignore::DeprecationWarning:pkg_resources
diff --git a/setup.py b/setup.py
index fdaf3032..4c0bac55 100644
--- a/setup.py
+++ b/setup.py
@@ -65,15 +65,13 @@ setup(
"Topic :: Security :: Cryptography",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
- "Programming Language :: Python :: 2.6",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.2",
- "Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
+ "Programming Language :: Python :: 3.8",
],
install_requires=[
"bcrypt>=3.1.3",
diff --git a/sites/shared_conf.py b/sites/shared_conf.py
index 4d6a7b27..7bb503ce 100644
--- a/sites/shared_conf.py
+++ b/sites/shared_conf.py
@@ -21,12 +21,12 @@ html_sidebars = {
}
# Everything intersphinx's to Python
-intersphinx_mapping = {"python": ("http://docs.python.org/2.6", None)}
+intersphinx_mapping = {"python": ("https://docs.python.org/2.7/", None)}
# Regular settings
project = "Paramiko"
year = datetime.now().year
-copyright = "%d Jeff Forcier" % year
+copyright = "{} Jeff Forcier".format(year)
master_doc = "index"
templates_path = ["_templates"]
exclude_trees = ["_build"]
diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst
index 91d170c1..86419aa7 100644
--- a/sites/www/changelog.rst
+++ b/sites/www/changelog.rst
@@ -11,6 +11,7 @@ Changelog
import location of ``MutableMapping`` (used in host key management) to avoid
the old location becoming deprecated in Python 3.8. Thanks to Josh Karpel for
catch & patch.
+- :release:`2.4.2 <2018-09-18>`
- :release:`2.3.3 <2018-09-18>`
- :release:`2.2.4 <2018-09-18>`
- :release:`2.1.6 <2018-09-18>`
@@ -52,6 +53,7 @@ Changelog
- :support:`1262 backported` Add ``*.pub`` files to the MANIFEST so distributed
source packages contain some necessary test assets. Credit: Alexander
Kapshuna.
+- :release:`2.4.1 <2018-03-12>`
- :release:`2.3.2 <2018-03-12>`
- :release:`2.2.3 <2018-03-12>`
- :release:`2.1.5 <2018-03-12>`
@@ -66,10 +68,32 @@ Changelog
- :bug:`1039` Ed25519 auth key decryption raised an unexpected exception when
given a unicode password string (typical in python 3). Report by Theodor van
Nahl and fix by Pierce Lopez.
+- :release:`2.4.0 <2017-11-14>`
+- :feature:`-` Add a new ``passphrase`` kwarg to `SSHClient.connect
+ <paramiko.client.SSHClient.connect>` so users may disambiguate key-decryption
+ passphrases from password-auth passwords. (This is a backwards compatible
+ change; ``password`` will still pull double duty as a passphrase when
+ ``passphrase`` is not given.)
+- :support:`-` Update ``tearDown`` of client test suite to avoid hangs due to
+ eternally blocking ``accept()`` calls on the internal server thread (which
+ can occur when test code raises an exception before actually connecting to
+ the server.)
- :bug:`1108 (1.17+)` Rename a private method keyword argument (which was named
``async``) so that we're compatible with the upcoming Python 3.7 release
(where ``async`` is a new keyword.) Thanks to ``@vEpiphyte`` for the report.
+- :support:`1100` Updated the test suite & related docs/metadata/config to be
+ compatible with pytest instead of using the old, custom, crufty
+ unittest-based ``test.py``.
+
+ This includes marking known-slow tests (mostly the SFTP ones) so they can be
+ filtered out by ``inv test``'s default behavior; as well as other minor
+ tweaks to test collection and/or display (for example, GSSAPI tests are
+ collected, but skipped, instead of not even being collected by default as in
+ ``test.py``.)
- :support:`- backported` Include LICENSE file in wheel archives.
+- :support:`1070` Drop Python 2.6 and Python 3.3 support; now only 2.7 and 3.4+
+ are supported. If you're unable to upgrade from 2.6 or 3.3, please stick to
+ the Paramiko 2.3.x (or below) release lines.
- :release:`2.3.1 <2017-09-22>`
- :bug:`1071` Certificate support broke the no-certificate case for Ed25519
keys (symptom is an ``AttributeError`` about ``public_blob``.) This went
diff --git a/sites/www/index.rst b/sites/www/index.rst
index f0a5db8a..26961f24 100644
--- a/sites/www/index.rst
+++ b/sites/www/index.rst
@@ -1,11 +1,11 @@
Welcome to Paramiko!
====================
-Paramiko is a Python (2.6+, 3.3+) implementation of the SSHv2 protocol [#]_,
+Paramiko is a Python (2.7, 3.4+) implementation of the SSHv2 protocol [#]_,
providing both client and server functionality. While it leverages a Python C
-extension for low level cryptography
-(`Cryptography <https://cryptography.io>`_), Paramiko itself is a pure Python
-interface around SSH networking concepts.
+extension for low level cryptography (`Cryptography
+<https://cryptography.io>`_), Paramiko itself is a pure Python interface around
+SSH networking concepts.
This website covers project information for Paramiko such as the changelog,
contribution guidelines, development roadmap, news/blog, and so forth. Detailed
diff --git a/sites/www/installing.rst b/sites/www/installing.rst
index f335a9e7..e6db2dca 100644
--- a/sites/www/installing.rst
+++ b/sites/www/installing.rst
@@ -19,8 +19,8 @@ via `pip <http://pip-installer.org>`_::
$ pip install paramiko
-We currently support **Python 2.6, 2.7, 3.3+, and PyPy**. Users on Python 2.5
-or older (or 3.2 or older) are urged to upgrade.
+We currently support **Python 2.7, 3.4+, and PyPy**. Users on Python 2.6 or
+older (or 3.3 or older) are urged to upgrade.
Paramiko has only one direct hard dependency: the Cryptography library. See
:ref:`cryptography`.
diff --git a/tasks.py b/tasks.py
index 6ea0a2c4..3d2fdf65 100644
--- a/tasks.py
+++ b/tasks.py
@@ -7,6 +7,7 @@ from invocations import travis
from invocations.checks import blacken
from invocations.docs import docs, www, sites
from invocations.packaging.release import ns as release_coll, publish
+from invocations.testing import count_errors
# TODO: this screams out for the invoke missing-feature of "I just wrap task X,
@@ -39,7 +40,7 @@ def test(
# running headless? Probably?
if color:
opts += " --color=yes"
- opts += " --capture={0}".format(capture)
+ opts += " --capture={}".format(capture)
if "-m" not in opts and not include_slow:
opts += " -m 'not slow'"
if k is not None and not ("-k" in opts if opts else False):
@@ -122,7 +123,16 @@ def release(ctx, sdist=True, wheel=True, sign=True, dry_run=False, index=None):
release_coll.tasks["publish"] = release
ns = Collection(
- test, coverage, guard, release_coll, docs, www, sites, travis, blacken
+ test,
+ coverage,
+ guard,
+ release_coll,
+ docs,
+ www,
+ sites,
+ count_errors,
+ travis,
+ blacken,
)
ns.configure(
{
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
index 100076d6..1528a0b8 100644
--- a/tests/stub_sftp.py
+++ b/tests/stub_sftp.py
@@ -21,18 +21,17 @@ A stub SFTP server for loopback SFTP testing.
"""
import os
-import sys
from paramiko import (
- ServerInterface,
- SFTPServerInterface,
- SFTPServer,
+ AUTH_SUCCESSFUL,
+ OPEN_SUCCEEDED,
SFTPAttributes,
SFTPHandle,
- SFTP_OK,
+ SFTPServer,
+ SFTPServerInterface,
SFTP_FAILURE,
- AUTH_SUCCESSFUL,
- OPEN_SUCCEEDED,
+ SFTP_OK,
+ ServerInterface,
)
from paramiko.common import o666
@@ -65,7 +64,8 @@ class StubSFTPHandle(SFTPHandle):
class StubSFTPServer(SFTPServerInterface):
# assume current folder is a fine root
- # (the tests always create and eventually delete a subfolder, so there shouldn't be any mess)
+ # (the tests always create and eventually delete a subfolder, so there
+ # shouldn't be any mess)
ROOT = os.getcwd()
def _realpath(self, path):
@@ -206,7 +206,8 @@ class StubSFTPServer(SFTPServerInterface):
# compute relative to path
abspath = os.path.join(os.path.dirname(path), target_path)
if abspath[: len(self.ROOT)] != self.ROOT:
- # this symlink isn't going to work anyway -- just break it immediately
+ # this symlink isn't going to work anyway -- just break it
+ # immediately
target_path = "<error>"
try:
os.symlink(target_path, path)
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 6358a053..01fbac5b 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -142,7 +142,7 @@ class AuthTest(unittest.TestCase):
self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
- def test_1_bad_auth_type(self):
+ def test_bad_auth_type(self):
"""
verify that we get the right exception when an unsupported auth
type is requested.
@@ -160,7 +160,7 @@ class AuthTest(unittest.TestCase):
self.assertEqual(BadAuthenticationType, etype)
self.assertEqual(["publickey"], evalue.allowed_types)
- def test_2_bad_password(self):
+ def test_bad_password(self):
"""
verify that a bad password gets the right exception, and that a retry
with the right password works.
@@ -176,7 +176,7 @@ class AuthTest(unittest.TestCase):
self.tc.auth_password(username="slowdive", password="pygmalion")
self.verify_finished()
- def test_3_multipart_auth(self):
+ def test_multipart_auth(self):
"""
verify that multipart auth works.
"""
@@ -191,7 +191,7 @@ class AuthTest(unittest.TestCase):
self.assertEqual([], remain)
self.verify_finished()
- def test_4_interactive_auth(self):
+ def test_interactive_auth(self):
"""
verify keyboard-interactive auth works.
"""
@@ -210,7 +210,7 @@ class AuthTest(unittest.TestCase):
self.assertEqual([], remain)
self.verify_finished()
- def test_5_interactive_auth_fallback(self):
+ def test_interactive_auth_fallback(self):
"""
verify that a password auth attempt will fallback to "interactive"
if password auth isn't supported but interactive is.
@@ -221,7 +221,7 @@ class AuthTest(unittest.TestCase):
self.assertEqual([], remain)
self.verify_finished()
- def test_6_auth_utf8(self):
+ def test_auth_utf8(self):
"""
verify that utf-8 encoding happens in authentication.
"""
@@ -231,7 +231,7 @@ class AuthTest(unittest.TestCase):
self.assertEqual([], remain)
self.verify_finished()
- def test_7_auth_non_utf8(self):
+ def test_auth_non_utf8(self):
"""
verify that non-utf-8 encoded passwords can be used for broken
servers.
@@ -242,7 +242,7 @@ class AuthTest(unittest.TestCase):
self.assertEqual([], remain)
self.verify_finished()
- def test_8_auth_gets_disconnected(self):
+ def test_auth_gets_disconnected(self):
"""
verify that we catch a server disconnecting during auth, and report
it as an auth failure.
@@ -250,12 +250,13 @@ class AuthTest(unittest.TestCase):
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
try:
- remain = self.tc.auth_password("bad-server", "hello")
+ self.tc.auth_password("bad-server", "hello")
except:
etype, evalue, etb = sys.exc_info()
self.assertTrue(issubclass(etype, AuthenticationException))
- def test_9_auth_non_responsive(self):
+ @slow
+ def test_auth_non_responsive(self):
"""
verify that authentication times out if server takes to long to
respond (or never responds).
@@ -264,7 +265,7 @@ class AuthTest(unittest.TestCase):
self.start_server()
self.tc.connect()
try:
- remain = self.tc.auth_password("unresponsive-server", "hello")
+ self.tc.auth_password("unresponsive-server", "hello")
except:
etype, evalue, etb = sys.exc_info()
self.assertTrue(issubclass(etype, AuthenticationException))
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index 28d6e4a2..61c99cc0 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -26,7 +26,6 @@ import unittest
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe
-from paramiko.py3compat import b
def delay_thread(p):
@@ -42,7 +41,7 @@ def close_thread(p):
class BufferedPipeTest(unittest.TestCase):
- def test_1_buffered_pipe(self):
+ def test_buffered_pipe(self):
p = BufferedPipe()
self.assertTrue(not p.read_ready())
p.feed("hello.")
@@ -59,7 +58,7 @@ class BufferedPipeTest(unittest.TestCase):
self.assertTrue(not p.read_ready())
self.assertEqual(b"", p.read(1))
- def test_2_delay(self):
+ def test_delay(self):
p = BufferedPipe()
self.assertTrue(not p.read_ready())
threading.Thread(target=delay_thread, args=(p,)).start()
@@ -72,13 +71,13 @@ class BufferedPipeTest(unittest.TestCase):
self.assertEqual(b"b", p.read(1, 1.0))
self.assertEqual(b"", p.read(1))
- def test_3_close_while_reading(self):
+ def test_close_while_reading(self):
p = BufferedPipe()
threading.Thread(target=close_thread, args=(p,)).start()
data = p.read(1, 1.0)
self.assertEqual(b"", data)
- def test_4_or_pipe(self):
+ def test_or_pipe(self):
p = pipe.make_pipe()
p1, p2 = pipe.make_or_pipe(p)
self.assertFalse(p._set)
diff --git a/tests/test_channelfile.py b/tests/test_channelfile.py
new file mode 100644
index 00000000..ffcbea3f
--- /dev/null
+++ b/tests/test_channelfile.py
@@ -0,0 +1,41 @@
+from mock import patch, MagicMock
+
+from paramiko import Channel, ChannelFile, ChannelStderrFile
+
+
+class TestChannelFile(object):
+ @patch("paramiko.channel.ChannelFile._set_mode")
+ def test_defaults_to_unbuffered_reading(self, setmode):
+ ChannelFile(Channel(None))
+ setmode.assert_called_once_with("r", -1)
+
+ @patch("paramiko.channel.ChannelFile._set_mode")
+ def test_can_override_mode_and_bufsize(self, setmode):
+ ChannelFile(Channel(None), mode="w", bufsize=25)
+ setmode.assert_called_once_with("w", 25)
+
+ def test_read_recvs_from_channel(self):
+ chan = MagicMock()
+ cf = ChannelFile(chan)
+ cf.read(100)
+ chan.recv.assert_called_once_with(100)
+
+ def test_write_calls_channel_sendall(self):
+ chan = MagicMock()
+ cf = ChannelFile(chan, mode="w")
+ cf.write("ohai")
+ chan.sendall.assert_called_once_with(b"ohai")
+
+
+class TestChannelStderrFile(object):
+ def test_read_calls_channel_recv_stderr(self):
+ chan = MagicMock()
+ cf = ChannelStderrFile(chan)
+ cf.read(100)
+ chan.recv_stderr.assert_called_once_with(100)
+
+ def test_write_calls_channel_sendall(self):
+ chan = MagicMock()
+ cf = ChannelStderrFile(chan, mode="w")
+ cf.write("ohai")
+ chan.sendall_stderr.assert_called_once_with(b"ohai")
diff --git a/tests/test_client.py b/tests/test_client.py
index bfbd395f..26de2d37 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -20,7 +20,7 @@
Some unit tests for SSHClient.
"""
-from __future__ import with_statement
+from __future__ import with_statement, print_function
import gc
import os
@@ -33,18 +33,23 @@ import warnings
import weakref
from tempfile import mkstemp
+from pytest_relaxed import raises
+
import paramiko
from paramiko.pkey import PublicBlob
-from paramiko.common import PY2
from paramiko.ssh_exception import SSHException, AuthenticationException
from .util import _support, slow
+requires_gss_auth = unittest.skipUnless(
+ paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available"
+)
+
FINGERPRINTS = {
- "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c",
- "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5",
- "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60",
+ "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", # noqa
+ "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", # noqa
+ "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", # noqa
"ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
}
@@ -107,7 +112,7 @@ class NullServer(paramiko.ServerInterface):
return True
-class SSHClientTest(unittest.TestCase):
+class ClientTest(unittest.TestCase):
def setUp(self):
self.sockl = socket.socket()
self.sockl.bind(("localhost", 0))
@@ -120,16 +125,42 @@ class SSHClientTest(unittest.TestCase):
look_for_keys=False,
)
self.event = threading.Event()
+ self.kill_event = threading.Event()
def tearDown(self):
- for attr in "tc ts socks sockl".split():
- if hasattr(self, attr):
- getattr(self, attr).close()
-
- def _run(self, allowed_keys=None, delay=0, public_blob=None):
+ # Shut down client Transport
+ if hasattr(self, "tc"):
+ self.tc.close()
+ # Shut down shared socket
+ if hasattr(self, "sockl"):
+ # Signal to server thread that it should shut down early; it checks
+ # this immediately after accept(). (In scenarios where connection
+ # actually succeeded during the test, this becomes a no-op.)
+ self.kill_event.set()
+ # Forcibly connect to server sock in case the server thread is
+ # hanging out in its accept() (e.g. if the client side of the test
+ # fails before it even gets to connecting); there's no other good
+ # way to force an accept() to exit.
+ put_a_sock_in_it = socket.socket()
+ put_a_sock_in_it.connect((self.addr, self.port))
+ put_a_sock_in_it.close()
+ # Then close "our" end of the socket (which _should_ cause the
+ # accept() to bail out, but does not, for some reason. I blame
+ # threading.)
+ self.sockl.close()
+
+ def _run(
+ self, allowed_keys=None, delay=0, public_blob=None, kill_event=None
+ ):
if allowed_keys is None:
allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
+ # If the kill event was set at this point, it indicates an early
+ # shutdown, so bail out now and don't even try setting up a Transport
+ # (which will just verbosely die.)
+ if kill_event and kill_event.is_set():
+ self.socks.close()
+ return
self.ts = paramiko.Transport(self.socks)
keypath = _support("test_rsa.key")
host_key = paramiko.RSAKey.from_private_key_file(keypath)
@@ -149,7 +180,7 @@ class SSHClientTest(unittest.TestCase):
The exception is ``allowed_keys`` which is stripped and handed to the
``NullServer`` used for testing.
"""
- run_kwargs = {}
+ run_kwargs = {"kill_event": self.kill_event}
for key in ("allowed_keys", "public_blob"):
run_kwargs[key] = kwargs.pop(key, None)
# Server setup
@@ -180,6 +211,12 @@ class SSHClientTest(unittest.TestCase):
stdin, stdout, stderr = self.tc.exec_command("yes")
schan = self.ts.accept(1.0)
+ # Nobody else tests the API of exec_command so let's do it here for
+ # now. :weary:
+ assert isinstance(stdin, paramiko.ChannelFile)
+ assert isinstance(stdout, paramiko.ChannelFile)
+ assert isinstance(stderr, paramiko.ChannelStderrFile)
+
schan.send("Hello there.\n")
schan.send_stderr("This is on stderr.\n")
schan.close()
@@ -194,13 +231,15 @@ class SSHClientTest(unittest.TestCase):
stdout.close()
stderr.close()
- def test_1_client(self):
+
+class SSHClientTest(ClientTest):
+ def test_client(self):
"""
verify that the SSHClient stuff works too.
"""
self._test_connection(password="pygmalion")
- def test_2_client_dsa(self):
+ def test_client_dsa(self):
"""
verify that SSHClient works with a DSA key.
"""
@@ -212,7 +251,7 @@ class SSHClientTest(unittest.TestCase):
"""
self._test_connection(key_filename=_support("test_rsa.key"))
- def test_2_5_client_ecdsa(self):
+ def test_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
@@ -221,7 +260,7 @@ class SSHClientTest(unittest.TestCase):
def test_client_ed25519(self):
self._test_connection(key_filename=_support("test_ed25519.key"))
- def test_3_multiple_key_files(self):
+ def test_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
"""
@@ -242,7 +281,7 @@ class SSHClientTest(unittest.TestCase):
try:
self._test_connection(
key_filename=[
- _support("test_{0}.key".format(x)) for x in attempt
+ _support("test_{}.key".format(x)) for x in attempt
],
allowed_keys=[types_[x] for x in accept],
)
@@ -271,7 +310,7 @@ class SSHClientTest(unittest.TestCase):
# server-side behavior is 100% identical.)
# NOTE: only bothered whipping up one cert per overall class/family.
for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
- cert_name = "test_{0}.key-cert.pub".format(type_)
+ cert_name = "test_{}.key-cert.pub".format(type_)
cert_path = _support(os.path.join("cert_support", cert_name))
self._test_connection(
key_filename=cert_path,
@@ -286,12 +325,12 @@ class SSHClientTest(unittest.TestCase):
# that a specific cert was found, along with regular authorization
# succeeding proving that the overall flow works.
for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
- key_name = "test_{0}.key".format(type_)
+ key_name = "test_{}.key".format(type_)
key_path = _support(os.path.join("cert_support", key_name))
self._test_connection(
key_filename=key_path,
public_blob=PublicBlob.from_file(
- "{0}-cert.pub".format(key_path)
+ "{}-cert.pub".format(key_path)
),
)
@@ -301,7 +340,7 @@ class SSHClientTest(unittest.TestCase):
# code path (!) so we're punting too, sob.
pass
- def test_4_auto_add_policy(self):
+ def test_auto_add_policy(self):
"""
verify that SSHClient's AutoAddPolicy works.
"""
@@ -324,7 +363,7 @@ class SSHClientTest(unittest.TestCase):
new_host_key = list(self.tc.get_host_keys()[hostname].values())[0]
self.assertEqual(public_host_key, new_host_key)
- def test_5_save_host_keys(self):
+ def test_save_host_keys(self):
"""
verify that SSHClient correctly saves a known_hosts file.
"""
@@ -353,7 +392,7 @@ class SSHClientTest(unittest.TestCase):
os.unlink(localname)
- def test_6_cleanup(self):
+ def test_cleanup(self):
"""
verify that when an SSHClient is collected, its transport (and the
transport's packetizer) is closed.
@@ -406,7 +445,7 @@ class SSHClientTest(unittest.TestCase):
self.assertTrue(self.tc._transport is None)
- def test_7_banner_timeout(self):
+ def test_banner_timeout(self):
"""
verify that the SSHClient has a configurable banner timeout.
"""
@@ -425,7 +464,7 @@ class SSHClientTest(unittest.TestCase):
kwargs = dict(self.connect_kwargs, banner_timeout=0.5)
self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs)
- def test_8_auth_trickledown(self):
+ def test_auth_trickledown(self):
"""
Failed key auth doesn't prevent subsequent pw auth from succeeding
"""
@@ -445,7 +484,8 @@ class SSHClientTest(unittest.TestCase):
)
self._test_connection(**kwargs)
- def test_9_auth_timeout(self):
+ @slow
+ def test_auth_timeout(self):
"""
verify that the SSHClient has a configurable auth timeout
"""
@@ -457,25 +497,23 @@ class SSHClientTest(unittest.TestCase):
auth_timeout=0.5,
)
- def test_10_auth_trickledown_gsskex(self):
+ @requires_gss_auth
+ def test_auth_trickledown_gsskex(self):
"""
- Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-keyex doesn't prevent subsequent key from succeeding
"""
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")])
self._test_connection(**kwargs)
- def test_11_auth_trickledown_gssauth(self):
+ @requires_gss_auth
+ def test_auth_trickledown_gssauth(self):
"""
- Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
"""
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")])
self._test_connection(**kwargs)
- def test_12_reject_policy(self):
+ def test_reject_policy(self):
"""
verify that SSHClient's RejectPolicy works.
"""
@@ -491,14 +529,14 @@ class SSHClientTest(unittest.TestCase):
**self.connect_kwargs
)
- def test_13_reject_policy_gsskex(self):
+ @requires_gss_auth
+ def test_reject_policy_gsskex(self):
"""
verify that SSHClient's RejectPolicy works,
even if gssapi-keyex was enabled but not used.
"""
- # Test for a bug present in paramiko versions released before 2017-08-01
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
+ # Test for a bug present in paramiko versions released before
+ # 2017-08-01
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -558,10 +596,7 @@ class SSHClientTest(unittest.TestCase):
def test_host_key_negotiation_4(self):
self._client_host_key_good(paramiko.RSAKey, "test_rsa.key")
- def test_update_environment(self):
- """
- Verify that environment variables can be set by the client.
- """
+ def _setup_for_env(self):
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -575,6 +610,11 @@ class SSHClientTest(unittest.TestCase):
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
+ def test_update_environment(self):
+ """
+ Verify that environment variables can be set by the client.
+ """
+ self._setup_for_env()
target_env = {b"A": b"B", b"C": b"d"}
self.tc.exec_command("yes", environment=target_env)
@@ -582,22 +622,20 @@ class SSHClientTest(unittest.TestCase):
self.assertEqual(target_env, getattr(schan, "env", {}))
schan.close()
- # Cannot use assertRaises in context manager mode as it is not supported
- # in Python 2.6.
- try:
+ @unittest.skip("Clients normally fail silently, thus so do we, for now")
+ def test_env_update_failures(self):
+ self._setup_for_env()
+ with self.assertRaises(SSHException) as manager:
# Verify that a rejection by the server can be detected
self.tc.exec_command("yes", environment={b"INVALID_ENV": b""})
- except SSHException as e:
- self.assertTrue(
- "INVALID_ENV" in str(e),
- "Expected variable name in error message",
- )
- self.assertTrue(
- isinstance(e.args[1], SSHException),
- "Expected original SSHException in exception",
- )
- else:
- self.assertFalse(False, "SSHException was not thrown.")
+ self.assertTrue(
+ "INVALID_ENV" in str(manager.exception),
+ "Expected variable name in error message",
+ )
+ self.assertTrue(
+ isinstance(manager.exception.args[1], SSHException),
+ "Expected original SSHException in exception",
+ )
def test_missing_key_policy_accepts_classes_or_instances(self):
"""
@@ -614,3 +652,49 @@ class SSHClientTest(unittest.TestCase):
# Hand in just the class (new behavior)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
assert isinstance(client._policy, paramiko.AutoAddPolicy)
+
+
+class PasswordPassphraseTests(ClientTest):
+ # TODO: most of these could reasonably be set up to use mocks/assertions
+ # (e.g. "gave passphrase -> expect PKey was given it as the passphrase")
+ # instead of suffering a real connection cycle.
+ # TODO: in that case, move the below to be part of an integration suite?
+
+ def test_password_kwarg_works_for_password_auth(self):
+ # Straightforward / duplicate of earlier basic password test.
+ self._test_connection(password="pygmalion")
+
+ # TODO: more granular exception pending #387; should be signaling "no auth
+ # methods available" because no key and no password
+ @raises(SSHException)
+ def test_passphrase_kwarg_not_used_for_password_auth(self):
+ # Using the "right" password in the "wrong" field shouldn't work.
+ self._test_connection(passphrase="pygmalion")
+
+ def test_passphrase_kwarg_used_for_key_passphrase(self):
+ # Straightforward again, with new passphrase kwarg.
+ self._test_connection(
+ key_filename=_support("test_rsa_password.key"),
+ passphrase="television",
+ )
+
+ def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(
+ self
+ ): # noqa
+ # Backwards compatibility: passphrase in the password field.
+ self._test_connection(
+ key_filename=_support("test_rsa_password.key"),
+ password="television",
+ )
+
+ @raises(AuthenticationException) # TODO: more granular
+ def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa
+ self
+ ):
+ # Sanity: if we're given both fields, the password field is NOT used as
+ # a passphrase.
+ self._test_connection(
+ key_filename=_support("test_rsa_password.key"),
+ password="television",
+ passphrase="wat? lol no",
+ )
diff --git a/tests/test_file.py b/tests/test_file.py
index fba14b1b..2a3da74b 100644
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -52,7 +52,7 @@ class LoopbackFile(BufferedFile):
class BufferedFileTest(unittest.TestCase):
- def test_1_simple(self):
+ def test_simple(self):
f = LoopbackFile("r")
try:
f.write(b"hi")
@@ -69,7 +69,7 @@ class BufferedFileTest(unittest.TestCase):
pass
f.close()
- def test_2_readline(self):
+ def test_readline(self):
f = LoopbackFile("r+U")
f.write(
b"First line.\nSecond line.\r\nThird line.\n"
@@ -96,7 +96,7 @@ class BufferedFileTest(unittest.TestCase):
self.assertTrue(crlf in f.newlines)
self.assertTrue(cr_byte not in f.newlines)
- def test_3_lf(self):
+ def test_lf(self):
"""
try to trick the linefeed detector.
"""
@@ -108,7 +108,7 @@ class BufferedFileTest(unittest.TestCase):
f.close()
self.assertEqual(f.newlines, crlf)
- def test_4_write(self):
+ def test_write(self):
"""
verify that write buffering is on.
"""
@@ -120,7 +120,7 @@ class BufferedFileTest(unittest.TestCase):
self.assertEqual(f.readline(), "Incomplete line...\n")
f.close()
- def test_5_flush(self):
+ def test_flush(self):
"""
verify that flush will force a write.
"""
@@ -134,7 +134,7 @@ class BufferedFileTest(unittest.TestCase):
self.assertEqual(f.read(3), b"")
f.close()
- def test_6_buffering(self):
+ def test_buffering_flushes(self):
"""
verify that flushing happens automatically on buffer crossing.
"""
@@ -147,7 +147,7 @@ class BufferedFileTest(unittest.TestCase):
self.assertEqual(f.read(20), b"Too small. Enough.")
f.close()
- def test_7_read_all(self):
+ def test_read_all(self):
"""
verify that read(-1) returns everything left in the file.
"""
@@ -162,30 +162,30 @@ class BufferedFileTest(unittest.TestCase):
)
f.close()
- def test_8_buffering(self):
+ def test_buffering_writes(self):
"""
verify that buffered objects can be written
"""
if sys.version_info[0] == 2:
f = LoopbackFile("r+", 16)
- f.write(buffer(b"Too small."))
+ f.write(buffer(b"Too small.")) # noqa
f.close()
- def test_9_readable(self):
+ def test_readable(self):
f = LoopbackFile("r")
self.assertTrue(f.readable())
self.assertFalse(f.writable())
self.assertFalse(f.seekable())
f.close()
- def test_A_writable(self):
+ def test_writable(self):
f = LoopbackFile("w")
self.assertTrue(f.writable())
self.assertFalse(f.readable())
self.assertFalse(f.seekable())
f.close()
- def test_B_readinto(self):
+ def test_readinto(self):
data = bytearray(5)
f = LoopbackFile("r+")
f._write(b"hello")
@@ -215,7 +215,7 @@ class BufferedFileTest(unittest.TestCase):
offsets = range(0, len(data), 8)
with LoopbackFile("rb+") as f:
for offset in offsets:
- f.write(buffer(data, offset, 8))
+ f.write(buffer(data, offset, 8)) # noqa
self.assertEqual(f.read(), data)
@needs_builtin("memoryview")
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 3e8c39e8..46d5bbd1 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -30,14 +30,14 @@ from .util import needs_gssapi
@needs_gssapi
class GSSAPITest(unittest.TestCase):
- def setup():
+ def setup(self):
# TODO: these vars should all come from os.environ or whatever the
# approved pytest method is for runtime-configuring test data.
self.krb5_mech = "1.2.840.113554.1.2.2"
self.targ_name = "hostname"
self.server_mode = False
- def test_1_pyasn1(self):
+ def test_pyasn1(self):
"""
Test the used methods of pyasn1.
"""
@@ -48,7 +48,7 @@ class GSSAPITest(unittest.TestCase):
mech, __ = decoder.decode(oid)
self.assertEquals(self.krb5_mech, mech.__str__())
- def test_2_gssapi_sspi(self):
+ def test_gssapi_sspi(self):
"""
Test the used methods of python-gssapi or sspi, sspicon from pywin32.
"""
diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py
index 295153dd..da47362c 100644
--- a/tests/test_hostkeys.py
+++ b/tests/test_hostkeys.py
@@ -62,7 +62,7 @@ class HostKeysTest(unittest.TestCase):
def tearDown(self):
os.unlink("hostfile.temp")
- def test_1_load(self):
+ def test_load(self):
hostdict = paramiko.HostKeys("hostfile.temp")
self.assertEqual(2, len(hostdict))
self.assertEqual(1, len(list(hostdict.values())[0]))
@@ -72,7 +72,7 @@ class HostKeysTest(unittest.TestCase):
).upper()
self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp)
- def test_2_add(self):
+ def test_add(self):
hostdict = paramiko.HostKeys("hostfile.temp")
hh = "|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c="
key = paramiko.RSAKey(data=decodebytes(keyblob))
@@ -83,7 +83,7 @@ class HostKeysTest(unittest.TestCase):
self.assertEqual(b"7EC91BB336CB6D810B124B1353C32396", fp)
self.assertTrue(hostdict.check("foo.example.com", key))
- def test_3_dict(self):
+ def test_dict(self):
hostdict = paramiko.HostKeys("hostfile.temp")
self.assertTrue("secure.example.com" in hostdict)
self.assertTrue("not.example.com" not in hostdict)
@@ -98,7 +98,7 @@ class HostKeysTest(unittest.TestCase):
i += 1
self.assertEqual(2, i)
- def test_4_dict_set(self):
+ def test_dict_set(self):
hostdict = paramiko.HostKeys("hostfile.temp")
key = paramiko.RSAKey(data=decodebytes(keyblob))
key_dss = paramiko.DSSKey(data=decodebytes(keyblob_dss))
@@ -122,10 +122,10 @@ class HostKeysTest(unittest.TestCase):
def test_delitem(self):
hostdict = paramiko.HostKeys("hostfile.temp")
target = "happy.example.com"
- entry = hostdict[target] # will KeyError if not present
+ hostdict[target] # will KeyError if not present
del hostdict[target]
try:
- entry = hostdict[target]
+ hostdict[target]
except KeyError:
pass # Good
else:
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 65eb9a17..69492ee2 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -33,8 +33,6 @@ from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko import Message
from paramiko.common import byte_chr
from paramiko.kex_ecdh_nist import KexNistp256
-from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives.asymmetric import ec
def dummy_urandom(n):
@@ -42,8 +40,8 @@ def dummy_urandom(n):
def dummy_generate_key_pair(obj):
- private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037
- public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989"
+ private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 # noqa
+ public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" # noqa
public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point(
ec.SECP256R1(), unhexlify(public_key_numbers)
)
@@ -72,7 +70,7 @@ class FakeKey(object):
class FakeModulusPack(object):
- P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF
+ P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF # noqa
G = 2
def get_modulus(self, min, ask, max):
@@ -113,7 +111,7 @@ class FakeTransport(object):
class KexTest(unittest.TestCase):
- K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504
+ K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504 # noqa
def setUp(self):
self._original_urandom = os.urandom
@@ -125,12 +123,12 @@ class KexTest(unittest.TestCase):
os.urandom = self._original_urandom
KexNistp256._generate_key_pair = self._original_generate_key_pair
- def test_1_group1_client(self):
+ def test_group1_client(self):
transport = FakeTransport()
transport.server_mode = False
kex = KexGroup1(transport)
kex.start_kex()
- x = b"1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4"
+ x = b"1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect
@@ -149,7 +147,7 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_2_group1_server(self):
+ def test_group1_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGroup1(transport)
@@ -163,13 +161,13 @@ class KexTest(unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg)
H = b"B16BF34DD10945EDE84E9C1EF24A14BFDC843389"
- x = b"1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967"
+ x = b"1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa
self.assertEqual(self.K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
- def test_3_gex_client(self):
+ def test_gex_client(self):
transport = FakeTransport()
transport.server_mode = False
kex = KexGex(transport)
@@ -185,7 +183,7 @@ class KexTest(unittest.TestCase):
msg.add_mpint(FakeModulusPack.G)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
- x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4"
+ x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect
@@ -203,7 +201,7 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_4_gex_old_client(self):
+ def test_gex_old_client(self):
transport = FakeTransport()
transport.server_mode = False
kex = KexGex(transport)
@@ -219,7 +217,7 @@ class KexTest(unittest.TestCase):
msg.add_mpint(FakeModulusPack.G)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
- x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4"
+ x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect
@@ -237,7 +235,7 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_5_gex_server(self):
+ def test_gex_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGex(transport)
@@ -256,7 +254,7 @@ class KexTest(unittest.TestCase):
msg.add_int(4096)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
- x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102"
+ x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect
@@ -266,15 +264,15 @@ class KexTest(unittest.TestCase):
msg.add_mpint(12345)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
- K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa
H = b"CE754197C21BF3452863B4F44D0B3951F12516EF"
- x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967"
+ x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa
self.assertEqual(K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
- def test_6_gex_server_with_old_client(self):
+ def test_gex_server_with_old_client(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGex(transport)
@@ -291,7 +289,7 @@ class KexTest(unittest.TestCase):
msg.add_int(2048)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
- x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102"
+ x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect
@@ -301,15 +299,15 @@ class KexTest(unittest.TestCase):
msg.add_mpint(12345)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
- K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa
H = b"B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B"
- x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967"
+ x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa
self.assertEqual(K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
- def test_7_gex_sha256_client(self):
+ def test_gex_sha256_client(self):
transport = FakeTransport()
transport.server_mode = False
kex = KexGexSHA256(transport)
@@ -325,7 +323,7 @@ class KexTest(unittest.TestCase):
msg.add_mpint(FakeModulusPack.G)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
- x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4"
+ x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect
@@ -343,7 +341,7 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_8_gex_sha256_old_client(self):
+ def test_gex_sha256_old_client(self):
transport = FakeTransport()
transport.server_mode = False
kex = KexGexSHA256(transport)
@@ -359,7 +357,7 @@ class KexTest(unittest.TestCase):
msg.add_mpint(FakeModulusPack.G)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
- x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4"
+ x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect
@@ -377,7 +375,7 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_9_gex_sha256_server(self):
+ def test_gex_sha256_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGexSHA256(transport)
@@ -396,7 +394,7 @@ class KexTest(unittest.TestCase):
msg.add_int(4096)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
- x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102"
+ x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect
@@ -406,15 +404,15 @@ class KexTest(unittest.TestCase):
msg.add_mpint(12345)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
- K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa
H = b"CCAC0497CF0ABA1DBF55E1A3995D17F4CC31824B0E8D95CDF8A06F169D050D80"
- x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967"
+ x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa
self.assertEqual(K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
- def test_10_gex_sha256_server_with_old_client(self):
+ def test_gex_sha256_server_with_old_client(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGexSHA256(transport)
@@ -431,7 +429,7 @@ class KexTest(unittest.TestCase):
msg.add_int(2048)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
- x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102"
+ x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertEqual(
(paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect
@@ -441,16 +439,16 @@ class KexTest(unittest.TestCase):
msg.add_mpint(12345)
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
- K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa
H = b"3DDD2AD840AD095E397BA4D0573972DC60F6461FD38A187CACA6615A5BC8ADBB"
- x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967"
+ x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa
self.assertEqual(K, transport._K)
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
- def test_11_kex_nistp256_client(self):
- K = 91610929826364598472338906427792435253694642563583721654249504912114314269754
+ def test_kex_nistp256_client(self):
+ K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 # noqa
transport = FakeTransport()
transport.server_mode = False
kex = KexNistp256(transport)
@@ -463,7 +461,7 @@ class KexTest(unittest.TestCase):
msg = Message()
msg.add_string("fake-host-key")
Q_S = unhexlify(
- "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210"
+ "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" # noqa
)
msg.add_string(Q_S)
msg.add_string("fake-sig")
@@ -475,8 +473,8 @@ class KexTest(unittest.TestCase):
self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
self.assertTrue(transport._activated)
- def test_12_kex_nistp256_server(self):
- K = 91610929826364598472338906427792435253694642563583721654249504912114314269754
+ def test_kex_nistp256_server(self):
+ K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 # noqa
transport = FakeTransport()
transport.server_mode = True
kex = KexNistp256(transport)
@@ -488,7 +486,7 @@ class KexTest(unittest.TestCase):
# fake init
msg = Message()
Q_C = unhexlify(
- "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210"
+ "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" # noqa
)
H = b"2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA"
msg.add_string(Q_C)
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index c71ff91c..42e0a101 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -142,7 +142,7 @@ class GSSKexTest(unittest.TestCase):
stdout.close()
stderr.close()
- def test_1_gsskex_and_auth(self):
+ def test_gsskex_and_auth(self):
"""
Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
Diffie-Hellman Key Exchange and user authentication with the GSS-API
@@ -150,7 +150,7 @@ class GSSKexTest(unittest.TestCase):
"""
self._test_gsskex_and_auth(gss_host=None)
- def test_2_gsskex_and_auth_rekey(self):
+ def test_gsskex_and_auth_rekey(self):
"""
Verify that Paramiko can rekey.
"""
diff --git a/tests/test_message.py b/tests/test_message.py
index b843a705..57766d90 100644
--- a/tests/test_message.py
+++ b/tests/test_message.py
@@ -29,14 +29,14 @@ from paramiko.common import byte_chr, zero_byte
class MessageTest(unittest.TestCase):
__a = (
- b"\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01\x71\x00\x00\x00\x05\x68\x65\x6c\x6c\x6f\x00\x00\x03\xe8"
+ b"\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01\x71\x00\x00\x00\x05\x68\x65\x6c\x6c\x6f\x00\x00\x03\xe8" # noqa
+ b"x" * 1000
)
- __b = b"\x01\x00\xf3\x00\x3f\x00\x00\x00\x10\x68\x75\x65\x79\x2c\x64\x65\x77\x65\x79\x2c\x6c\x6f\x75\x69\x65"
- __c = b"\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7"
- __d = b"\x00\x00\x00\x05\xff\x00\x00\x00\x05\x11\x22\x33\x44\x55\xff\x00\x00\x00\x0a\x00\xf0\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x63\x61\x74\x00\x00\x00\x03\x61\x2c\x62"
+ __b = b"\x01\x00\xf3\x00\x3f\x00\x00\x00\x10\x68\x75\x65\x79\x2c\x64\x65\x77\x65\x79\x2c\x6c\x6f\x75\x69\x65" # noqa
+ __c = b"\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7" # noqa
+ __d = b"\x00\x00\x00\x05\xff\x00\x00\x00\x05\x11\x22\x33\x44\x55\xff\x00\x00\x00\x0a\x00\xf0\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x63\x61\x74\x00\x00\x00\x03\x61\x2c\x62" # noqa
- def test_1_encode(self):
+ def test_encode(self):
msg = Message()
msg.add_int(23)
msg.add_int(123789456)
@@ -62,7 +62,7 @@ class MessageTest(unittest.TestCase):
msg.add_mpint(-0x65e4d3c2b109)
self.assertEqual(msg.asbytes(), self.__c)
- def test_2_decode(self):
+ def test_decode(self):
msg = Message(self.__a)
self.assertEqual(msg.get_int(), 23)
self.assertEqual(msg.get_int(), 123789456)
@@ -84,7 +84,7 @@ class MessageTest(unittest.TestCase):
self.assertEqual(msg.get_mpint(), 0xf5e4d3c2b109)
self.assertEqual(msg.get_mpint(), -0x65e4d3c2b109)
- def test_3_add(self):
+ def test_add(self):
msg = Message()
msg.add(5)
msg.add(0x1122334455)
@@ -94,7 +94,7 @@ class MessageTest(unittest.TestCase):
msg.add(["a", "b"])
self.assertEqual(msg.asbytes(), self.__d)
- def test_4_misc(self):
+ def test_misc(self):
msg = Message(self.__d)
self.assertEqual(msg.get_adaptive_int(), 5)
self.assertEqual(msg.get_adaptive_int(), 0x1122334455)
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index 6920f08e..de80770e 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -38,7 +38,7 @@ x1f = byte_chr(0x1f)
class PacketizerTest(unittest.TestCase):
- def test_1_write(self):
+ def test_write(self):
rsock = LoopSocket()
wsock = LoopSocket()
rsock.link(wsock)
@@ -64,11 +64,11 @@ class PacketizerTest(unittest.TestCase):
# 32 + 12 bytes of MAC = 44
self.assertEqual(44, len(data))
self.assertEqual(
- b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0",
+ b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0", # noqa
data[:16],
)
- def test_2_read(self):
+ def test_read(self):
rsock = LoopSocket()
wsock = LoopSocket()
rsock.link(wsock)
@@ -82,7 +82,7 @@ class PacketizerTest(unittest.TestCase):
).decryptor()
p.set_inbound_cipher(decryptor, 16, sha1, 12, x1f * 20)
wsock.send(
- b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59"
+ b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59" # noqa
)
cmd, m = p.read_message()
self.assertEqual(100, cmd)
@@ -90,7 +90,7 @@ class PacketizerTest(unittest.TestCase):
self.assertEqual(1, m.get_int())
self.assertEqual(900, m.get_int())
- def test_3_closed(self):
+ def test_closed(self):
if sys.platform.startswith("win"): # no SIGALRM on windows
return
rsock = LoopSocket()
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 4e84da7b..749bb5f5 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -25,7 +25,6 @@ import unittest
import os
from binascii import hexlify
from hashlib import md5
-import base64
from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util
from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2
@@ -34,18 +33,18 @@ from .util import _support
# from openssh's ssh-keygen
-PUB_RSA = "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c="
-PUB_DSS = "ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE="
-PUB_ECDSA_256 = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo="
-PUB_ECDSA_384 = "ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBBbGibQLW9AAZiGN2hEQxWYYoFaWKwN3PKSaDJSMqmIn1Z9sgRUuw8Y/w502OGvXL/wFk0i2z50l3pWZjD7gfMH7gX5TUiCzwrQkS+Hn1U2S9aF5WJp0NcIzYxXw2r4M2A=="
-PUB_ECDSA_521 = "ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBACaOaFLZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRAL4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA=="
+PUB_RSA = "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=" # noqa
+PUB_DSS = "ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE=" # noqa
+PUB_ECDSA_256 = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo=" # noqa
+PUB_ECDSA_384 = "ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBBbGibQLW9AAZiGN2hEQxWYYoFaWKwN3PKSaDJSMqmIn1Z9sgRUuw8Y/w502OGvXL/wFk0i2z50l3pWZjD7gfMH7gX5TUiCzwrQkS+Hn1U2S9aF5WJp0NcIzYxXw2r4M2A==" # noqa
+PUB_ECDSA_521 = "ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBACaOaFLZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRAL4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==" # noqa
FINGER_RSA = "1024 60:73:38:44:cb:51:86:65:7f:de:da:a2:2b:5a:57:d5"
FINGER_DSS = "1024 44:78:f0:b9:a2:3c:c5:18:20:09:ff:75:5b:c1:d2:6c"
FINGER_ECDSA_256 = "256 25:19:eb:55:e6:a1:47:ff:4f:38:d2:75:6f:a5:d5:60"
FINGER_ECDSA_384 = "384 c1:8d:a0:59:09:47:41:8e:a8:a6:07:01:29:23:b4:65"
FINGER_ECDSA_521 = "521 44:58:22:52:12:33:16:0e:ce:0e:be:2c:7c:7e:cc:1e"
-SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8"
+SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8" # noqa
RSA_PRIVATE_OUT = """\
-----BEGIN RSA PRIVATE KEY-----
@@ -109,8 +108,8 @@ L4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==
x1234 = b"\x01\x02\x03\x04"
-TEST_KEY_BYTESTR_2 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x00\xd3\x8fV\xea\x07\x85\xa6k%\x8d<\x1f\xbc\x8dT\x98\xa5\x96$\xf3E#\xbe>\xbc\xd2\x93\x93\x87f\xceD\x18\xdb \x0c\xb3\xa1a\x96\xf8e#\xcc\xacS\x8a#\xefVlE\x83\x1epv\xc1o\x17M\xef\xdf\x89DUXL\xa6\x8b\xaa<\x06\x10\xd7\x93w\xec\xaf\xe2\xaf\x95\xd8\xfb\xd9\xbfw\xcb\x9f0)#y{\x10\x90\xaa\x85l\tPru\x8c\t\x19\xce\xa0\xf1\xd2\xdc\x8e/\x8b\xa8f\x9c0\xdey\x84\xd2F\xf7\xcbmm\x1f\x87"
-TEST_KEY_BYTESTR_3 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f"
+TEST_KEY_BYTESTR_2 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x00\xd3\x8fV\xea\x07\x85\xa6k%\x8d<\x1f\xbc\x8dT\x98\xa5\x96$\xf3E#\xbe>\xbc\xd2\x93\x93\x87f\xceD\x18\xdb \x0c\xb3\xa1a\x96\xf8e#\xcc\xacS\x8a#\xefVlE\x83\x1epv\xc1o\x17M\xef\xdf\x89DUXL\xa6\x8b\xaa<\x06\x10\xd7\x93w\xec\xaf\xe2\xaf\x95\xd8\xfb\xd9\xbfw\xcb\x9f0)#y{\x10\x90\xaa\x85l\tPru\x8c\t\x19\xce\xa0\xf1\xd2\xdc\x8e/\x8b\xa8f\x9c0\xdey\x84\xd2F\xf7\xcbmm\x1f\x87" # noqa
+TEST_KEY_BYTESTR_3 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f" # noqa
class KeyTest(unittest.TestCase):
@@ -131,12 +130,12 @@ class KeyTest(unittest.TestCase):
self.assertEqual(fh.readline()[:-1], "Proc-Type: 4,ENCRYPTED")
self.assertEqual(fh.readline()[0:10], "DEK-Info: ")
- def test_1_generate_key_bytes(self):
+ def test_generate_key_bytes(self):
key = util.generate_key_bytes(md5, x1234, "happy birthday", 30)
- exp = b"\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64"
+ exp = b"\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64" # noqa
self.assertEqual(exp, key)
- def test_2_load_rsa(self):
+ def test_load_rsa(self):
key = RSAKey.from_private_key_file(_support("test_rsa.key"))
self.assertEqual("ssh-rsa", key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(":", ""))
@@ -152,7 +151,7 @@ class KeyTest(unittest.TestCase):
key2 = RSAKey.from_private_key(s)
self.assertEqual(key, key2)
- def test_3_load_rsa_password(self):
+ def test_load_rsa_password(self):
key = RSAKey.from_private_key_file(
_support("test_rsa_password.key"), "television"
)
@@ -163,7 +162,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(PUB_RSA.split()[1], key.get_base64())
self.assertEqual(1024, key.get_bits())
- def test_4_load_dss(self):
+ def test_load_dss(self):
key = DSSKey.from_private_key_file(_support("test_dss.key"))
self.assertEqual("ssh-dss", key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(":", ""))
@@ -179,7 +178,7 @@ class KeyTest(unittest.TestCase):
key2 = DSSKey.from_private_key(s)
self.assertEqual(key, key2)
- def test_5_load_dss_password(self):
+ def test_load_dss_password(self):
key = DSSKey.from_private_key_file(
_support("test_dss_password.key"), "television"
)
@@ -190,7 +189,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(PUB_DSS.split()[1], key.get_base64())
self.assertEqual(1024, key.get_bits())
- def test_6_compare_rsa(self):
+ def test_compare_rsa(self):
# verify that the private & public keys compare equal
key = RSAKey.from_private_key_file(_support("test_rsa.key"))
self.assertEqual(key, key)
@@ -199,7 +198,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_7_compare_dss(self):
+ def test_compare_dss(self):
# verify that the private & public keys compare equal
key = DSSKey.from_private_key_file(_support("test_dss.key"))
self.assertEqual(key, key)
@@ -208,7 +207,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_8_sign_rsa(self):
+ def test_sign_rsa(self):
# verify that the rsa private key can sign and verify
key = RSAKey.from_private_key_file(_support("test_rsa.key"))
msg = key.sign_ssh_data(b"ice weasels")
@@ -223,7 +222,7 @@ class KeyTest(unittest.TestCase):
pub = RSAKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
- def test_9_sign_dss(self):
+ def test_sign_dss(self):
# verify that the dss private key can sign and verify
key = DSSKey.from_private_key_file(_support("test_dss.key"))
msg = key.sign_ssh_data(b"ice weasels")
@@ -238,19 +237,19 @@ class KeyTest(unittest.TestCase):
pub = DSSKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
- def test_A_generate_rsa(self):
+ def test_generate_rsa(self):
key = RSAKey.generate(1024)
msg = key.sign_ssh_data(b"jerri blank")
msg.rewind()
self.assertTrue(key.verify_ssh_sig(b"jerri blank", msg))
- def test_B_generate_dss(self):
+ def test_generate_dss(self):
key = DSSKey.generate(1024)
msg = key.sign_ssh_data(b"jerri blank")
msg.rewind()
self.assertTrue(key.verify_ssh_sig(b"jerri blank", msg))
- def test_C_generate_ecdsa(self):
+ def test_generate_ecdsa(self):
key = ECDSAKey.generate()
msg = key.sign_ssh_data(b"jerri blank")
msg.rewind()
@@ -279,7 +278,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key.get_bits(), 521)
self.assertEqual(key.get_name(), "ecdsa-sha2-nistp521")
- def test_10_load_ecdsa_256(self):
+ def test_load_ecdsa_256(self):
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
self.assertEqual("ecdsa-sha2-nistp256", key.get_name())
exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(":", ""))
@@ -295,7 +294,7 @@ class KeyTest(unittest.TestCase):
key2 = ECDSAKey.from_private_key(s)
self.assertEqual(key, key2)
- def test_11_load_ecdsa_password_256(self):
+ def test_load_ecdsa_password_256(self):
key = ECDSAKey.from_private_key_file(
_support("test_ecdsa_password_256.key"), b"television"
)
@@ -306,7 +305,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(PUB_ECDSA_256.split()[1], key.get_base64())
self.assertEqual(256, key.get_bits())
- def test_12_compare_ecdsa_256(self):
+ def test_compare_ecdsa_256(self):
# verify that the private & public keys compare equal
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
self.assertEqual(key, key)
@@ -315,7 +314,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_13_sign_ecdsa_256(self):
+ def test_sign_ecdsa_256(self):
# verify that the rsa private key can sign and verify
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
msg = key.sign_ssh_data(b"ice weasels")
@@ -331,7 +330,7 @@ class KeyTest(unittest.TestCase):
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
- def test_14_load_ecdsa_384(self):
+ def test_load_ecdsa_384(self):
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key"))
self.assertEqual("ecdsa-sha2-nistp384", key.get_name())
exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(":", ""))
@@ -347,7 +346,7 @@ class KeyTest(unittest.TestCase):
key2 = ECDSAKey.from_private_key(s)
self.assertEqual(key, key2)
- def test_15_load_ecdsa_password_384(self):
+ def test_load_ecdsa_password_384(self):
key = ECDSAKey.from_private_key_file(
_support("test_ecdsa_password_384.key"), b"television"
)
@@ -358,7 +357,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(PUB_ECDSA_384.split()[1], key.get_base64())
self.assertEqual(384, key.get_bits())
- def test_16_compare_ecdsa_384(self):
+ def test_compare_ecdsa_384(self):
# verify that the private & public keys compare equal
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key"))
self.assertEqual(key, key)
@@ -367,7 +366,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_17_sign_ecdsa_384(self):
+ def test_sign_ecdsa_384(self):
# verify that the rsa private key can sign and verify
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key"))
msg = key.sign_ssh_data(b"ice weasels")
@@ -383,7 +382,7 @@ class KeyTest(unittest.TestCase):
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
- def test_18_load_ecdsa_521(self):
+ def test_load_ecdsa_521(self):
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key"))
self.assertEqual("ecdsa-sha2-nistp521", key.get_name())
exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(":", ""))
@@ -402,7 +401,7 @@ class KeyTest(unittest.TestCase):
key2 = ECDSAKey.from_private_key(s)
self.assertEqual(key, key2)
- def test_19_load_ecdsa_password_521(self):
+ def test_load_ecdsa_password_521(self):
key = ECDSAKey.from_private_key_file(
_support("test_ecdsa_password_521.key"), b"television"
)
@@ -413,7 +412,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(PUB_ECDSA_521.split()[1], key.get_base64())
self.assertEqual(521, key.get_bits())
- def test_20_compare_ecdsa_521(self):
+ def test_compare_ecdsa_521(self):
# verify that the private & public keys compare equal
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key"))
self.assertEqual(key, key)
@@ -422,7 +421,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_21_sign_ecdsa_521(self):
+ def test_sign_ecdsa_521(self):
# verify that the rsa private key can sign and verify
key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key"))
msg = key.sign_ssh_data(b"ice weasels")
@@ -487,7 +486,7 @@ class KeyTest(unittest.TestCase):
def test_ed25519_nonbytes_password(self):
# https://github.com/paramiko/paramiko/issues/1039
- key = Ed25519Key.from_private_key_file(
+ Ed25519Key.from_private_key_file(
_support("test_ed25519_password.key"),
# NOTE: not a bytes. Amusingly, the test above for same key DOES
# explicitly cast to bytes...code smell!
@@ -536,7 +535,7 @@ class KeyTest(unittest.TestCase):
# Delve into blob contents, for test purposes
msg = Message(key.public_blob.key_blob)
self.assertEqual(msg.get_text(), "ssh-rsa-cert-v01@openssh.com")
- nonce = msg.get_string()
+ msg.get_string()
e = msg.get_mpint()
n = msg.get_mpint()
self.assertEqual(e, key.public_numbers.e)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 87c57340..e4e18e5a 100644
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -26,23 +26,18 @@ do test file operations in (so no existing files will be harmed).
import os
import socket
import sys
-import threading
-import unittest
import warnings
from binascii import hexlify
from tempfile import mkstemp
import pytest
-import paramiko
-import paramiko.util
from paramiko.py3compat import PY2, b, u, StringIO
from paramiko.common import o777, o600, o666, o644
from paramiko.sftp_attr import SFTPAttributes
from .util import needs_builtin
-from .stub_sftp import StubServer, StubSFTPServer
-from .util import _support, slow
+from .util import slow
ARTICLE = """
@@ -74,14 +69,21 @@ decreased compared with chicken.
# Here is how unicode characters are encoded over 1 to 6 bytes in utf-8
-# U-00000000 - U-0000007F: 0xxxxxxx
-# U-00000080 - U-000007FF: 110xxxxx 10xxxxxx
-# U-00000800 - U-0000FFFF: 1110xxxx 10xxxxxx 10xxxxxx
-# U-00010000 - U-001FFFFF: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
-# U-00200000 - U-03FFFFFF: 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
-# U-04000000 - U-7FFFFFFF: 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
+# U-00000000 - U-0000007F:
+# 0xxxxxxx
+# U-00000080 - U-000007FF:
+# 110xxxxx 10xxxxxx
+# U-00000800 - U-0000FFFF:
+# 1110xxxx 10xxxxxx 10xxxxxx
+# U-00010000 - U-001FFFFF:
+# 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
+# U-00200000 - U-03FFFFFF:
+# 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
+# U-04000000 - U-7FFFFFFF:
+# 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
# Note that: hex(int('11000011',2)) == '0xc3'
-# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte"
+# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation
+# byte"
NON_UTF8_DATA = b"\xC3\xC3"
unicode_folder = u"\u00fcnic\u00f8de" if PY2 else "\u00fcnic\u00f8de"
@@ -90,7 +92,7 @@ utf8_folder = b"/\xc3\xbcnic\xc3\xb8\x64\x65"
@slow
class TestSFTP(object):
- def test_1_file(self, sftp):
+ def test_file(self, sftp):
"""
verify that we can create a file.
"""
@@ -101,7 +103,7 @@ class TestSFTP(object):
f.close()
sftp.remove(sftp.FOLDER + "/test")
- def test_2_close(self, sftp):
+ def test_close(self, sftp):
"""
Verify that SFTP session close() causes a socket error on next action.
"""
@@ -109,7 +111,7 @@ class TestSFTP(object):
with pytest.raises(socket.error, match="Socket is closed"):
sftp.open(sftp.FOLDER + "/test2", "w")
- def test_2_sftp_can_be_used_as_context_manager(self, sftp):
+ def test_sftp_can_be_used_as_context_manager(self, sftp):
"""
verify that the sftp session is closed when exiting the context manager
"""
@@ -118,7 +120,7 @@ class TestSFTP(object):
with pytest.raises(socket.error, match="Socket is closed"):
sftp.open(sftp.FOLDER + "/test2", "w")
- def test_3_write(self, sftp):
+ def test_write(self, sftp):
"""
verify that a file can be created and written, and the size is correct.
"""
@@ -129,7 +131,7 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/duck.txt")
- def test_3_sftp_file_can_be_used_as_context_manager(self, sftp):
+ def test_sftp_file_can_be_used_as_context_manager(self, sftp):
"""
verify that an opened file can be used as a context manager
"""
@@ -140,7 +142,7 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/duck.txt")
- def test_4_append(self, sftp):
+ def test_append(self, sftp):
"""
verify that a file can be opened for append, and tell() still works.
"""
@@ -158,7 +160,7 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/append.txt")
- def test_5_rename(self, sftp):
+ def test_rename(self, sftp):
"""
verify that renaming a file works.
"""
@@ -185,7 +187,7 @@ class TestSFTP(object):
except:
pass
- def test_5a_posix_rename(self, sftp):
+ def testa_posix_rename(self, sftp):
"""Test posix-rename@openssh.com protocol extension."""
try:
# first check that the normal rename works as specified
@@ -194,20 +196,15 @@ class TestSFTP(object):
sftp.rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b")
with sftp.open(sftp.FOLDER + "/a", "w") as f:
f.write("two")
- try:
+ with pytest.raises(IOError): # actual message seems generic
sftp.rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b")
- self.assertTrue(
- False, "no exception when rename-ing onto existing file"
- )
- except (OSError, IOError):
- pass
# now check with the posix_rename
sftp.posix_rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b")
with sftp.open(sftp.FOLDER + "/b", "r") as f:
data = u(f.read())
err = "Contents of renamed file not the same as original file"
- assert data == "two", err
+ assert "two" == data, err
finally:
try:
@@ -219,7 +216,7 @@ class TestSFTP(object):
except:
pass
- def test_6_folder(self, sftp):
+ def test_folder(self, sftp):
"""
create a temporary folder, verify that we can create a file in it, then
remove the folder and verify that we can't create a file in it anymore.
@@ -232,7 +229,7 @@ class TestSFTP(object):
with pytest.raises(IOError, match="No such file"):
sftp.open(sftp.FOLDER + "/subfolder/test")
- def test_7_listdir(self, sftp):
+ def test_listdir(self, sftp):
"""
verify that a folder can be created, a bunch of files can be placed in
it, and those files show up in sftp.listdir.
@@ -253,7 +250,7 @@ class TestSFTP(object):
sftp.remove(sftp.FOLDER + "/fish.txt")
sftp.remove(sftp.FOLDER + "/tertiary.py")
- def test_7_5_listdir_iter(self, sftp):
+ def test_listdir_iter(self, sftp):
"""
listdir_iter version of above test
"""
@@ -273,7 +270,7 @@ class TestSFTP(object):
sftp.remove(sftp.FOLDER + "/fish.txt")
sftp.remove(sftp.FOLDER + "/tertiary.py")
- def test_8_setstat(self, sftp):
+ def test_setstat(self, sftp):
"""
verify that the setstat functions (chown, chmod, utime, truncate) work.
"""
@@ -310,7 +307,7 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/special")
- def test_9_fsetstat(self, sftp):
+ def test_fsetstat(self, sftp):
"""
verify that the fsetstat functions (chown, chmod, utime, truncate)
work on open files.
@@ -350,12 +347,12 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/special")
- def test_A_readline_seek(self, sftp):
+ def test_readline_seek(self, sftp):
"""
- create a text file and write a bunch of text into it. then count the lines
- in the file, and seek around to retrieve particular lines. this should
- verify that read buffering and 'tell' work well together, and that read
- buffering is reset on 'seek'.
+ create a text file and write a bunch of text into it. then count the
+ lines in the file, and seek around to retrieve particular lines. this
+ should verify that read buffering and 'tell' work well together, and
+ that read buffering is reset on 'seek'.
"""
try:
with sftp.open(sftp.FOLDER + "/duck.txt", "w") as f:
@@ -375,17 +372,14 @@ class TestSFTP(object):
f.seek(pos_list[17], f.SEEK_SET)
assert f.readline()[:4] == "duck"
f.seek(pos_list[10], f.SEEK_SET)
- assert (
- f.readline()
- == "duck types were equally resistant to exogenous insulin compared with chicken.\n"
- )
+ expected = "duck types were equally resistant to exogenous insulin compared with chicken.\n" # noqa
+ assert f.readline() == expected
finally:
sftp.remove(sftp.FOLDER + "/duck.txt")
- def test_B_write_seek(self, sftp):
+ def test_write_seek(self, sftp):
"""
- create a text file, seek back and change part of it, and verify that the
- changes worked.
+ Create a text file, seek back, change it, and verify.
"""
try:
with sftp.open(sftp.FOLDER + "/testing.txt", "w") as f:
@@ -400,7 +394,7 @@ class TestSFTP(object):
finally:
sftp.remove(sftp.FOLDER + "/testing.txt")
- def test_C_symlink(self, sftp):
+ def test_symlink(self, sftp):
"""
create a symlink and then check that lstat doesn't follow it.
"""
@@ -447,7 +441,7 @@ class TestSFTP(object):
except:
pass
- def test_D_flush_seek(self, sftp):
+ def test_flush_seek(self, sftp):
"""
verify that buffered writes are automatically flushed on seek.
"""
@@ -467,7 +461,7 @@ class TestSFTP(object):
except:
pass
- def test_E_realpath(self, sftp):
+ def test_realpath(self, sftp):
"""
test that realpath is returning something non-empty and not an
error.
@@ -478,7 +472,7 @@ class TestSFTP(object):
assert len(f) > 0
assert os.path.join(pwd, sftp.FOLDER) == f
- def test_F_mkdir(self, sftp):
+ def test_mkdir(self, sftp):
"""
verify that mkdir/rmdir work.
"""
@@ -489,7 +483,7 @@ class TestSFTP(object):
with pytest.raises(IOError, match="No such file"):
sftp.rmdir(sftp.FOLDER + "/subfolder")
- def test_G_chdir(self, sftp):
+ def test_chdir(self, sftp):
"""
verify that chdir/getcwd work.
"""
@@ -525,7 +519,7 @@ class TestSFTP(object):
except:
pass
- def test_H_get_put(self, sftp):
+ def test_get_put(self, sftp):
"""
verify that get/put work.
"""
@@ -560,7 +554,7 @@ class TestSFTP(object):
os.unlink(localname)
sftp.unlink(sftp.FOLDER + "/bunny.txt")
- def test_I_check(self, sftp):
+ def test_check(self, sftp):
"""
verify that file.check() works against our own server.
(it's an sftp extension that we support, and may be the only ones who
@@ -582,14 +576,12 @@ class TestSFTP(object):
== u(hexlify(sum)).upper()
)
sum = f.check("md5", 0, 0, 510)
- assert (
- u(hexlify(sum)).upper()
- == "EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6"
- ) # noqa
+ expected = "EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6" # noqa
+ assert u(hexlify(sum)).upper() == expected
finally:
sftp.unlink(sftp.FOLDER + "/kitty.txt")
- def test_J_x_flag(self, sftp):
+ def test_x_flag(self, sftp):
"""
verify that the 'x' flag works when opening a file.
"""
@@ -604,7 +596,7 @@ class TestSFTP(object):
finally:
sftp.unlink(sftp.FOLDER + "/unusual.txt")
- def test_K_utf8(self, sftp):
+ def test_utf8(self, sftp):
"""
verify that unicode strings are encoded into utf8 correctly.
"""
@@ -620,7 +612,7 @@ class TestSFTP(object):
self.fail("exception " + str(e))
sftp.unlink(b(sftp.FOLDER) + utf8_folder)
- def test_L_utf8_chdir(self, sftp):
+ def test_utf8_chdir(self, sftp):
sftp.mkdir(sftp.FOLDER + "/" + unicode_folder)
try:
sftp.chdir(sftp.FOLDER + "/" + unicode_folder)
@@ -631,7 +623,7 @@ class TestSFTP(object):
sftp.chdir()
sftp.rmdir(sftp.FOLDER + "/" + unicode_folder)
- def test_M_bad_readv(self, sftp):
+ def test_bad_readv(self, sftp):
"""
verify that readv at the end of the file doesn't essplode.
"""
@@ -647,7 +639,7 @@ class TestSFTP(object):
finally:
sftp.unlink(sftp.FOLDER + "/zero")
- def test_N_put_without_confirm(self, sftp):
+ def test_put_without_confirm(self, sftp):
"""
verify that get/put work without confirmation.
"""
@@ -676,11 +668,11 @@ class TestSFTP(object):
os.unlink(localname)
sftp.unlink(sftp.FOLDER + "/bunny.txt")
- def test_O_getcwd(self, sftp):
+ def test_getcwd(self, sftp):
"""
verify that chdir/getcwd work.
"""
- assert sftp.getcwd() == None
+ assert sftp.getcwd() is None
root = sftp.normalize(".")
if root[-1] != "/":
root += "/"
@@ -695,7 +687,7 @@ class TestSFTP(object):
except:
pass
- def XXX_test_M_seek_append(self, sftp):
+ def test_seek_append(self, sftp):
"""
verify that seek does't affect writes during append.
@@ -733,7 +725,7 @@ class TestSFTP(object):
# appear but they're clearly emitted from subthreads that have no error
# handling. No point running it until that is fixed somehow.
@pytest.mark.skip("Doesn't prove anything right now")
- def test_N_file_with_percent(self, sftp):
+ def test_file_with_percent(self, sftp):
"""
verify that we can create a file with a '%' in the filename.
( it needs to be properly escaped by _log() )
@@ -745,7 +737,7 @@ class TestSFTP(object):
f.close()
sftp.remove(sftp.FOLDER + "/test%file")
- def test_O_non_utf8_data(self, sftp):
+ def test_non_utf8_data(self, sftp):
"""Test write() and read() of non utf8 data"""
try:
with sftp.open("%s/nonutf8data" % sftp.FOLDER, "w") as f:
@@ -775,7 +767,7 @@ class TestSFTP(object):
try:
with sftp.open("%s/write_buffer" % sftp.FOLDER, "wb") as f:
for offset in range(0, len(data), 8):
- f.write(buffer(data, offset, 8))
+ f.write(buffer(data, offset, 8)) # noqa
with sftp.open("%s/write_buffer" % sftp.FOLDER, "rb") as f:
assert f.read() == data
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index 9df566e8..fc556faf 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -23,12 +23,10 @@ a real actual sftp server is contacted, and a new folder is created there to
do test file operations in (so no existing files will be harmed).
"""
-import os
import random
import struct
import sys
import time
-import unittest
from paramiko.common import o660
@@ -37,7 +35,7 @@ from .util import slow
@slow
class TestBigSFTP(object):
- def test_1_lots_of_files(self, sftp):
+ def test_lots_of_files(self, sftp):
"""
create a bunch of files over the same session.
"""
@@ -65,7 +63,7 @@ class TestBigSFTP(object):
except:
pass
- def test_2_big_file(self, sftp):
+ def test_big_file(self, sftp):
"""
write a 1MB file with no buffering.
"""
@@ -96,7 +94,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_3_big_file_pipelined(self, sftp):
+ def test_big_file_pipelined(self, sftp):
"""
write a 1MB file, with no linefeeds, using pipelining.
"""
@@ -122,7 +120,8 @@ class TestBigSFTP(object):
file_size = f.stat().st_size
f.prefetch(file_size)
- # read on odd boundaries to make sure the bytes aren't getting scrambled
+ # read on odd boundaries to make sure the bytes aren't getting
+ # scrambled
n = 0
k2blob = kblob + kblob
chunk = 629
@@ -140,7 +139,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_4_prefetch_seek(self, sftp):
+ def test_prefetch_seek(self, sftp):
kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
@@ -180,7 +179,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_5_readv_seek(self, sftp):
+ def test_readv_seek(self, sftp):
kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
@@ -220,7 +219,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_6_lots_of_prefetching(self, sftp):
+ def test_lots_of_prefetching(self, sftp):
"""
prefetch a 1MB file a bunch of times, discarding the file object
without using it, to verify that paramiko doesn't get confused.
@@ -255,7 +254,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_7_prefetch_readv(self, sftp):
+ def test_prefetch_readv(self, sftp):
"""
verify that prefetch and readv don't conflict with each other.
"""
@@ -296,7 +295,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_8_large_readv(self, sftp):
+ def test_large_readv(self, sftp):
"""
verify that a very large readv is broken up correctly and still
returned as a single blob.
@@ -325,7 +324,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_9_big_file_big_buffer(self, sftp):
+ def test_big_file_big_buffer(self, sftp):
"""
write a 1MB file, with no linefeeds, and a big buffer.
"""
@@ -342,7 +341,7 @@ class TestBigSFTP(object):
finally:
sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_A_big_file_renegotiate(self, sftp):
+ def test_big_file_renegotiate(self, sftp):
"""
write a 1MB file, forcing key renegotiation in the middle.
"""
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index b6b50152..d8c151f9 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -139,16 +139,16 @@ class GSSAuthTest(unittest.TestCase):
stdout.close()
stderr.close()
- def test_1_gss_auth(self):
+ def test_gss_auth(self):
"""
Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication
(gssapi-with-mic) in client and server mode.
"""
self._test_connection(allow_agent=False, look_for_keys=False)
- def test_2_auth_trickledown(self):
+ def test_auth_trickledown(self):
"""
- Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
"""
self.hostname = (
"this_host_does_not_exists_and_causes_a_GSSAPI-exception"
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 2b8ee3bc..ad267e28 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -28,36 +28,32 @@ import socket
import time
import threading
import random
-from hashlib import sha1
import unittest
from mock import Mock
from paramiko import (
- Transport,
- SecurityOptions,
- ServerInterface,
- RSAKey,
- DSSKey,
- SSHException,
+ AuthHandler,
ChannelException,
+ DSSKey,
Packetizer,
- Channel,
- AuthHandler,
+ RSAKey,
+ SSHException,
+ SecurityOptions,
+ ServerInterface,
+ Transport,
)
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from paramiko.common import (
- MSG_KEXINIT,
- cMSG_CHANNEL_WINDOW_ADJUST,
- cMSG_UNIMPLEMENTED,
+ DEFAULT_MAX_PACKET_SIZE,
+ DEFAULT_WINDOW_SIZE,
+ MAX_WINDOW_SIZE,
MIN_PACKET_SIZE,
MIN_WINDOW_SIZE,
- MAX_WINDOW_SIZE,
- DEFAULT_WINDOW_SIZE,
- DEFAULT_MAX_PACKET_SIZE,
- MSG_NAMES,
- MSG_UNIMPLEMENTED,
+ MSG_KEXINIT,
MSG_USERAUTH_SUCCESS,
+ cMSG_CHANNEL_WINDOW_ADJUST,
+ cMSG_UNIMPLEMENTED,
)
from paramiko.py3compat import bytes, byte_chr
from paramiko.message import Message
@@ -185,7 +181,7 @@ class TransportTest(unittest.TestCase):
self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
- def test_1_security_options(self):
+ def test_security_options(self):
o = self.tc.get_security_options()
self.assertEqual(type(o), SecurityOptions)
self.assertTrue(("aes256-cbc", "blowfish-cbc") != o.ciphers)
@@ -202,7 +198,7 @@ class TransportTest(unittest.TestCase):
except TypeError:
pass
- def test_1b_security_options_reset(self):
+ def testb_security_options_reset(self):
o = self.tc.get_security_options()
# should not throw any exceptions
o.ciphers = o.ciphers
@@ -211,17 +207,17 @@ class TransportTest(unittest.TestCase):
o.kex = o.kex
o.compression = o.compression
- def test_2_compute_key(self):
- self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
- self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3"
+ def test_compute_key(self):
+ self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 # noqa
+ self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3" # noqa
self.tc.session_id = self.tc.H
key = self.tc._compute_key("C", 32)
self.assertEqual(
- b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995",
+ b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995", # noqa
hexlify(key).upper(),
)
- def test_3_simple(self):
+ def test_simple(self):
"""
verify that we can establish an ssh link with ourselves across the
loopback sockets. this is hardly "simple" but it's simpler than the
@@ -249,7 +245,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(True, self.tc.is_authenticated())
self.assertEqual(True, self.ts.is_authenticated())
- def test_3a_long_banner(self):
+ def testa_long_banner(self):
"""
verify that a long banner doesn't mess up the handshake.
"""
@@ -268,7 +264,7 @@ class TransportTest(unittest.TestCase):
self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
- def test_4_special(self):
+ def test_special(self):
"""
verify that the client can demand odd handshake settings, and can
renegotiate keys in mid-stream.
@@ -289,7 +285,7 @@ class TransportTest(unittest.TestCase):
self.ts.send_ignore(1024)
@slow
- def test_5_keepalive(self):
+ def test_keepalive(self):
"""
verify that the keepalive will be sent.
"""
@@ -299,7 +295,7 @@ class TransportTest(unittest.TestCase):
time.sleep(2)
self.assertEqual("keepalive@lag.net", self.server._global_request)
- def test_6_exec_command(self):
+ def test_exec_command(self):
"""
verify that exec_command() does something reasonable.
"""
@@ -343,7 +339,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual("This is on stderr.\n", f.readline())
self.assertEqual("", f.readline())
- def test_6a_channel_can_be_used_as_context_manager(self):
+ def testa_channel_can_be_used_as_context_manager(self):
"""
verify that exec_command() does something reasonable.
"""
@@ -359,7 +355,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual("Hello there.\n", f.readline())
self.assertEqual("", f.readline())
- def test_7_invoke_shell(self):
+ def test_invoke_shell(self):
"""
verify that invoke_shell() does something reasonable.
"""
@@ -373,18 +369,18 @@ class TransportTest(unittest.TestCase):
chan.close()
self.assertEqual("", f.readline())
- def test_8_channel_exception(self):
+ def test_channel_exception(self):
"""
verify that ChannelException is thrown for a bad open-channel request.
"""
self.setup_test_server()
try:
- chan = self.tc.open_channel("bogus")
+ self.tc.open_channel("bogus")
self.fail("expected exception")
except ChannelException as e:
self.assertTrue(e.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED)
- def test_9_exit_status(self):
+ def test_exit_status(self):
"""
verify that get_exit_status() works.
"""
@@ -413,7 +409,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(23, chan.recv_exit_status())
chan.close()
- def test_A_select(self):
+ def test_select(self):
"""
verify that select() on a channel works.
"""
@@ -468,7 +464,7 @@ class TransportTest(unittest.TestCase):
# ...and now is closed.
self.assertEqual(True, p._closed)
- def test_B_renegotiate(self):
+ def test_renegotiate(self):
"""
verify that a transport can correctly renegotiate mid-stream.
"""
@@ -492,7 +488,7 @@ class TransportTest(unittest.TestCase):
schan.close()
- def test_C_compression(self):
+ def test_compression(self):
"""
verify that zlib compression is basically working.
"""
@@ -510,14 +506,15 @@ class TransportTest(unittest.TestCase):
bytes2 = self.tc.packetizer._Packetizer__sent_bytes
block_size = self.tc._cipher_info[self.tc.local_cipher]["block-size"]
mac_size = self.tc._mac_info[self.tc.local_mac]["size"]
- # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :)
+ # tests show this is actually compressed to *52 bytes*! including
+ # packet overhead! nice!! :)
self.assertTrue(bytes2 - bytes < 1024)
self.assertEqual(16 + block_size + mac_size, bytes2 - bytes)
chan.close()
schan.close()
- def test_D_x11(self):
+ def test_x11(self):
"""
verify that an x11 port can be requested and opened.
"""
@@ -555,7 +552,7 @@ class TransportTest(unittest.TestCase):
chan.close()
schan.close()
- def test_E_reverse_port_forwarding(self):
+ def test_reverse_port_forwarding(self):
"""
verify that a client can ask the server to open a reverse port for
forwarding.
@@ -563,7 +560,7 @@ class TransportTest(unittest.TestCase):
self.setup_test_server()
chan = self.tc.open_session()
chan.exec_command("yes")
- schan = self.ts.accept(1.0)
+ self.ts.accept(1.0)
requested = []
@@ -594,7 +591,7 @@ class TransportTest(unittest.TestCase):
self.tc.cancel_port_forward("127.0.0.1", port)
self.assertTrue(self.server._listen is None)
- def test_F_port_forwarding(self):
+ def test_port_forwarding(self):
"""
verify that a client can forward new connections from a locally-
forwarded port.
@@ -602,7 +599,7 @@ class TransportTest(unittest.TestCase):
self.setup_test_server()
chan = self.tc.open_session()
chan.exec_command("yes")
- schan = self.ts.accept(1.0)
+ self.ts.accept(1.0)
# open a port on the "server" that the client will ask to forward to.
greeting_server = socket.socket()
@@ -626,7 +623,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(b"Hello!\n", cs.recv(7))
cs.close()
- def test_G_stderr_select(self):
+ def test_stderr_select(self):
"""
verify that select() on a channel works even if only stderr is
receiving data.
@@ -665,7 +662,7 @@ class TransportTest(unittest.TestCase):
schan.close()
chan.close()
- def test_H_send_ready(self):
+ def test_send_ready(self):
"""
verify that send_ready() indicates when a send would not block.
"""
@@ -689,9 +686,10 @@ class TransportTest(unittest.TestCase):
chan.close()
self.assertEqual(chan.send_ready(), True)
- def test_I_rekey_deadlock(self):
+ def test_rekey_deadlock(self):
"""
- Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent
+ Regression test for deadlock when in-transit messages are received
+ after MSG_KEXINIT is sent
Note: When this test fails, it may leak threads.
"""
@@ -714,12 +712,15 @@ class TransportTest(unittest.TestCase):
# MSG_KEXINIT to the remote host.
#
# On the remote host (using any SSH implementation):
- # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent.
- # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent.
+ # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST
+ # is sent.
+ # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is
+ # sent.
#
# In the main thread:
# 7. The user's program calls Channel.send().
- # 8. Channel.send acquires Channel.lock, then calls Transport._send_user_message().
+ # 8. Channel.send acquires Channel.lock, then calls
+ # Transport._send_user_message().
# 9. Transport._send_user_message waits for Transport.clear_to_send
# to be set (i.e., it waits for re-keying to complete).
# Channel.lock is still held.
@@ -854,7 +855,7 @@ class TransportTest(unittest.TestCase):
schan.close()
chan.close()
- def test_J_sanitze_packet_size(self):
+ def test_sanitze_packet_size(self):
"""
verify that we conform to the rfc of packet and window sizes.
"""
@@ -865,7 +866,7 @@ class TransportTest(unittest.TestCase):
]:
self.assertEqual(self.tc._sanitize_packet_size(val), correct)
- def test_K_sanitze_window_size(self):
+ def test_sanitze_window_size(self):
"""
verify that we conform to the rfc of packet and window sizes.
"""
@@ -877,7 +878,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(self.tc._sanitize_window_size(val), correct)
@slow
- def test_L_handshake_timeout(self):
+ def test_handshake_timeout(self):
"""
verify that we can get a hanshake timeout.
"""
@@ -914,7 +915,7 @@ class TransportTest(unittest.TestCase):
password="pygmalion",
)
- def test_M_select_after_close(self):
+ def test_select_after_close(self):
"""
verify that select works when a channel is already closed.
"""
@@ -969,11 +970,11 @@ class TransportTest(unittest.TestCase):
# send() accepts buffer instances
sent = 0
while sent < len(data):
- sent += chan.send(buffer(data, sent, 8))
+ sent += chan.send(buffer(data, sent, 8)) # noqa
self.assertEqual(sfile.read(len(data)), data)
# sendall() accepts a buffer instance
- chan.sendall(buffer(data))
+ chan.sendall(buffer(data)) # noqa
self.assertEqual(sfile.read(len(data)), data)
@needs_builtin("memoryview")
diff --git a/tests/test_util.py b/tests/test_util.py
index 36d69de7..465e75b8 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -26,9 +26,11 @@ import os
from hashlib import sha1
import unittest
+import paramiko
import paramiko.util
+from paramiko import SSHConfig
from paramiko.util import lookup_ssh_host_config as host_config, safe_string
-from paramiko.py3compat import StringIO, byte_ord, b
+from paramiko.py3compat import StringIO, byte_ord
# Note some lines in this configuration have trailing spaces on purpose
@@ -62,16 +64,12 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
"""
-# for test 1:
-from paramiko import *
-
-
class UtilTest(unittest.TestCase):
def test_import(self):
"""
verify that all the classes can be imported from paramiko.
"""
- symbols = list(globals().keys())
+ symbols = paramiko.__all__
self.assertTrue("Transport" in symbols)
self.assertTrue("SSHClient" in symbols)
self.assertTrue("MissingHostKeyPolicy" in symbols)
@@ -172,7 +170,7 @@ class UtilTest(unittest.TestCase):
hex = "".join(["%02x" % byte_ord(c) for c in x])
self.assertEqual(
hex,
- "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b",
+ "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b", # noqa
)
def test_host_keys(self):
@@ -419,8 +417,7 @@ IdentityFile something_%l_using_fqdn
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEqual(
- config.get_hostnames(),
- set(["*", "*.example.com", "spoo.example.com"]),
+ config.get_hostnames(), {"*", "*.example.com", "spoo.example.com"}
)
def test_quoted_host_names(self):
@@ -508,12 +505,12 @@ Host param3 parara
self.assertRaises(Exception, conf._get_hosts, host)
def test_safe_string(self):
- vanilla = b("vanilla")
- has_bytes = b("has \7\3 bytes")
+ vanilla = b"vanilla"
+ has_bytes = b"has \7\3 bytes"
safe_vanilla = safe_string(vanilla)
safe_has_bytes = safe_string(has_bytes)
- expected_bytes = b("has %07%03 bytes")
- err = "{0!r} != {1!r}"
+ expected_bytes = b"has %07%03 bytes"
+ err = "{!r} != {!r}"
msg = err.format(safe_vanilla, vanilla)
assert safe_vanilla == vanilla, msg
msg = err.format(safe_has_bytes, expected_bytes)
diff --git a/tests/util.py b/tests/util.py
index c1ba4b2c..4ca02374 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -20,7 +20,7 @@ def needs_builtin(name):
"""
Skip decorated test if builtin name does not exist.
"""
- reason = "Test requires a builtin '{0}'".format(name)
+ reason = "Test requires a builtin '{}'".format(name)
return pytest.mark.skipif(not hasattr(builtins, name), reason=reason)