diff options
Diffstat (limited to 'baserock_openid_provider/openid_provider/views.py')
-rw-r--r-- | baserock_openid_provider/openid_provider/views.py | 323 |
1 files changed, 0 insertions, 323 deletions
diff --git a/baserock_openid_provider/openid_provider/views.py b/baserock_openid_provider/openid_provider/views.py deleted file mode 100644 index 1b8ef6d5..00000000 --- a/baserock_openid_provider/openid_provider/views.py +++ /dev/null @@ -1,323 +0,0 @@ -# -*- coding: utf-8 -*- -# some code from http://www.djangosnippets.org/snippets/310/ by simon -# and from examples/djopenid from python-openid-2.2.4 -import urlparse -import logging -from urllib import urlencode, quote - -from django.conf import settings -from django.core.urlresolvers import reverse -from django.http import HttpResponse, HttpResponseRedirect, QueryDict -from django.shortcuts import render_to_response -from django.template import RequestContext -from django.utils.translation import ugettext as _ - -from django.utils.encoding import smart_str -try: - from django.views.decorators.csrf import csrf_exempt -except ImportError: - from django.contrib.csrf.middleware import csrf_exempt - -from django.contrib.auth import REDIRECT_FIELD_NAME - -from openid.association import default_negotiator, encrypted_negotiator -from openid.consumer.discover import OPENID_IDP_2_0_TYPE, OPENID_2_0_TYPE -from openid.extensions import sreg, ax -from openid.server.server import Server, BROWSER_REQUEST_MODES -from openid.yadis.constants import YADIS_CONTENT_TYPE - -from openid_provider import conf -from openid_provider.utils import add_sreg_data, add_ax_data, get_store, \ - trust_root_validation, get_trust_session_key, prep_response -from openid_provider.models import TrustedRoot - -logger = logging.getLogger(__name__) - - -# Special URL which means 'let the user choose whichever identity'. -IDENTIFIER_SELECT_URL = 'http://specs.openid.net/auth/2.0/identifier_select' - - -@csrf_exempt -def openid_server(request): - """ - This view is the actual OpenID server - running at the URL pointed to by - the <link rel="openid.server"> tag. - """ - logger.debug('server request %s: %s', - request.method, request.POST or request.GET) - server = openid_get_server(request) - - if not request.is_secure(): - # if request is not secure allow only encrypted association sessions - server.negotiator = encrypted_negotiator - - # Clear AuthorizationInfo session var, if it is set - if request.session.get('AuthorizationInfo', None): - del request.session['AuthorizationInfo'] - - if request.method == 'GET': - querydict = dict(request.GET.items()) - elif request.method == 'POST': - querydict = dict(request.POST.items()) - else: - return HTTPResponseNotAllowed(['GET', 'POST']) - - orequest = server.decodeRequest(querydict) - if not orequest: - orequest = server.decodeRequest(request.session.get('OPENID_REQUEST', None)) - if orequest: - # remove session stored data: - del request.session['OPENID_REQUEST'] - else: - # not request, render info page: - data = { - 'host': request.build_absolute_uri('/'), - 'xrds_location': request.build_absolute_uri( - reverse('openid-provider-xrds')), - } - logger.debug('invalid request, sending info: %s', data) - return render_to_response('openid_provider/server.html', - data, - context_instance=RequestContext(request)) - - if orequest.mode in BROWSER_REQUEST_MODES: - if not request.user.is_authenticated(): - logger.debug('no local authentication, sending landing page') - return landing_page(request, orequest) - - openid = openid_is_authorized(request, orequest.identity, - orequest.trust_root) - - # verify return_to: - trust_root_valid = trust_root_validation(orequest) - validated = False - - if conf.FAILED_DISCOVERY_AS_VALID: - if trust_root_valid == 'DISCOVERY_FAILED': - validated = True - else: - # if in decide already took place, set as valid: - if request.session.get(get_trust_session_key(orequest), False): - validated = True - - if openid is not None and (validated or trust_root_valid == 'Valid'): - if orequest.identity == IDENTIFIER_SELECT_URL: - id_url = request.build_absolute_uri( - reverse('openid-provider-identity', args=[openid.openid])) - else: - # We must return exactly the identity URL that was requested, - # otherwise the openid.server module raises an error. - id_url = orequest.identity - - oresponse = orequest.answer(True, identity=id_url) - logger.debug('orequest.answer(True, identity="%s")', id_url) - elif orequest.immediate: - logger.debug('checkid_immediate mode not supported') - raise Exception('checkid_immediate mode not supported') - else: - request.session['OPENID_REQUEST'] = orequest.message.toPostArgs() - request.session['OPENID_TRUSTROOT_VALID'] = trust_root_valid - logger.debug( - 'Set OPENID_REQUEST to %s in session %s', - request.session['OPENID_REQUEST'], request.session) - logger.debug( - 'Set OPENID_TRUSTROOT_VALID to %s in session %s', - request.session['OPENID_TRUSTROOT_VALID'], request.session) - logger.debug('redirecting to decide page') - return HttpResponseRedirect(reverse('openid-provider-decide')) - else: - oresponse = server.handleRequest(orequest) - if request.user.is_authenticated(): - add_sreg_data(request, orequest, oresponse) - if conf.AX_EXTENSION: - add_ax_data(request, orequest, oresponse) - - return prep_response(request, orequest, oresponse, server) - -def openid_xrds(request, identity=False, id=None): - if identity: - types = [OPENID_2_0_TYPE] - else: - types = [OPENID_IDP_2_0_TYPE, sreg.ns_uri] - if conf.AX_EXTENSION: - types.append(ax.AXMessage.ns_uri) - endpoints = [request.build_absolute_uri(reverse('openid-provider-root'))] - return render_to_response('openid_provider/xrds.xml', { - 'host': request.build_absolute_uri('/'), - 'types': types, - 'endpoints': endpoints, - }, context_instance=RequestContext(request), content_type=YADIS_CONTENT_TYPE) - - -def url_for_openid(request, openid): - return request.build_absolute_uri( - reverse('openid-provider-identity', args=[openid.openid])) - - -def openid_not_found_error_message(request, identity_url): - ids = request.user.openid_set - if ids.count() == 0: - message = "You have no OpenIDs configured. Contact the administrator." - else: - id_urls = [url_for_openid(request, id) for id in ids.iterator()] - id_urls = ', '.join(id_urls) - if ids.count() != 1: - message = "You somehow have multiple OpenIDs: " + id_urls - else: - message = "Your OpenID URL is: " + id_urls - return "You do not have the OpenID '%s'. %s" % (identity_url, message) - - -def openid_decide(request): - """ - The page that asks the user if they really want to sign in to the site, and - lets them add the consumer to their trusted whitelist. - # If user is logged in, ask if they want to trust this trust_root - # If they are NOT logged in, show the landing page - """ - server = openid_get_server(request) - orequest = server.decodeRequest(request.session.get('OPENID_REQUEST')) - trust_root_valid = request.session.get('OPENID_TRUSTROOT_VALID') - - logger.debug('Got OPENID_REQUEST %s, OPENID_TRUSTROOT_VALID %s from ' - 'session %s', orequest, trust_root_valid, request.session) - - if not request.user.is_authenticated(): - return landing_page(request, orequest) - - if orequest is None: - # This isn't normal, but can occur if the user uses the 'back' button - # or if the session data is otherwise lost for some reason. - return error_page( - request, "I've lost track of your session now. Sorry! Please go " - "back to the site you are logging in to with a Baserock " - "OpenID and, if you're not yet logged in, try again.") - - openid = openid_get_identity(request, orequest.identity) - if openid is None: - # User should only ever have one OpenID, created for them when they - # registered. - message = openid_not_found_error_message(request, orequest.identity) - return error_page(request, message) - - if request.method == 'POST' and request.POST.get('decide_page', False): - if request.POST.get('allow', False): - TrustedRoot.objects.get_or_create( - openid=openid, trust_root=orequest.trust_root) - if not conf.FAILED_DISCOVERY_AS_VALID: - request.session[get_trust_session_key(orequest)] = True - return HttpResponseRedirect(reverse('openid-provider-root')) - - oresponse = orequest.answer(False) - logger.debug('orequest.answer(False)') - return prep_response(request, orequest, oresponse) - - return render_to_response('openid_provider/decide.html', { - 'title': _('Trust this site?'), - 'trust_root': orequest.trust_root, - 'trust_root_valid': trust_root_valid, - 'return_to': orequest.return_to, - 'identity': orequest.identity, - }, context_instance=RequestContext(request)) - -def error_page(request, msg): - return render_to_response('openid_provider/error.html', { - 'title': _('Error'), - 'msg': msg, - }, context_instance=RequestContext(request)) - -class SafeQueryDict(QueryDict): - """ - A custom QueryDict class that implements a urlencode method - knowing how to excempt some characters as safe. - - Backported from Django 1.3 - """ - def urlencode(self, safe=None): - output = [] - if safe: - encode = lambda k, v: '%s=%s' % ((quote(k, safe), quote(v, safe))) - else: - encode = lambda k, v: urlencode({k: v}) - for k, list_ in self.lists(): - k = smart_str(k, self.encoding) - output.extend([encode(k, smart_str(v, self.encoding)) - for v in list_]) - return '&'.join(output) - -def landing_page(request, orequest, login_url=None, - redirect_field_name=REDIRECT_FIELD_NAME): - """ - The page shown when the user attempts to sign in somewhere using OpenID - but is not authenticated with the site. For idproxy.net, a message telling - them to log in manually is displayed. - """ - request.session['OPENID_REQUEST'] = orequest.message.toPostArgs() - logger.debug( - 'Set OPENID_REQUEST to %s in session %s', - request.session['OPENID_REQUEST'], request.session) - if not login_url: - login_url = settings.LOGIN_URL - path = request.get_full_path() - login_url_parts = list(urlparse.urlparse(login_url)) - if redirect_field_name: - querystring = SafeQueryDict(login_url_parts[4], mutable=True) - querystring[redirect_field_name] = path - login_url_parts[4] = querystring.urlencode(safe='/') - return HttpResponseRedirect(urlparse.urlunparse(login_url_parts)) - -def openid_is_authorized(request, identity_url, trust_root): - """ - Check that they own the given identity URL, and that the trust_root is - in their whitelist of trusted sites. - """ - if not request.user.is_authenticated(): - return None - - openid = openid_get_identity(request, identity_url) - if openid is None: - return None - - if openid.trustedroot_set.filter(trust_root=trust_root).count() < 1: - return None - - return openid - - -def url_is_equivalent(a, b): - """ - Test if two URLs are equivalent OpenIDs. - """ - return a.rstrip('/') == b.rstrip('/') - - -def openid_get_identity(request, identity_url): - """ - Select openid based on claim (identity_url). - If none was claimed identity_url will be - 'http://specs.openid.net/auth/2.0/identifier_select' - - in that case return default one - - if user has no default one, return any - - in other case return None! - """ - logger.debug('Looking for %s in user %s set of OpenIDs %s', - identity_url, request.user, request.user.openid_set) - for openid in request.user.openid_set.iterator(): - if url_is_equivalent(identity_url, url_for_openid(request, openid)): - return openid - if identity_url == IDENTIFIER_SELECT_URL: - # no claim was made, choose user default openid: - openids = request.user.openid_set.filter(default=True) - if openids.count() == 1: - return openids[0] - if request.user.openid_set.count() > 0: - return request.user.openid_set.all()[0] - return None - - -def openid_get_server(request): - return Server( - get_store(request), - op_endpoint=request.build_absolute_uri( - reverse('openid-provider-root'))) |