summaryrefslogtreecommitdiff
path: root/tests/test_75_mongodb.py
blob: 51ef67c9f174bd05d3d0cc82c3c3dd3a102ea0f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from contextlib import closing

from pymongo.errors import ConnectionFailure
from pymongo.errors import ServerSelectionTimeoutError
import pytest

from saml2 import BINDING_HTTP_POST
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client
from saml2.mongo_store import EptidMDB
from saml2.server import Server


__author__ = "rolandh"


# Authentication details handed to ``create_authn_response`` through its
# ``authn`` keyword in ``test_flow``.
AUTHN = {
    "class_ref": INTERNETPROTOCOLPASSWORD,
    "authn_auth": "http://www.example.com/login",
}


def _eq(l1, l2):
    return set(l1) == set(l2)


@pytest.mark.mongo
def test_flow():
    """Issue an authn response on one IdP instance and read it back on another.

    Two ``Server`` objects are built from the same ``idp_conf_mdb``
    configuration. The first one creates the response; the second one is then
    queried for the stored assertion (by id and by subject) and for the name
    id registered for the user.

    If a ``ConnectionFailure`` is raised anywhere in the flow, the test ends
    silently without failing.
    """
    client = Saml2Client(config_file="servera_conf")
    identity = {
        "eduPersonEntitlement": "Short stop",
        "surName": "Jeter",
        "givenName": "Derek",
        "mail": "derek.jeter@nyy.mlb.com",
        "title": "The man",
    }

    try:
        with closing(Server(config_file="idp_conf_mdb")) as idp1, closing(Server(config_file="idp_conf_mdb")) as idp2:
            # Start from an empty database.
            idp1.ident.mdb.db.drop()

            # Dummy request; only the request itself is needed, not its id.
            _, authn_request = client.create_authn_request(idp1.config.entityid)

            # Build the response to that AuthnRequest on the first instance.
            response_args = idp1.response_args(authn_request, [BINDING_HTTP_POST])
            response = idp1.create_authn_response(identity, userid="jeter", authn=AUTHN, **response_args)
            issued = response.assertion

            # The assertion is what gets stored; the second instance must
            # return the same thing that the first one put in.
            stored = idp2.session_db.get_assertion(issued.id)
            assert stored["assertion"] == issued

            # Lookup by subject must yield exactly that one assertion.
            by_subject = idp2.session_db.get_assertions_by_subject(issued.subject.name_id)
            assert len(by_subject) == 1
            assert by_subject[0] == issued

            # Exactly one name id is known for the user.
            known_name_ids = idp2.ident.find_nameid("jeter")
            assert len(known_name_ids) == 1
    except ConnectionFailure:
        # No reachable database: end the test quietly.
        return


@pytest.mark.mongo
def test_eptid_mongo_db():
    """Check that ``EptidMDB.get`` is stable per input and varies with user and SP.

    The test ends silently, without failing, if constructing the store raises
    ``ConnectionFailure`` or if the first lookup raises
    ``ServerSelectionTimeoutError``. Errors from later lookups propagate.
    """
    try:
        store = EptidMDB("secret", "idp")
    except ConnectionFailure:
        return

    try:
        first = store.get("idp_entity_id", "sp_entity_id", "user_id", "some other data")
    except ServerSelectionTimeoutError:
        return

    print(first)
    assert first.startswith("idp_entity_id!sp_entity_id!")

    # Asking again with identical arguments yields the identical value.
    repeated = store.get("idp_entity_id", "sp_entity_id", "user_id", "some other data")
    assert first == repeated

    # A different user gets a different value.
    other_user = store.get("idp_entity_id", "sp_entity_id", "user_2", "some other data")
    print(other_user)
    assert first != other_user

    # A different SP gets a value distinct from both of the above.
    other_sp = store.get("idp_entity_id", "sp_entity_id2", "user_id", "some other data")
    assert other_sp != first
    assert other_sp != other_user


# Allow running the flow test directly as a script, without the pytest runner.
if __name__ == "__main__":
    test_flow()