diff options
author | Matus Valo <matusvalo@users.noreply.github.com> | 2020-11-08 11:56:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-08 11:56:48 +0100 |
commit | b3f371333ff7df793628809e8abdbf43fd4391ba (patch) | |
tree | b813fcf01c340362a9f7f1b2bb0cffbfb01fc4a2 | |
parent | 8e5ddd41b776f31faea6a47fcc757b76c567b007 (diff) | |
download | py-amqp-b3f371333ff7df793628809e8abdbf43fd4391ba.tar.gz |
Reintroduce ca_certs and ciphers parameters of SSLTransport._wrap_socket_sni() (#344)
This fixes issue introduced in commit: 53d677754b4e820acf673711532c1a1dc8e57124
-rw-r--r-- | amqp/transport.py | 8 | ||||
-rw-r--r-- | t/unit/test_transport.py | 138 |
2 files changed, 136 insertions, 10 deletions
diff --git a/amqp/transport.py b/amqp/transport.py index 3eec88f..b183120 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -354,9 +354,9 @@ class SSLTransport(_AbstractTransport): def _wrap_socket_sni(self, sock, keyfile=None, certfile=None, server_side=False, cert_reqs=ssl.CERT_NONE, - do_handshake_on_connect=False, + ca_certs=None, do_handshake_on_connect=False, suppress_ragged_eofs=True, server_hostname=None, - ssl_version=ssl.PROTOCOL_TLS): + ciphers=None, ssl_version=ssl.PROTOCOL_TLS): """Socket wrap with SNI headers. stdlib `ssl.SSLContext.wrap_socket` method augmented with support for @@ -373,6 +373,10 @@ class SSLTransport(_AbstractTransport): context = ssl.SSLContext(ssl_version) if certfile is not None: context.load_cert_chain(certfile, keyfile) + if ca_certs is not None: + context.load_verify_locations(ca_certs) + if ciphers: + context.set_ciphers(ciphers) if cert_reqs != ssl.CERT_NONE: context.check_hostname = True # Set SNI headers if supported diff --git a/t/unit/test_transport.py b/t/unit/test_transport.py index d94a520..5eec1ab 100644 --- a/t/unit/test_transport.py +++ b/t/unit/test_transport.py @@ -4,7 +4,7 @@ import re import socket import struct from struct import pack -from unittest.mock import ANY, MagicMock, Mock, call, patch +from unittest.mock import ANY, MagicMock, Mock, call, patch, sentinel import pytest @@ -638,14 +638,136 @@ class test_SSLTransport: ctx.wrap_socket.assert_called_with(sock, f=1) def test_wrap_socket_sni(self): + # testing default values of _wrap_socket_sni() sock = Mock() - with patch('ssl.SSLContext.wrap_socket') as mock_ssl_wrap: - self.t._wrap_socket_sni(sock) - mock_ssl_wrap.assert_called_with(sock=sock, - server_side=False, - do_handshake_on_connect=False, - suppress_ragged_eofs=True, - server_hostname=None) + with patch( + 'ssl.SSLContext.wrap_socket', + return_value=sentinel.WRAPPED_SOCKET) as mock_ssl_wrap: + ret = self.t._wrap_socket_sni(sock) + + mock_ssl_wrap.assert_called_with(sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=None) + + assert ret == sentinel.WRAPPED_SOCKET + + def test_wrap_socket_sni_certfile(self): + # testing _wrap_socket_sni() with parameters certfile and keyfile + sock = Mock() + with patch( + 'ssl.SSLContext.wrap_socket', + return_value=sentinel.WRAPPED_SOCKET) as mock_ssl_wrap, \ + patch('ssl.SSLContext.load_cert_chain') as mock_load_cert_chain: + ret = self.t._wrap_socket_sni( + sock, keyfile=sentinel.KEYFILE, certfile=sentinel.CERTFILE) + + mock_load_cert_chain.assert_called_with( + sentinel.CERTFILE, sentinel.KEYFILE) + mock_ssl_wrap.assert_called_with(sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=None) + + assert ret == sentinel.WRAPPED_SOCKET + + def test_wrap_socket_ca_certs(self): + # testing _wrap_socket_sni() with parameter ca_certs + sock = Mock() + with patch( + 'ssl.SSLContext.wrap_socket', + return_value=sentinel.WRAPPED_SOCKET + ) as mock_ssl_wrap, patch( + 'ssl.SSLContext.load_verify_locations' + ) as mock_load_verify_locations: + ret = self.t._wrap_socket_sni(sock, ca_certs=sentinel.CA_CERTS) + + mock_load_verify_locations.assert_called_with(sentinel.CA_CERTS) + mock_ssl_wrap.assert_called_with(sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=None) + + assert ret == sentinel.WRAPPED_SOCKET + + def test_wrap_socket_ciphers(self): + # testing _wrap_socket_sni() with parameter ciphers + sock = Mock() + with patch( + 'ssl.SSLContext.wrap_socket', + return_value=sentinel.WRAPPED_SOCKET) as mock_ssl_wrap, \ + patch('ssl.SSLContext.set_ciphers') as mock_set_ciphers: + ret = self.t._wrap_socket_sni(sock, ciphers=sentinel.CIPHERS) + + mock_set_ciphers.assert_called_with(sentinel.CIPHERS) + mock_ssl_wrap.assert_called_with(sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=None) + assert ret == sentinel.WRAPPED_SOCKET + + def test_wrap_socket_sni_cert_reqs(self): + # testing _wrap_socket_sni() with parameter cert_reqs + sock = Mock() + with patch('ssl.SSLContext') as mock_ssl_context_class: + wrap_socket_method_mock = mock_ssl_context_class().wrap_socket + wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET + ret = self.t._wrap_socket_sni(sock, cert_reqs=sentinel.CERT_REQS) + + wrap_socket_method_mock.assert_called_with( + sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=None + ) + assert mock_ssl_context_class().check_hostname is True + assert ret == sentinel.WRAPPED_SOCKET + + def test_wrap_socket_sni_setting_sni_header(self): + # testing _wrap_socket_sni() with setting SNI header + sock = Mock() + with patch('ssl.SSLContext') as mock_ssl_context_class, \ + patch('ssl.HAS_SNI', new=True): + # SSL module supports SNI + wrap_socket_method_mock = mock_ssl_context_class().wrap_socket + wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET + ret = self.t._wrap_socket_sni( + sock, cert_reqs=sentinel.CERT_REQS, + server_hostname=sentinel.SERVER_HOSTNAME + ) + wrap_socket_method_mock.assert_called_with( + sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=sentinel.SERVER_HOSTNAME + ) + assert mock_ssl_context_class().verify_mode == sentinel.CERT_REQS + assert ret == sentinel.WRAPPED_SOCKET + + with patch('ssl.SSLContext') as mock_ssl_context_class, \ + patch('ssl.HAS_SNI', new=False): + # SSL module does not support SNI + wrap_socket_method_mock = mock_ssl_context_class().wrap_socket + wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET + ret = self.t._wrap_socket_sni( + sock, cert_reqs=sentinel.CERT_REQS, + server_hostname=sentinel.SERVER_HOSTNAME + ) + wrap_socket_method_mock.assert_called_with( + sock=sock, + server_side=False, + do_handshake_on_connect=False, + suppress_ragged_eofs=True, + server_hostname=sentinel.SERVER_HOSTNAME + ) + assert mock_ssl_context_class().verify_mode != sentinel.CERT_REQS + assert ret == sentinel.WRAPPED_SOCKET def test_shutdown_transport(self): self.t.sock = None |