summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@users.noreply.github.com>2020-11-08 11:56:48 +0100
committerGitHub <noreply@github.com>2020-11-08 11:56:48 +0100
commitb3f371333ff7df793628809e8abdbf43fd4391ba (patch)
treeb813fcf01c340362a9f7f1b2bb0cffbfb01fc4a2
parent8e5ddd41b776f31faea6a47fcc757b76c567b007 (diff)
downloadpy-amqp-b3f371333ff7df793628809e8abdbf43fd4391ba.tar.gz
Reintroduce ca_certs and ciphers parameters of SSLTransport._wrap_socket_sni() (#344)
This fixes issue introduced in commit: 53d677754b4e820acf673711532c1a1dc8e57124
-rw-r--r--amqp/transport.py8
-rw-r--r--t/unit/test_transport.py138
2 files changed, 136 insertions, 10 deletions
diff --git a/amqp/transport.py b/amqp/transport.py
index 3eec88f..b183120 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -354,9 +354,9 @@ class SSLTransport(_AbstractTransport):
def _wrap_socket_sni(self, sock, keyfile=None, certfile=None,
server_side=False, cert_reqs=ssl.CERT_NONE,
- do_handshake_on_connect=False,
+ ca_certs=None, do_handshake_on_connect=False,
suppress_ragged_eofs=True, server_hostname=None,
- ssl_version=ssl.PROTOCOL_TLS):
+ ciphers=None, ssl_version=ssl.PROTOCOL_TLS):
"""Socket wrap with SNI headers.
stdlib `ssl.SSLContext.wrap_socket` method augmented with support for
@@ -373,6 +373,10 @@ class SSLTransport(_AbstractTransport):
context = ssl.SSLContext(ssl_version)
if certfile is not None:
context.load_cert_chain(certfile, keyfile)
+ if ca_certs is not None:
+ context.load_verify_locations(ca_certs)
+ if ciphers:
+ context.set_ciphers(ciphers)
if cert_reqs != ssl.CERT_NONE:
context.check_hostname = True
# Set SNI headers if supported
diff --git a/t/unit/test_transport.py b/t/unit/test_transport.py
index d94a520..5eec1ab 100644
--- a/t/unit/test_transport.py
+++ b/t/unit/test_transport.py
@@ -4,7 +4,7 @@ import re
import socket
import struct
from struct import pack
-from unittest.mock import ANY, MagicMock, Mock, call, patch
+from unittest.mock import ANY, MagicMock, Mock, call, patch, sentinel
import pytest
@@ -638,14 +638,136 @@ class test_SSLTransport:
ctx.wrap_socket.assert_called_with(sock, f=1)
def test_wrap_socket_sni(self):
+ # testing default values of _wrap_socket_sni()
sock = Mock()
- with patch('ssl.SSLContext.wrap_socket') as mock_ssl_wrap:
- self.t._wrap_socket_sni(sock)
- mock_ssl_wrap.assert_called_with(sock=sock,
- server_side=False,
- do_handshake_on_connect=False,
- suppress_ragged_eofs=True,
- server_hostname=None)
+ with patch(
+ 'ssl.SSLContext.wrap_socket',
+ return_value=sentinel.WRAPPED_SOCKET) as mock_ssl_wrap:
+ ret = self.t._wrap_socket_sni(sock)
+
+ mock_ssl_wrap.assert_called_with(sock=sock,
+ server_side=False,
+ do_handshake_on_connect=False,
+ suppress_ragged_eofs=True,
+ server_hostname=None)
+
+ assert ret == sentinel.WRAPPED_SOCKET
+
+ def test_wrap_socket_sni_certfile(self):
+ # testing _wrap_socket_sni() with parameters certfile and keyfile
+ sock = Mock()
+ with patch(
+ 'ssl.SSLContext.wrap_socket',
+ return_value=sentinel.WRAPPED_SOCKET) as mock_ssl_wrap, \
+ patch('ssl.SSLContext.load_cert_chain') as mock_load_cert_chain:
+ ret = self.t._wrap_socket_sni(
+ sock, keyfile=sentinel.KEYFILE, certfile=sentinel.CERTFILE)
+
+ mock_load_cert_chain.assert_called_with(
+ sentinel.CERTFILE, sentinel.KEYFILE)
+ mock_ssl_wrap.assert_called_with(sock=sock,
+ server_side=False,
+ do_handshake_on_connect=False,
+ suppress_ragged_eofs=True,
+ server_hostname=None)
+
+ assert ret == sentinel.WRAPPED_SOCKET
+
+ def test_wrap_socket_ca_certs(self):
+ # testing _wrap_socket_sni() with parameter ca_certs
+ sock = Mock()
+ with patch(
+ 'ssl.SSLContext.wrap_socket',
+ return_value=sentinel.WRAPPED_SOCKET
+ ) as mock_ssl_wrap, patch(
+ 'ssl.SSLContext.load_verify_locations'
+ ) as mock_load_verify_locations:
+ ret = self.t._wrap_socket_sni(sock, ca_certs=sentinel.CA_CERTS)
+
+ mock_load_verify_locations.assert_called_with(sentinel.CA_CERTS)
+ mock_ssl_wrap.assert_called_with(sock=sock,
+ server_side=False,
+ do_handshake_on_connect=False,
+ suppress_ragged_eofs=True,
+ server_hostname=None)
+
+ assert ret == sentinel.WRAPPED_SOCKET
+
+ def test_wrap_socket_ciphers(self):
+ # testing _wrap_socket_sni() with parameter ciphers
+ sock = Mock()
+ with patch(
+ 'ssl.SSLContext.wrap_socket',
+ return_value=sentinel.WRAPPED_SOCKET) as mock_ssl_wrap, \
+ patch('ssl.SSLContext.set_ciphers') as mock_set_ciphers:
+ ret = self.t._wrap_socket_sni(sock, ciphers=sentinel.CIPHERS)
+
+ mock_set_ciphers.assert_called_with(sentinel.CIPHERS)
+ mock_ssl_wrap.assert_called_with(sock=sock,
+ server_side=False,
+ do_handshake_on_connect=False,
+ suppress_ragged_eofs=True,
+ server_hostname=None)
+ assert ret == sentinel.WRAPPED_SOCKET
+
+ def test_wrap_socket_sni_cert_reqs(self):
+ # testing _wrap_socket_sni() with parameter cert_reqs
+ sock = Mock()
+ with patch('ssl.SSLContext') as mock_ssl_context_class:
+ wrap_socket_method_mock = mock_ssl_context_class().wrap_socket
+ wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET
+ ret = self.t._wrap_socket_sni(sock, cert_reqs=sentinel.CERT_REQS)
+
+ wrap_socket_method_mock.assert_called_with(
+ sock=sock,
+ server_side=False,
+ do_handshake_on_connect=False,
+ suppress_ragged_eofs=True,
+ server_hostname=None
+ )
+ assert mock_ssl_context_class().check_hostname is True
+ assert ret == sentinel.WRAPPED_SOCKET
+
+ def test_wrap_socket_sni_setting_sni_header(self):
+ # testing _wrap_socket_sni() with setting SNI header
+ sock = Mock()
+ with patch('ssl.SSLContext') as mock_ssl_context_class, \
+ patch('ssl.HAS_SNI', new=True):
+ # SSL module supports SNI
+ wrap_socket_method_mock = mock_ssl_context_class().wrap_socket
+ wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET
+ ret = self.t._wrap_socket_sni(
+ sock, cert_reqs=sentinel.CERT_REQS,
+ server_hostname=sentinel.SERVER_HOSTNAME
+ )
+ wrap_socket_method_mock.assert_called_with(
+ sock=sock,
+ server_side=False,
+ do_handshake_on_connect=False,
+ suppress_ragged_eofs=True,
+ server_hostname=sentinel.SERVER_HOSTNAME
+ )
+ assert mock_ssl_context_class().verify_mode == sentinel.CERT_REQS
+ assert ret == sentinel.WRAPPED_SOCKET
+
+ with patch('ssl.SSLContext') as mock_ssl_context_class, \
+ patch('ssl.HAS_SNI', new=False):
+ # SSL module does not support SNI
+ wrap_socket_method_mock = mock_ssl_context_class().wrap_socket
+ wrap_socket_method_mock.return_value = sentinel.WRAPPED_SOCKET
+ ret = self.t._wrap_socket_sni(
+ sock, cert_reqs=sentinel.CERT_REQS,
+ server_hostname=sentinel.SERVER_HOSTNAME
+ )
+ wrap_socket_method_mock.assert_called_with(
+ sock=sock,
+ server_side=False,
+ do_handshake_on_connect=False,
+ suppress_ragged_eofs=True,
+ server_hostname=sentinel.SERVER_HOSTNAME
+ )
+ assert mock_ssl_context_class().verify_mode != sentinel.CERT_REQS
+ assert ret == sentinel.WRAPPED_SOCKET
def test_shutdown_transport(self):
self.t.sock = None