summaryrefslogtreecommitdiff
path: root/baserock_openid_provider/openid_provider/views.py
blob: 1b8ef6d57d7c5e2b3d32a1458835c64fa9d0f486 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# -*- coding: utf-8 -*-
# some code from http://www.djangosnippets.org/snippets/310/ by simon
# and from examples/djopenid from python-openid-2.2.4
import urlparse
import logging
from urllib import urlencode, quote

from django.conf import settings
from django.core.urlresolvers import reverse
from django.http import HttpResponse, HttpResponseRedirect, QueryDict
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext as _

from django.utils.encoding import smart_str
try:
    from django.views.decorators.csrf import csrf_exempt
except ImportError:
    from django.contrib.csrf.middleware import csrf_exempt

from django.contrib.auth import REDIRECT_FIELD_NAME

from openid.association import default_negotiator, encrypted_negotiator
from openid.consumer.discover import OPENID_IDP_2_0_TYPE, OPENID_2_0_TYPE
from openid.extensions import sreg, ax
from openid.server.server import Server, BROWSER_REQUEST_MODES
from openid.yadis.constants import YADIS_CONTENT_TYPE

from openid_provider import conf
from openid_provider.utils import add_sreg_data, add_ax_data, get_store, \
    trust_root_validation, get_trust_session_key, prep_response
from openid_provider.models import TrustedRoot

logger = logging.getLogger(__name__)


# Special URL which means 'let the user choose whichever identity'.
IDENTIFIER_SELECT_URL = 'http://specs.openid.net/auth/2.0/identifier_select'


@csrf_exempt
def openid_server(request):
    """
    This view is the actual OpenID server - running at the URL pointed to by 
    the <link rel="openid.server"> tag. 
    """
    logger.debug('server request %s: %s',
                 request.method, request.POST or request.GET)
    server = openid_get_server(request)

    if not request.is_secure():
        # if request is not secure allow only encrypted association sessions
        server.negotiator = encrypted_negotiator

    # Clear AuthorizationInfo session var, if it is set
    if request.session.get('AuthorizationInfo', None):
        del request.session['AuthorizationInfo']

    if request.method == 'GET':
        querydict = dict(request.GET.items())
    elif request.method == 'POST':
        querydict = dict(request.POST.items())
    else:
        return HTTPResponseNotAllowed(['GET', 'POST'])

    orequest = server.decodeRequest(querydict)
    if not orequest:
        orequest = server.decodeRequest(request.session.get('OPENID_REQUEST', None))
        if orequest:
            # remove session stored data:
            del request.session['OPENID_REQUEST']
        else:
            # not request, render info page:
            data = {
                'host': request.build_absolute_uri('/'),
                'xrds_location': request.build_absolute_uri(
                    reverse('openid-provider-xrds')),
            }
            logger.debug('invalid request, sending info: %s', data)
            return render_to_response('openid_provider/server.html',
                                      data,
                                      context_instance=RequestContext(request))

    if orequest.mode in BROWSER_REQUEST_MODES:
        if not request.user.is_authenticated():
            logger.debug('no local authentication, sending landing page')
            return landing_page(request, orequest)

        openid = openid_is_authorized(request, orequest.identity,
                                      orequest.trust_root)

        # verify return_to:
        trust_root_valid = trust_root_validation(orequest)
        validated = False

        if conf.FAILED_DISCOVERY_AS_VALID:
            if trust_root_valid == 'DISCOVERY_FAILED':
                validated = True
        else:
            # if in decide already took place, set as valid:
            if request.session.get(get_trust_session_key(orequest), False):
                validated = True

        if openid is not None and (validated or trust_root_valid == 'Valid'):
            if orequest.identity == IDENTIFIER_SELECT_URL:
                id_url = request.build_absolute_uri(
                    reverse('openid-provider-identity', args=[openid.openid]))
            else:
                # We must return exactly the identity URL that was requested,
                # otherwise the openid.server module raises an error.
                id_url = orequest.identity

            oresponse = orequest.answer(True, identity=id_url)
            logger.debug('orequest.answer(True, identity="%s")', id_url)
        elif orequest.immediate:
            logger.debug('checkid_immediate mode not supported')
            raise Exception('checkid_immediate mode not supported')
        else:
            request.session['OPENID_REQUEST'] = orequest.message.toPostArgs()
            request.session['OPENID_TRUSTROOT_VALID'] = trust_root_valid
            logger.debug(
                'Set OPENID_REQUEST to %s in session %s',
                request.session['OPENID_REQUEST'], request.session)
            logger.debug(
                'Set OPENID_TRUSTROOT_VALID to %s in session %s',
                request.session['OPENID_TRUSTROOT_VALID'], request.session)
            logger.debug('redirecting to decide page')
            return HttpResponseRedirect(reverse('openid-provider-decide'))
    else:
        oresponse = server.handleRequest(orequest)
    if request.user.is_authenticated():
        add_sreg_data(request, orequest, oresponse)
        if conf.AX_EXTENSION:
            add_ax_data(request, orequest, oresponse)

    return prep_response(request, orequest, oresponse, server)

def openid_xrds(request, identity=False, id=None):
    if identity:
        types = [OPENID_2_0_TYPE]
    else:
        types = [OPENID_IDP_2_0_TYPE, sreg.ns_uri]
        if conf.AX_EXTENSION:
            types.append(ax.AXMessage.ns_uri)
    endpoints = [request.build_absolute_uri(reverse('openid-provider-root'))]
    return render_to_response('openid_provider/xrds.xml', {
        'host': request.build_absolute_uri('/'),
        'types': types,
        'endpoints': endpoints,
    }, context_instance=RequestContext(request), content_type=YADIS_CONTENT_TYPE)


def url_for_openid(request, openid):
    return request.build_absolute_uri(
        reverse('openid-provider-identity', args=[openid.openid]))


def openid_not_found_error_message(request, identity_url):
    ids = request.user.openid_set
    if ids.count() == 0:
        message = "You have no OpenIDs configured. Contact the administrator."
    else:
        id_urls = [url_for_openid(request, id) for id in ids.iterator()]
        id_urls = ', '.join(id_urls)
        if ids.count() != 1:
            message = "You somehow have multiple OpenIDs: " + id_urls
        else:
            message = "Your OpenID URL is: " + id_urls
    return "You do not have the OpenID '%s'. %s" % (identity_url, message)


def openid_decide(request):
    """
    The page that asks the user if they really want to sign in to the site, and
    lets them add the consumer to their trusted whitelist.
    # If user is logged in, ask if they want to trust this trust_root
    # If they are NOT logged in, show the landing page
    """
    server = openid_get_server(request)
    orequest = server.decodeRequest(request.session.get('OPENID_REQUEST'))
    trust_root_valid = request.session.get('OPENID_TRUSTROOT_VALID')

    logger.debug('Got OPENID_REQUEST %s, OPENID_TRUSTROOT_VALID %s from '
                  'session %s', orequest, trust_root_valid, request.session)

    if not request.user.is_authenticated():
        return landing_page(request, orequest)

    if orequest is None:
        # This isn't normal, but can occur if the user uses the 'back' button
        # or if the session data is otherwise lost for some reason.
        return error_page(
            request, "I've lost track of your session now. Sorry! Please go "
                     "back to the site you are logging in to with a Baserock "
                     "OpenID and, if you're not yet logged in, try again.")

    openid = openid_get_identity(request, orequest.identity)
    if openid is None:
        # User should only ever have one OpenID, created for them when they
        # registered.
        message = openid_not_found_error_message(request, orequest.identity)
        return error_page(request, message)

    if request.method == 'POST' and request.POST.get('decide_page', False):
        if request.POST.get('allow', False):
            TrustedRoot.objects.get_or_create(
                openid=openid, trust_root=orequest.trust_root)
            if not conf.FAILED_DISCOVERY_AS_VALID:
                request.session[get_trust_session_key(orequest)] = True
            return HttpResponseRedirect(reverse('openid-provider-root'))

        oresponse = orequest.answer(False)
        logger.debug('orequest.answer(False)')
        return prep_response(request, orequest, oresponse)

    return render_to_response('openid_provider/decide.html', {
        'title': _('Trust this site?'),
        'trust_root': orequest.trust_root,
        'trust_root_valid': trust_root_valid,
        'return_to': orequest.return_to,
        'identity': orequest.identity,
    }, context_instance=RequestContext(request))

def error_page(request, msg):
    return render_to_response('openid_provider/error.html', {
        'title': _('Error'),
        'msg': msg,
    }, context_instance=RequestContext(request))

class SafeQueryDict(QueryDict):
    """
    A custom QueryDict class that implements a urlencode method
    knowing how to excempt some characters as safe.

    Backported from Django 1.3
    """
    def urlencode(self, safe=None):
        output = []
        if safe:
            encode = lambda k, v: '%s=%s' % ((quote(k, safe), quote(v, safe)))
        else:
            encode = lambda k, v: urlencode({k: v})
        for k, list_ in self.lists():
            k = smart_str(k, self.encoding)
            output.extend([encode(k, smart_str(v, self.encoding))
                           for v in list_])
        return '&'.join(output)

def landing_page(request, orequest, login_url=None,
                 redirect_field_name=REDIRECT_FIELD_NAME):
    """
    The page shown when the user attempts to sign in somewhere using OpenID 
    but is not authenticated with the site. For idproxy.net, a message telling
    them to log in manually is displayed.
    """
    request.session['OPENID_REQUEST'] = orequest.message.toPostArgs()
    logger.debug(
        'Set OPENID_REQUEST to %s in session %s',
        request.session['OPENID_REQUEST'], request.session)
    if not login_url:
        login_url = settings.LOGIN_URL
    path = request.get_full_path()
    login_url_parts = list(urlparse.urlparse(login_url))
    if redirect_field_name:
        querystring = SafeQueryDict(login_url_parts[4], mutable=True)
        querystring[redirect_field_name] = path
        login_url_parts[4] = querystring.urlencode(safe='/')
    return HttpResponseRedirect(urlparse.urlunparse(login_url_parts))

def openid_is_authorized(request, identity_url, trust_root):
    """
    Check that they own the given identity URL, and that the trust_root is 
    in their whitelist of trusted sites.
    """
    if not request.user.is_authenticated():
        return None

    openid = openid_get_identity(request, identity_url)
    if openid is None:
        return None

    if openid.trustedroot_set.filter(trust_root=trust_root).count() < 1:
        return None

    return openid


def url_is_equivalent(a, b):
    """
    Test if two URLs are equivalent OpenIDs.
    """
    return a.rstrip('/') == b.rstrip('/')


def openid_get_identity(request, identity_url):
    """
    Select openid based on claim (identity_url).
    If none was claimed identity_url will be
    'http://specs.openid.net/auth/2.0/identifier_select'
    - in that case return default one
    - if user has no default one, return any
    - in other case return None!
    """
    logger.debug('Looking for %s in user %s set of OpenIDs %s',
                 identity_url, request.user, request.user.openid_set)
    for openid in request.user.openid_set.iterator():
        if url_is_equivalent(identity_url, url_for_openid(request, openid)):
            return openid
    if identity_url == IDENTIFIER_SELECT_URL:
        # no claim was made, choose user default openid:
        openids = request.user.openid_set.filter(default=True)
        if openids.count() == 1:
            return openids[0]
        if request.user.openid_set.count() > 0:
            return request.user.openid_set.all()[0]
    return None


def openid_get_server(request):
    return Server(
        get_store(request),
        op_endpoint=request.build_absolute_uri(
            reverse('openid-provider-root')))