diff options
Diffstat (limited to 'keystonemiddleware/tests/unit/client_fixtures.py')
-rw-r--r-- | keystonemiddleware/tests/unit/client_fixtures.py | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/keystonemiddleware/tests/unit/client_fixtures.py b/keystonemiddleware/tests/unit/client_fixtures.py index 27ba482..53ecff5 100644 --- a/keystonemiddleware/tests/unit/client_fixtures.py +++ b/keystonemiddleware/tests/unit/client_fixtures.py @@ -12,9 +12,17 @@ # License for the specific language governing permissions and limitations # under the License. +import base64 +import datetime +import hashlib import os +import ssl import uuid +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography import x509 import fixtures from keystoneauth1 import fixture from oslo_serialization import jsonutils @@ -69,6 +77,28 @@ class Examples(fixtures.Fixture): self.v3_APP_CRED_EMPTY_ACCESS_RULES = 'c75905c307f04fdd9979126582d7aae' self.v3_APP_CRED_MATCHING_RULES = 'ad49decc7106489d95ca9ed874b6cb66' + self.v3_OAUTH2_CREDENTIAL = uuid.uuid4().hex + self.V3_OAUTH2_MTLS_CERTIFICATE = self._create_pem_certificate( + self._create_dn( + country_name='jp', + state_or_province_name='kanagawa', + locality_name='kawasaki', + organization_name='fujitsu', + organizational_unit_name='test', + common_name='root' + ) + ) + self.V3_OAUTH2_MTLS_CERTIFICATE_DIFF = self._create_pem_certificate( + self._create_dn( + country_name='jp', + state_or_province_name='kanagawa', + locality_name='kawasaki', + organization_name='fujitsu', + organizational_unit_name='test', + common_name='diff' + ) + ) + # JSON responses keyed by token ID self.TOKEN_RESPONSES = {} @@ -417,8 +447,87 @@ class Examples(fixtures.Fixture): svc.add_endpoint('public', 'https://swift.openstack.example.org') self.TOKEN_RESPONSES[self.v3_APP_CRED_MATCHING_RULES] = token + # oauth2 credential token + cert_pem = ssl.DER_cert_to_PEM_cert(self.V3_OAUTH2_MTLS_CERTIFICATE) + thumb_sha256 = hashlib.sha256(cert_pem.encode('ascii')).digest() + cert_thumb = base64.urlsafe_b64encode(thumb_sha256).decode('ascii') + + token = fixture.V3Token( + methods=['oauth2_credential'], + user_id=USER_ID, + user_name=USER_NAME, + project_id=PROJECT_ID, + oauth2_thumbprint=cert_thumb, + ) + self.TOKEN_RESPONSES[self.v3_OAUTH2_CREDENTIAL] = token + self.JSON_TOKEN_RESPONSES = dict([(k, jsonutils.dumps(v)) for k, v in self.TOKEN_RESPONSES.items()]) + def _create_dn( + self, + common_name=None, + locality_name=None, + state_or_province_name=None, + organization_name=None, + organizational_unit_name=None, + country_name=None, + street_address=None, + domain_component=None, + user_id=None, + email_address=None, + ): + oid = x509.NameOID + attr = x509.NameAttribute + dn = [] + if common_name: + dn.append(attr(oid.COMMON_NAME, common_name)) + if locality_name: + dn.append(attr(oid.LOCALITY_NAME, locality_name)) + if state_or_province_name: + dn.append(attr(oid.STATE_OR_PROVINCE_NAME, state_or_province_name)) + if organization_name: + dn.append(attr(oid.ORGANIZATION_NAME, organization_name)) + if organizational_unit_name: + dn.append( + attr( + oid.ORGANIZATIONAL_UNIT_NAME, + organizational_unit_name)) + if country_name: + dn.append(attr(oid.COUNTRY_NAME, country_name)) + if street_address: + dn.append(attr(oid.STREET_ADDRESS, street_address)) + if domain_component: + dn.append(attr(oid.DOMAIN_COMPONENT, domain_component)) + if user_id: + dn.append(attr(oid.USER_ID, user_id)) + if email_address: + dn.append(attr(oid.EMAIL_ADDRESS, email_address)) + return x509.Name(dn) + + def _create_certificate(self, subject_dn, ca=None, ca_key=None): + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + issuer = ca.subject if ca else subject_dn + if not ca_key: + ca_key = private_key + today = datetime.datetime.today() + cert = x509.CertificateBuilder( + issuer_name=issuer, + subject_name=subject_dn, + public_key=private_key.public_key(), + serial_number=x509.random_serial_number(), + not_valid_before=today, + not_valid_after=today + datetime.timedelta(365, 0, 0), + ).sign(ca_key, hashes.SHA256()) + + return cert, private_key + + def _create_pem_certificate(self, subject_dn, ca=None, ca_key=None): + cert, _ = self._create_certificate(subject_dn, ca=ca, ca_key=ca_key) + return cert.public_bytes(Encoding.PEM) + EXAMPLES_RESOURCE = testresources.FixtureResource(Examples()) |