1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
|
import logging
from urllib import urlencode
from urlparse import parse_qs
from urlparse import urlsplit
import time
from saml2.cipher import AES
from saml2.httputil import Response
from saml2.httputil import make_cookie
from saml2.httputil import Redirect
from saml2.httputil import Unauthorized
from saml2.httputil import parse_cookie
__author__ = 'rolandh'
logger = logging.getLogger(__name__)
class AuthnFailure(Exception):
pass
class UserAuthnMethod(object):
def __init__(self, srv):
self.srv = srv
def __call__(self, *args, **kwargs):
raise NotImplemented
def authenticated_as(self, **kwargs):
raise NotImplemented
def verify(self, **kwargs):
raise NotImplemented
def url_encode_params(params=None):
if not isinstance(params, dict):
raise Exception("You must pass in a dictionary!")
params_list = []
for k, v in params.items():
if isinstance(v, list):
params_list.extend([(k, x) for x in v])
else:
params_list.append((k, v))
return urlencode(params_list)
def create_return_url(base, query, **kwargs):
"""
Add a query string plus extra parameters to a base URL which may contain
a query part already.
:param base: redirect_uri may contain a query part, no fragment allowed.
:param query: Old query part as a string
:param kwargs: extra query parameters
:return:
"""
part = urlsplit(base)
if part.fragment:
raise ValueError("Base URL contained parts it shouldn't")
for key, values in parse_qs(query).items():
if key in kwargs:
if isinstance(kwargs[key], basestring):
kwargs[key] = [kwargs[key]]
kwargs[key].extend(values)
else:
kwargs[key] = values
if part.query:
for key, values in parse_qs(part.query).items():
if key in kwargs:
if isinstance(kwargs[key], basestring):
kwargs[key] = [kwargs[key]]
kwargs[key].extend(values)
else:
kwargs[key] = values
_pre = base.split("?")[0]
else:
_pre = base
logger.debug("kwargs: %s" % kwargs)
return "%s?%s" % (_pre, url_encode_params(kwargs))
class UsernamePasswordMako(UserAuthnMethod):
"""Do user authentication using the normal username password form
using Mako as template system"""
cookie_name = "userpassmako"
def __init__(self, srv, mako_template, template_lookup, pwd, return_to):
"""
:param srv: The server instance
:param mako_template: Which Mako template to use
:param pwd: Username/password dictionary like database
:param return_to: Where to send the user after authentication
:return:
"""
UserAuthnMethod.__init__(self, srv)
self.mako_template = mako_template
self.template_lookup = template_lookup
self.passwd = pwd
self.return_to = return_to
self.active = {}
self.query_param = "upm_answer"
self.aes = AES(srv.iv)
def __call__(self, cookie=None, policy_url=None, logo_url=None,
query="", **kwargs):
"""
Put up the login form
"""
if cookie:
headers = [cookie]
else:
headers = []
resp = Response(headers=headers)
argv = {"login": "",
"password": "",
"action": "verify",
"policy_url": policy_url,
"logo_url": logo_url,
"query": query}
logger.info("do_authentication argv: %s" % argv)
mte = self.template_lookup.get_template(self.mako_template)
resp.message = mte.render(**argv)
return resp
def verify(self, request, **kwargs):
"""
Verifies that the given username and password was correct
:param request: Either the query part of a URL a urlencoded
body of a HTTP message or a parse such.
:param kwargs: Catch whatever else is sent.
:return: redirect back to where ever the base applications
wants the user after authentication.
"""
logger.debug("verify(%s)" % request)
if isinstance(request, basestring):
_dict = parse_qs(request)
elif isinstance(request, dict):
_dict = request
else:
raise ValueError("Wrong type of input")
logger.debug("dict: %s" % _dict)
logger.debug("passwd: %s" % self.passwd)
# verify username and password
try:
assert _dict["password"][0] == self.passwd[_dict["login"][0]]
timestamp = str(int(time.mktime(time.gmtime())))
info = self.aes.encrypt(self.srv.symkey,
"::".join([_dict["login"][0], timestamp]))
self.active[info] = timestamp
cookie = make_cookie(self.cookie_name, info, self.srv.seed,
expire=0, domain="", path="")
return_to = create_return_url(self.return_to, _dict["query"][0],
**{self.query_param: "true"})
resp = Redirect(return_to, headers=[cookie])
except (AssertionError, KeyError):
resp = Unauthorized("Unknown user or wrong password")
return resp
def authenticated_as(self, cookie=None, **kwargs):
if cookie is None:
return None
else:
logger.debug("kwargs: %s" % kwargs)
try:
info, timestamp = parse_cookie(self.cookie_name,
self.srv.seed, cookie)
if self.active[info] == timestamp:
uid, _ts = self.aes.decrypt(self.srv.symkey,
info).split("::")
if timestamp == _ts:
return {"uid": uid}
except Exception:
pass
return None
def done(self, areq):
try:
_ = areq[self.query_param]
return False
except KeyError:
return True
class SocialService(UserAuthnMethod):
def __init__(self, social):
self.social = social
def __call__(self, server_env, cookie=None, sid="", query="", **kwargs):
return self.social.begin(server_env, cookie, sid, query)
def callback(self, server_env, cookie=None, sid="", query="", **kwargs):
return self.social.callback(server_env, cookie, sid, query, **kwargs)
class AuthnMethodChooser(object):
def __init__(self, methods=None):
self.methods = methods
def __call__(self, **kwargs):
if not self.methods:
raise Exception("No authentication methods defined")
elif len(self.methods) == 1:
return self.methods[0]
else:
pass # TODO
|