#!/usr/bin/env python import argparse import logging import os import re from Cookie import SimpleCookie import sp_conf from urlparse import parse_qs from saml2 import BINDING_HTTP_REDIRECT from saml2 import time_util from saml2.httputil import NotFound from saml2.httputil import Redirect from saml2.httputil import Response from saml2.httputil import Unauthorized # from saml2.httputil import ServiceError from saml2.metadata import create_metadata_string logger = logging.getLogger("saml2.SP") args = None # ----------------------------------------------------------------------------- def dict_to_table(ava, lev=0, width=1): txt = [f'\n'] for prop, valarr in ava.items(): txt.append("\n") if isinstance(valarr, basestring): txt.append(f"\n") try: txt.append(f"\n") except AttributeError: txt.append(f"\n") elif isinstance(valarr, list): i = 0 n = len(valarr) for val in valarr: if not i: txt.append(f"\n") if isinstance(val, dict): txt.append("\n") else: try: txt.append(f"\n") except AttributeError: txt.append(f"\n") if n > 1: txt.append("\n") n -= 1 i += 1 elif isinstance(valarr, dict): txt.append(f"\n") txt.append("\n") txt.append("\n") txt.append("
{str(prop)}{valarr.encode('utf8')}{valarr}{prop}\n") else: txt.append("
\n") txt.extend(dict_to_table(val, lev + 1, width - 1)) txt.append("{val.encode('utf8')}{val}
{prop}\n") txt.extend(dict_to_table(valarr, lev + 1, width - 1)) txt.append("
\n") return txt def _expiration(timeout, tformat=None): if timeout == "now": return time_util.instant(tformat) else: # validity time should match lifetime of assertions return time_util.in_a_while(minutes=timeout, format=tformat) def delete_cookie(environ, name): kaka = environ.get("HTTP_COOKIE", "") if kaka: cookie_obj = SimpleCookie(kaka) morsel = cookie_obj.get(name, None) cookie = SimpleCookie() cookie[name] = morsel cookie[name]["expires"] = _expiration("now", "%a, %d-%b-%Y %H:%M:%S CET") return tuple(cookie.output().split(": ", 1)) return None # ---------------------------------------------------------------------------- # noinspection PyUnusedLocal def whoami(environ, start_response, user): nameid = environ["repoze.who.identity"]["login"] ava = environ["repoze.who.identity"]["user"] if not nameid: return not_authn(environ, start_response) if ava: response = ["

Your identity is supposed to be

"] response.extend(dict_to_table(ava)) else: response = ["

The system did not return any information about you

"] response.extend("Logout") resp = Response(response) return resp(environ, start_response) # noinspection PyUnusedLocal def not_found(environ, start_response): """Called if no URL matches.""" resp = NotFound("Not Found") return resp(environ, start_response) # noinspection PyUnusedLocal def not_authn(environ, start_response): resp = Unauthorized("Unknown user") return resp(environ, start_response) # noinspection PyUnusedLocal def slo(environ, start_response, user): # so here I might get either a LogoutResponse or a LogoutRequest client = environ["repoze.who.plugins"]["saml2auth"] sc = client.saml_client if "QUERY_STRING" in environ: query = parse_qs(environ["QUERY_STRING"]) logger.info("query: %s", query) try: response = sc.parse_logout_request_response(query["SAMLResponse"][0], binding=BINDING_HTTP_REDIRECT) if response: logger.info("LOGOUT response parsed OK") except KeyError: # return error reply response = None if response is None: request = sc.lo headers = [] delco = delete_cookie(environ, "pysaml2") if delco: headers.append(delco) resp = Redirect("/done", headers=headers) return resp(environ, start_response) # noinspection PyUnusedLocal def logout(environ, start_response, user): # This is where it starts when a user wants to log out client = environ["repoze.who.plugins"]["saml2auth"] subject_id = environ["repoze.who.identity"]["repoze.who.userid"] logger.info("[logout] subject_id: '%s'", subject_id) target = "/done" # What if more than one _dict = client.saml_client.global_logout(subject_id) logger.info("[logout] global_logout > %s", _dict) rem = environ["repoze.who.plugins"][client.rememberer_name] rem.forget(environ, subject_id) for key, item in _dict.items(): if isinstance(item, tuple): binding, htargs = item else: # result from logout, should be OK pass resp = Redirect("Successful Logout", headers=[("Location", target)]) return resp(environ, start_response) # else: # resp = ServiceError("Failed to logout from identity services") # start_response("500 Internal Server Error") # return [] # noinspection PyUnusedLocal def done(environ, start_response, user): # remove cookie and stored info logger.info("[done] environ: %s", environ) subject_id = environ["repoze.who.identity"]["repoze.who.userid"] client = environ["repoze.who.plugins"]["saml2auth"] logger.info("[logout done] remaining subjects: %s", client.saml_client.users.subjects()) start_response("200 OK", [("Content-Type", "text/html")]) return ["

You are now logged out from this service

"] # ---------------------------------------------------------------------------- # map urls to functions urls = [ (r"whoami$", whoami), (r"logout$", logout), (r"done$", done), (r"slo$", slo), (r"^$", whoami), ] # ---------------------------------------------------------------------------- def metadata(environ, start_response): try: path = args.path if path is None or len(path) == 0: path = os.path.dirname(os.path.abspath(__file__)) if path[-1] != "/": path += "/" metadata = create_metadata_string( f"{path}sp_conf.py", None, args.valid, args.cert, args.keyfile, args.id, args.name, args.sign ) start_response("200 OK", [("Content-Type", "text/xml")]) return metadata except Exception as ex: logger.error("An error occured while creating metadata: %s", ex.message) return not_found(environ, start_response) def application(environ, start_response): """ The main WSGI application. Dispatch the current request to the functions from above and store the regular expression captures in the WSGI environment as `myapp.url_args` so that the functions from above can access the url placeholders. If nothing matches, call the `not_found` function. :param environ: The HTTP application environment :param start_response: The application to run when the handling of the request is done :return: The response as a list of lines """ path = environ.get("PATH_INFO", "").lstrip("/") logger.info(" PATH: %s", path) if path == "metadata": return metadata(environ, start_response) user = environ.get("REMOTE_USER", "") if not user: user = environ.get("repoze.who.identity", "") logger.info("repoze.who.identity: '%s'", user) else: logger.info("REMOTE_USER: '%s'", user) # logger.info(logging.Logger.manager.loggerDict) for regex, callback in urls: if user: match = re.search(regex, path) if match is not None: try: environ["myapp.url_args"] = match.groups()[0] except IndexError: environ["myapp.url_args"] = path return callback(environ, start_response, user) else: return not_authn(environ, start_response) return not_found(environ, start_response) # ---------------------------------------------------------------------------- from repoze.who.config import make_middleware_with_config app_with_auth = make_middleware_with_config(application, {"here": "."}, "./who.ini", log_file="repoze_who.log") # ---------------------------------------------------------------------------- HOST = sp_conf.HOST PORT = sp_conf.PORT # allow uwsgi or gunicorn mount # by moving some initialization out of __name__ == '__main__' section. # uwsgi -s 0.0.0.0:8087 --protocol http --callable app_with_auth --module idp if __name__ == "__main__": # make_metadata arguments parser = argparse.ArgumentParser() parser.add_argument("-p", dest="path", help="Path to configuration file.") parser.add_argument( "-v", dest="valid", default="4", help="How long, in days, the metadata is valid from the time of creation" ) parser.add_argument("-c", dest="cert", help="certificate") parser.add_argument("-i", dest="id", help="The ID of the entities descriptor in the metadata") parser.add_argument("-k", dest="keyfile", help="A file with a key to sign the metadata with") parser.add_argument("-n", dest="name") parser.add_argument("-s", dest="sign", action="store_true", help="sign the metadata") args = parser.parse_args() from wsgiref.simple_server import make_server srv = make_server(HOST, PORT, app_with_auth) print(f"SP listening on {HOST}:{PORT}") srv.serve_forever()