summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortpazderka <tomas.pazderka@nic.cz>2014-11-12 11:51:33 +0100
committertpazderka <tomas.pazderka@nic.cz>2014-12-10 15:36:25 +0100
commit0f7768b0618b637f5803018ff021d462da294e90 (patch)
tree8695db4007ec37956612ed0bfea9eb3ba23982c7
parenteaac71d3f288a4ef9baa3916a04ec735d29cebcc (diff)
downloadpysaml2-0f7768b0618b637f5803018ff021d462da294e90.tar.gz
Added backward support for Metadata
-rw-r--r--src/saml2/mdstore.py72
-rw-r--r--tests/test_30_mdstore_old.py270
2 files changed, 312 insertions, 30 deletions
diff --git a/src/saml2/mdstore.py b/src/saml2/mdstore.py
index dd38abe8..e2c6abe5 100644
--- a/src/saml2/mdstore.py
+++ b/src/saml2/mdstore.py
@@ -803,36 +803,48 @@ class MetadataStore(object):
self.metadata[key] = _md
def imp(self, spec):
- for item in spec:
- try:
- key = item['class']
- except (KeyError, AttributeError):
- raise SAMLError("Misconfiguration in metadata %s" % item)
- mod, clas = key.rsplit('.', 1)
- try:
- mod = import_module(mod)
- MDloader = getattr(mod, clas)
- except (ImportError, AttributeError):
- raise SAMLError("Unknown metadata loader %s" % key)
-
- # Separately handle MDExtern
- if MDloader == MetaDataExtern:
- item['http'] = self.http
- item['security'] = self.security
-
- for key in item['metadata']:
- # Separately handle MetaDataFile and directory
- if MDloader == MetaDataFile and os.path.isdir(key[0]):
- files = [f for f in listdir(key[0]) if isfile(join(key[0], f))]
- for fil in files:
- _fil = join(key[0], fil)
- _md = MetaDataFile(self.onts, self.attrc, _fil)
- _md.load()
- self.metadata[_fil] = _md
- return
- _md = MDloader(self.onts, self.attrc, *key)
- _md.load()
- self.metadata[key[0]] = _md
+ # This serves as a backwards compatibility
+ if type(spec) is dict:
+ # Old style...
+ for key, vals in spec.items():
+ for val in vals:
+ if isinstance(val, dict):
+ if not self.check_validity:
+ val["check_validity"] = False
+ self.load(key, **val)
+ else:
+ self.load(key, val)
+ else:
+ for item in spec:
+ try:
+ key = item['class']
+ except (KeyError, AttributeError):
+ raise SAMLError("Misconfiguration in metadata %s" % item)
+ mod, clas = key.rsplit('.', 1)
+ try:
+ mod = import_module(mod)
+ MDloader = getattr(mod, clas)
+ except (ImportError, AttributeError):
+ raise SAMLError("Unknown metadata loader %s" % key)
+
+ # Separately handle MDExtern
+ if MDloader == MetaDataExtern:
+ item['http'] = self.http
+ item['security'] = self.security
+
+ for key in item['metadata']:
+ # Separately handle MetaDataFile and directory
+ if MDloader == MetaDataFile and os.path.isdir(key[0]):
+ files = [f for f in listdir(key[0]) if isfile(join(key[0], f))]
+ for fil in files:
+ _fil = join(key[0], fil)
+ _md = MetaDataFile(self.onts, self.attrc, _fil)
+ _md.load()
+ self.metadata[_fil] = _md
+ return
+ _md = MDloader(self.onts, self.attrc, *key)
+ _md.load()
+ self.metadata[key[0]] = _md
def service(self, entity_id, typ, service, binding=None):
known_entity = False
diff --git a/tests/test_30_mdstore_old.py b/tests/test_30_mdstore_old.py
new file mode 100644
index 00000000..0f3d3d04
--- /dev/null
+++ b/tests/test_30_mdstore_old.py
@@ -0,0 +1,270 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import datetime
+import re
+from urllib import quote_plus
+from saml2.httpbase import HTTPBase
+
+from saml2.mdstore import MetadataStore, MetaDataMDX
+from saml2.mdstore import destinations
+from saml2.mdstore import name
+
+from saml2 import md
+from saml2 import sigver
+from saml2 import BINDING_SOAP
+from saml2 import BINDING_HTTP_REDIRECT
+from saml2 import BINDING_HTTP_POST
+from saml2 import BINDING_HTTP_ARTIFACT
+from saml2 import saml
+from saml2 import config
+from saml2.attribute_converter import ac_factory
+from saml2.attribute_converter import d_to_local_name
+
+from saml2.extension import mdui
+from saml2.extension import idpdisc
+from saml2.extension import dri
+from saml2.extension import mdattr
+from saml2.extension import ui
+from saml2.s_utils import UnknownPrincipal
+import xmldsig
+import xmlenc
+
+from pathutils import full_path
+
+sec_config = config.Config()
+#sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
+
+ONTS = {
+ saml.NAMESPACE: saml,
+ mdui.NAMESPACE: mdui,
+ mdattr.NAMESPACE: mdattr,
+ dri.NAMESPACE: dri,
+ ui.NAMESPACE: ui,
+ idpdisc.NAMESPACE: idpdisc,
+ md.NAMESPACE: md,
+ xmldsig.NAMESPACE: xmldsig,
+ xmlenc.NAMESPACE: xmlenc
+}
+
+ATTRCONV = ac_factory(full_path("attributemaps"))
+
+METADATACONF = {
+ "1": {
+ "local": [full_path("swamid-1.0.xml")]
+ },
+ "2": {
+ "local": [full_path("InCommon-metadata.xml")]
+ },
+ "3": {
+ "local": [full_path("extended.xml")]
+ },
+ "7": {
+ "local": [full_path("metadata_sp_1.xml"),
+ full_path("InCommon-metadata.xml")],
+ "remote": [
+ {"url": "https://kalmar2.org/simplesaml/module.php/aggregator/?id=kalmarcentral2&set=saml2",
+ "cert": full_path("kalmar2.pem")}]
+ },
+ "4": {
+ "local": [full_path("metadata_example.xml")]
+ },
+ "5": {
+ "local": [full_path("metadata.aaitest.xml")]
+ },
+ "8": {
+ "mdfile": [full_path("swamid.md")]
+ },
+ "9": {
+ "local": [full_path("metadata")]
+ }
+}
+
+
+def _eq(l1, l2):
+ return set(l1) == set(l2)
+
+
+def _fix_valid_until(xmlstring):
+ new_date = datetime.datetime.now() + datetime.timedelta(days=1)
+ new_date = new_date.strftime("%Y-%m-%dT%H:%M:%SZ")
+ return re.sub(r' validUntil=".*?"', ' validUntil="%s"' % new_date,
+ xmlstring)
+
+
+def test_swami_1():
+ UMU_IDP = 'https://idp.umu.se/saml2/idp/metadata.php'
+ mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+
+ mds.imp(METADATACONF["1"])
+ assert len(mds) == 1 # One source
+ idps = mds.with_descriptor("idpsso")
+ assert idps.keys()
+ idpsso = mds.single_sign_on_service(UMU_IDP)
+ assert len(idpsso) == 1
+ assert destinations(idpsso) == [
+ 'https://idp.umu.se/saml2/idp/SSOService.php']
+
+ _name = name(mds[UMU_IDP])
+ assert _name == u'UmeƄ University (SAML2)'
+ certs = mds.certs(UMU_IDP, "idpsso", "signing")
+ assert len(certs) == 1
+
+ sps = mds.with_descriptor("spsso")
+ assert len(sps) == 108
+
+ wants = mds.attribute_requirement('https://connect8.sunet.se/shibboleth')
+ lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]]
+ assert _eq(lnamn, ['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
+ 'eduPersonScopedAffiliation'])
+
+ wants = mds.attribute_requirement('https://beta.lobber.se/shibboleth')
+ assert wants["required"] == []
+ lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]]
+ assert _eq(lnamn, ['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
+ 'eduPersonScopedAffiliation', 'eduPersonEntitlement'])
+
+
+def test_incommon_1():
+ mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+
+ mds.imp(METADATACONF["2"])
+
+ print mds.entities()
+ assert mds.entities() > 1700
+ idps = mds.with_descriptor("idpsso")
+ print idps.keys()
+ assert len(idps) > 300 # ~ 18%
+ try:
+ _ = mds.single_sign_on_service('urn:mace:incommon:uiuc.edu')
+ except UnknownPrincipal:
+ pass
+
+ idpsso = mds.single_sign_on_service('urn:mace:incommon:alaska.edu')
+ assert len(idpsso) == 1
+ print idpsso
+ assert destinations(idpsso) == [
+ 'https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO']
+
+ sps = mds.with_descriptor("spsso")
+
+ acs_sp = []
+ for nam, desc in sps.items():
+ if "attribute_consuming_service" in desc:
+ acs_sp.append(nam)
+
+ assert len(acs_sp) == 0
+
+ # Look for attribute authorities
+ aas = mds.with_descriptor("attribute_authority")
+
+ print aas.keys()
+ assert len(aas) == 180
+
+
+def test_ext_2():
+ mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+
+ mds.imp(METADATACONF["3"])
+ # No specific binding defined
+
+ ents = mds.with_descriptor("spsso")
+ for binding in [BINDING_SOAP, BINDING_HTTP_POST, BINDING_HTTP_ARTIFACT,
+ BINDING_HTTP_REDIRECT]:
+ assert mds.single_logout_service(ents.keys()[0], binding, "spsso")
+
+
+def test_example():
+ mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+
+ mds.imp(METADATACONF["4"])
+ assert len(mds.keys()) == 1
+ idps = mds.with_descriptor("idpsso")
+
+ assert idps.keys() == [
+ 'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
+ certs = mds.certs(
+ 'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php',
+ "idpsso", "signing")
+ assert len(certs) == 1
+
+
+def test_switch_1():
+ mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+
+ mds.imp(METADATACONF["5"])
+ assert len(mds.keys()) > 160
+ idps = mds.with_descriptor("idpsso")
+ print idps.keys()
+ idpsso = mds.single_sign_on_service(
+ 'https://aai-demo-idp.switch.ch/idp/shibboleth')
+ assert len(idpsso) == 1
+ print idpsso
+ assert destinations(idpsso) == [
+ 'https://aai-demo-idp.switch.ch/idp/profile/SAML2/Redirect/SSO']
+ assert len(idps) > 30
+ aas = mds.with_descriptor("attribute_authority")
+ print aas.keys()
+ aad = aas['https://aai-demo-idp.switch.ch/idp/shibboleth']
+ print aad.keys()
+ assert len(aad["attribute_authority_descriptor"]) == 1
+ assert len(aad["idpsso_descriptor"]) == 1
+
+ sps = mds.with_descriptor("spsso")
+ dual = [eid for eid, ent in idps.items() if eid in sps]
+ print len(dual)
+ assert len(dual) == 0
+
+
+def test_metadata_file():
+ sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
+ mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+
+ mds.imp(METADATACONF["8"])
+ print len(mds.keys())
+ assert len(mds.keys()) == 560
+
+
+def test_mdx_service():
+ sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
+ http = HTTPBase(verify=False, ca_bundle=None)
+
+ mdx = MetaDataMDX(quote_plus, ONTS.values(), ATTRCONV,
+ "http://pyff-test.nordu.net",
+ sec_config, None, http)
+ foo = mdx.service("https://idp.umu.se/saml2/idp/metadata.php",
+ "idpsso_descriptor", "single_sign_on_service")
+
+ assert len(foo) == 1
+ assert foo.keys()[0] == BINDING_HTTP_REDIRECT
+
+
+def test_mdx_certs():
+ sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
+ http = HTTPBase(verify=False, ca_bundle=None)
+
+ mdx = MetaDataMDX(quote_plus, ONTS.values(), ATTRCONV,
+ "http://pyff-test.nordu.net",
+ sec_config, None, http)
+ foo = mdx.certs("https://idp.umu.se/saml2/idp/metadata.php", "idpsso")
+
+ assert len(foo) == 1
+
+
+def test_load_local_dir():
+ sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
+ mds = MetadataStore(ONTS.values(), ATTRCONV, sec_config,
+ disable_ssl_certificate_validation=True)
+
+ mds.imp(METADATACONF["9"])
+ print mds
+ assert len(mds) == 3 # Three sources
+ assert len(mds.keys()) == 4 # number of idps
+
+if __name__ == "__main__":
+ test_mdx_certs()