summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorQuentin Pradet <quentin.pradet@gmail.com>2020-01-25 22:24:07 +0400
committerSeth Michael Larson <sethmichaellarson@gmail.com>2020-01-25 12:24:07 -0600
commit5fa45314abd726ff9b1f01301179871aae91f7eb (patch)
tree7e2273b96f88b027a3c55cbd69f40d7ca06791cc /test
parent80852b3585f6047bb6375c73a5a21f74b74ec3f6 (diff)
downloadurllib3-5fa45314abd726ff9b1f01301179871aae91f7eb.tar.gz
Generate client certs dynamically using trustme (#1791)
Diffstat (limited to 'test')
-rw-r--r--test/with_dummyserver/test_socketlevel.py76
1 files changed, 54 insertions, 22 deletions
diff --git a/test/with_dummyserver/test_socketlevel.py b/test/with_dummyserver/test_socketlevel.py
index 74ec5363..abfd0cfb 100644
--- a/test/with_dummyserver/test_socketlevel.py
+++ b/test/with_dummyserver/test_socketlevel.py
@@ -17,13 +17,7 @@ from urllib3.util.retry import Retry
from urllib3._collections import HTTPHeaderDict
from dummyserver.testcase import SocketDummyServerTestCase, consume_socket
-from dummyserver.server import (
- DEFAULT_CERTS,
- DEFAULT_CA,
- COMBINED_CERT_AND_KEY,
- PASSWORD_KEYFILE,
- get_unreachable_address,
-)
+from dummyserver.server import DEFAULT_CERTS, DEFAULT_CA, get_unreachable_address
from .. import onlyPy3, LogRecorder
@@ -36,13 +30,19 @@ except ImportError:
from collections import OrderedDict
+import os.path
from threading import Event
import select
import socket
+import shutil
import ssl
+import tempfile
import mock
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import serialization
import pytest
+import trustme
from test import (
fails_on_travis_gce,
@@ -110,6 +110,40 @@ class TestClientCerts(SocketDummyServerTestCase):
Tests for client certificate support.
"""
+ @classmethod
+ def _encrypt_key(cls, private_key_pem, password):
+ private_key = serialization.load_pem_private_key(
+ private_key_pem.bytes(), password=None, backend=default_backend()
+ )
+ encrypted_key = private_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.BestAvailableEncryption(password),
+ )
+ return trustme.Blob(encrypted_key)
+
+ @classmethod
+ def setup_class(cls):
+ cls.tmpdir = tempfile.mkdtemp()
+ ca = trustme.CA()
+ cert = ca.issue_cert(u"localhost")
+ encrypted_key = cls._encrypt_key(cert.private_key_pem, b"letmein")
+
+ cls.ca_path = os.path.join(cls.tmpdir, "ca.pem")
+ cls.cert_combined_path = os.path.join(cls.tmpdir, "server.combined.pem")
+ cls.cert_path = os.path.join(cls.tmpdir, "server.pem")
+ cls.key_path = os.path.join(cls.tmpdir, "key.pem")
+ cls.password_key_path = os.path.join(cls.tmpdir, "password_key.pem")
+
+ ca.cert_pem.write_to_path(cls.ca_path)
+ cert.private_key_and_cert_chain_pem.write_to_path(cls.cert_combined_path)
+ cert.cert_chain_pems[0].write_to_path(cls.cert_path)
+ cert.private_key_pem.write_to_path(cls.key_path)
+ encrypted_key.write_to_path(cls.password_key_path)
+
+ def teardown_class(cls):
+ shutil.rmtree(cls.tmpdir)
+
def _wrap_in_ssl(self, sock):
"""
Given a single socket, wraps it in TLS.
@@ -118,9 +152,9 @@ class TestClientCerts(SocketDummyServerTestCase):
sock,
ssl_version=ssl.PROTOCOL_SSLv23,
cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=DEFAULT_CA,
- certfile=DEFAULT_CERTS["certfile"],
- keyfile=DEFAULT_CERTS["keyfile"],
+ ca_certs=self.ca_path,
+ certfile=self.cert_path,
+ keyfile=self.key_path,
server_side=True,
)
@@ -158,10 +192,10 @@ class TestClientCerts(SocketDummyServerTestCase):
with HTTPSConnectionPool(
self.host,
self.port,
- cert_file=DEFAULT_CERTS["certfile"],
- key_file=DEFAULT_CERTS["keyfile"],
+ cert_file=self.cert_path,
+ key_file=self.key_path,
cert_reqs="REQUIRED",
- ca_certs=DEFAULT_CA,
+ ca_certs=self.ca_path,
) as pool:
pool.request("GET", "/", retries=0)
done_receiving.set()
@@ -202,9 +236,9 @@ class TestClientCerts(SocketDummyServerTestCase):
with HTTPSConnectionPool(
self.host,
self.port,
- cert_file=COMBINED_CERT_AND_KEY,
+ cert_file=self.cert_combined_path,
cert_reqs="REQUIRED",
- ca_certs=DEFAULT_CA,
+ ca_certs=self.ca_path,
) as pool:
pool.request("GET", "/", retries=0)
done_receiving.set()
@@ -230,7 +264,7 @@ class TestClientCerts(SocketDummyServerTestCase):
self._start_server(socket_handler)
with HTTPSConnectionPool(
- self.host, self.port, cert_reqs="REQUIRED", ca_certs=DEFAULT_CA
+ self.host, self.port, cert_reqs="REQUIRED", ca_certs=self.ca_path
) as pool:
with pytest.raises(MaxRetryError):
pool.request("GET", "/", retries=0)
@@ -277,9 +311,7 @@ class TestClientCerts(SocketDummyServerTestCase):
self._start_server(socket_handler)
ssl_context = ssl_.SSLContext(ssl_.PROTOCOL_SSLv23)
ssl_context.load_cert_chain(
- certfile=DEFAULT_CERTS["certfile"],
- keyfile=PASSWORD_KEYFILE,
- password=password,
+ certfile=self.cert_path, keyfile=self.password_key_path, password=password
)
with HTTPSConnectionPool(
@@ -287,7 +319,7 @@ class TestClientCerts(SocketDummyServerTestCase):
self.port,
ssl_context=ssl_context,
cert_reqs="REQUIRED",
- ca_certs=DEFAULT_CA,
+ ca_certs=self.ca_path,
) as pool:
pool.request("GET", "/", retries=0)
done_receiving.set()
@@ -308,8 +340,8 @@ class TestClientCerts(SocketDummyServerTestCase):
with pytest.raises(expected_error):
context.load_cert_chain(
- certfile=DEFAULT_CERTS["certfile"],
- keyfile=PASSWORD_KEYFILE,
+ certfile=self.cert_path,
+ keyfile=self.password_key_path,
password=b"letmei",
)