summaryrefslogtreecommitdiff
path: root/keystonemiddleware/tests/unit/client_fixtures.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystonemiddleware/tests/unit/client_fixtures.py')
-rw-r--r--keystonemiddleware/tests/unit/client_fixtures.py109
1 files changed, 109 insertions, 0 deletions
diff --git a/keystonemiddleware/tests/unit/client_fixtures.py b/keystonemiddleware/tests/unit/client_fixtures.py
index 27ba482..53ecff5 100644
--- a/keystonemiddleware/tests/unit/client_fixtures.py
+++ b/keystonemiddleware/tests/unit/client_fixtures.py
@@ -12,9 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
+import base64
+import datetime
+import hashlib
import os
+import ssl
import uuid
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.serialization import Encoding
+from cryptography import x509
import fixtures
from keystoneauth1 import fixture
from oslo_serialization import jsonutils
@@ -69,6 +77,28 @@ class Examples(fixtures.Fixture):
self.v3_APP_CRED_EMPTY_ACCESS_RULES = 'c75905c307f04fdd9979126582d7aae'
self.v3_APP_CRED_MATCHING_RULES = 'ad49decc7106489d95ca9ed874b6cb66'
+ self.v3_OAUTH2_CREDENTIAL = uuid.uuid4().hex
+ self.V3_OAUTH2_MTLS_CERTIFICATE = self._create_pem_certificate(
+ self._create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organization_name='fujitsu',
+ organizational_unit_name='test',
+ common_name='root'
+ )
+ )
+ self.V3_OAUTH2_MTLS_CERTIFICATE_DIFF = self._create_pem_certificate(
+ self._create_dn(
+ country_name='jp',
+ state_or_province_name='kanagawa',
+ locality_name='kawasaki',
+ organization_name='fujitsu',
+ organizational_unit_name='test',
+ common_name='diff'
+ )
+ )
+
# JSON responses keyed by token ID
self.TOKEN_RESPONSES = {}
@@ -417,8 +447,87 @@ class Examples(fixtures.Fixture):
svc.add_endpoint('public', 'https://swift.openstack.example.org')
self.TOKEN_RESPONSES[self.v3_APP_CRED_MATCHING_RULES] = token
+ # oauth2 credential token
+ cert_pem = ssl.DER_cert_to_PEM_cert(self.V3_OAUTH2_MTLS_CERTIFICATE)
+ thumb_sha256 = hashlib.sha256(cert_pem.encode('ascii')).digest()
+ cert_thumb = base64.urlsafe_b64encode(thumb_sha256).decode('ascii')
+
+ token = fixture.V3Token(
+ methods=['oauth2_credential'],
+ user_id=USER_ID,
+ user_name=USER_NAME,
+ project_id=PROJECT_ID,
+ oauth2_thumbprint=cert_thumb,
+ )
+ self.TOKEN_RESPONSES[self.v3_OAUTH2_CREDENTIAL] = token
+
self.JSON_TOKEN_RESPONSES = dict([(k, jsonutils.dumps(v)) for k, v in
self.TOKEN_RESPONSES.items()])
+ def _create_dn(
+ self,
+ common_name=None,
+ locality_name=None,
+ state_or_province_name=None,
+ organization_name=None,
+ organizational_unit_name=None,
+ country_name=None,
+ street_address=None,
+ domain_component=None,
+ user_id=None,
+ email_address=None,
+ ):
+ oid = x509.NameOID
+ attr = x509.NameAttribute
+ dn = []
+ if common_name:
+ dn.append(attr(oid.COMMON_NAME, common_name))
+ if locality_name:
+ dn.append(attr(oid.LOCALITY_NAME, locality_name))
+ if state_or_province_name:
+ dn.append(attr(oid.STATE_OR_PROVINCE_NAME, state_or_province_name))
+ if organization_name:
+ dn.append(attr(oid.ORGANIZATION_NAME, organization_name))
+ if organizational_unit_name:
+ dn.append(
+ attr(
+ oid.ORGANIZATIONAL_UNIT_NAME,
+ organizational_unit_name))
+ if country_name:
+ dn.append(attr(oid.COUNTRY_NAME, country_name))
+ if street_address:
+ dn.append(attr(oid.STREET_ADDRESS, street_address))
+ if domain_component:
+ dn.append(attr(oid.DOMAIN_COMPONENT, domain_component))
+ if user_id:
+ dn.append(attr(oid.USER_ID, user_id))
+ if email_address:
+ dn.append(attr(oid.EMAIL_ADDRESS, email_address))
+ return x509.Name(dn)
+
+ def _create_certificate(self, subject_dn, ca=None, ca_key=None):
+ private_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ )
+ issuer = ca.subject if ca else subject_dn
+ if not ca_key:
+ ca_key = private_key
+ today = datetime.datetime.today()
+ cert = x509.CertificateBuilder(
+ issuer_name=issuer,
+ subject_name=subject_dn,
+ public_key=private_key.public_key(),
+ serial_number=x509.random_serial_number(),
+ not_valid_before=today,
+ not_valid_after=today + datetime.timedelta(365, 0, 0),
+ ).sign(ca_key, hashes.SHA256())
+
+ return cert, private_key
+
+ def _create_pem_certificate(self, subject_dn, ca=None, ca_key=None):
+ cert, _ = self._create_certificate(subject_dn, ca=ca, ca_key=ca_key)
+ return cert.public_bytes(Encoding.PEM)
+
EXAMPLES_RESOURCE = testresources.FixtureResource(Examples())