diff options
| author | Quentin Pradet <quentin.pradet@gmail.com> | 2020-01-25 22:24:07 +0400 |
|---|---|---|
| committer | Seth Michael Larson <sethmichaellarson@gmail.com> | 2020-01-25 12:24:07 -0600 |
| commit | 5fa45314abd726ff9b1f01301179871aae91f7eb (patch) | |
| tree | 7e2273b96f88b027a3c55cbd69f40d7ca06791cc /test | |
| parent | 80852b3585f6047bb6375c73a5a21f74b74ec3f6 (diff) | |
| download | urllib3-5fa45314abd726ff9b1f01301179871aae91f7eb.tar.gz | |
Generate client certs dynamically using trustme (#1791)
Diffstat (limited to 'test')
| -rw-r--r-- | test/with_dummyserver/test_socketlevel.py | 76 |
1 files changed, 54 insertions, 22 deletions
diff --git a/test/with_dummyserver/test_socketlevel.py b/test/with_dummyserver/test_socketlevel.py index 74ec5363..abfd0cfb 100644 --- a/test/with_dummyserver/test_socketlevel.py +++ b/test/with_dummyserver/test_socketlevel.py @@ -17,13 +17,7 @@ from urllib3.util.retry import Retry from urllib3._collections import HTTPHeaderDict from dummyserver.testcase import SocketDummyServerTestCase, consume_socket -from dummyserver.server import ( - DEFAULT_CERTS, - DEFAULT_CA, - COMBINED_CERT_AND_KEY, - PASSWORD_KEYFILE, - get_unreachable_address, -) +from dummyserver.server import DEFAULT_CERTS, DEFAULT_CA, get_unreachable_address from .. import onlyPy3, LogRecorder @@ -36,13 +30,19 @@ except ImportError: from collections import OrderedDict +import os.path from threading import Event import select import socket +import shutil import ssl +import tempfile import mock +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization import pytest +import trustme from test import ( fails_on_travis_gce, @@ -110,6 +110,40 @@ class TestClientCerts(SocketDummyServerTestCase): Tests for client certificate support. """ + @classmethod + def _encrypt_key(cls, private_key_pem, password): + private_key = serialization.load_pem_private_key( + private_key_pem.bytes(), password=None, backend=default_backend() + ) + encrypted_key = private_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.BestAvailableEncryption(password), + ) + return trustme.Blob(encrypted_key) + + @classmethod + def setup_class(cls): + cls.tmpdir = tempfile.mkdtemp() + ca = trustme.CA() + cert = ca.issue_cert(u"localhost") + encrypted_key = cls._encrypt_key(cert.private_key_pem, b"letmein") + + cls.ca_path = os.path.join(cls.tmpdir, "ca.pem") + cls.cert_combined_path = os.path.join(cls.tmpdir, "server.combined.pem") + cls.cert_path = os.path.join(cls.tmpdir, "server.pem") + cls.key_path = os.path.join(cls.tmpdir, "key.pem") + cls.password_key_path = os.path.join(cls.tmpdir, "password_key.pem") + + ca.cert_pem.write_to_path(cls.ca_path) + cert.private_key_and_cert_chain_pem.write_to_path(cls.cert_combined_path) + cert.cert_chain_pems[0].write_to_path(cls.cert_path) + cert.private_key_pem.write_to_path(cls.key_path) + encrypted_key.write_to_path(cls.password_key_path) + + def teardown_class(cls): + shutil.rmtree(cls.tmpdir) + def _wrap_in_ssl(self, sock): """ Given a single socket, wraps it in TLS. @@ -118,9 +152,9 @@ class TestClientCerts(SocketDummyServerTestCase): sock, ssl_version=ssl.PROTOCOL_SSLv23, cert_reqs=ssl.CERT_REQUIRED, - ca_certs=DEFAULT_CA, - certfile=DEFAULT_CERTS["certfile"], - keyfile=DEFAULT_CERTS["keyfile"], + ca_certs=self.ca_path, + certfile=self.cert_path, + keyfile=self.key_path, server_side=True, ) @@ -158,10 +192,10 @@ class TestClientCerts(SocketDummyServerTestCase): with HTTPSConnectionPool( self.host, self.port, - cert_file=DEFAULT_CERTS["certfile"], - key_file=DEFAULT_CERTS["keyfile"], + cert_file=self.cert_path, + key_file=self.key_path, cert_reqs="REQUIRED", - ca_certs=DEFAULT_CA, + ca_certs=self.ca_path, ) as pool: pool.request("GET", "/", retries=0) done_receiving.set() @@ -202,9 +236,9 @@ class TestClientCerts(SocketDummyServerTestCase): with HTTPSConnectionPool( self.host, self.port, - cert_file=COMBINED_CERT_AND_KEY, + cert_file=self.cert_combined_path, cert_reqs="REQUIRED", - ca_certs=DEFAULT_CA, + ca_certs=self.ca_path, ) as pool: pool.request("GET", "/", retries=0) done_receiving.set() @@ -230,7 +264,7 @@ class TestClientCerts(SocketDummyServerTestCase): self._start_server(socket_handler) with HTTPSConnectionPool( - self.host, self.port, cert_reqs="REQUIRED", ca_certs=DEFAULT_CA + self.host, self.port, cert_reqs="REQUIRED", ca_certs=self.ca_path ) as pool: with pytest.raises(MaxRetryError): pool.request("GET", "/", retries=0) @@ -277,9 +311,7 @@ class TestClientCerts(SocketDummyServerTestCase): self._start_server(socket_handler) ssl_context = ssl_.SSLContext(ssl_.PROTOCOL_SSLv23) ssl_context.load_cert_chain( - certfile=DEFAULT_CERTS["certfile"], - keyfile=PASSWORD_KEYFILE, - password=password, + certfile=self.cert_path, keyfile=self.password_key_path, password=password ) with HTTPSConnectionPool( @@ -287,7 +319,7 @@ class TestClientCerts(SocketDummyServerTestCase): self.port, ssl_context=ssl_context, cert_reqs="REQUIRED", - ca_certs=DEFAULT_CA, + ca_certs=self.ca_path, ) as pool: pool.request("GET", "/", retries=0) done_receiving.set() @@ -308,8 +340,8 @@ class TestClientCerts(SocketDummyServerTestCase): with pytest.raises(expected_error): context.load_cert_chain( - certfile=DEFAULT_CERTS["certfile"], - keyfile=PASSWORD_KEYFILE, + certfile=self.cert_path, + keyfile=self.password_key_path, password=b"letmei", ) |
