summaryrefslogtreecommitdiff
path: root/openstack_auth
diff options
context:
space:
mode:
authorRob Cresswell <robert.cresswell@outlook.com>2017-07-11 14:19:57 +0100
committerRob Cresswell <robert.cresswell@outlook.com>2017-09-27 12:06:57 +0100
commite3e5812b1941ba689849630a0f6e5ffd7f598e1c (patch)
tree4516266b6ba69d3ce34b399a22de1cd938db9f3b /openstack_auth
parentfc3f3195ec7a8510dc6720a257dabf33ad5449a2 (diff)
downloadhorizon-e3e5812b1941ba689849630a0f6e5ffd7f598e1c.tar.gz
Add Django OpenStack Auth to Horizon
Moves Django OpenStack Auth content to Horizon, since they are so tightly coupled. This cleans up the development workflow and should make keystone / auth related contributions easier. Implements: blueprint merge-openstack-auth Change-Id: Ia1cdc47bad1ca6e633073a9f9445b0c7f70d05bc
Diffstat (limited to 'openstack_auth')
-rw-r--r--openstack_auth/__init__.py0
-rw-r--r--openstack_auth/backend.py283
-rw-r--r--openstack_auth/exceptions.py17
-rw-r--r--openstack_auth/forms.py154
-rw-r--r--openstack_auth/models.py18
-rw-r--r--openstack_auth/plugin/__init__.py22
-rw-r--r--openstack_auth/plugin/base.py241
-rw-r--r--openstack_auth/plugin/k2k.py107
-rw-r--r--openstack_auth/plugin/password.py51
-rw-r--r--openstack_auth/plugin/token.py41
-rw-r--r--openstack_auth/policy.py231
-rw-r--r--openstack_auth/tests/__init__.py0
-rw-r--r--openstack_auth/tests/conf/keystone_policy.json146
-rw-r--r--openstack_auth/tests/conf/nova_policy.json274
-rw-r--r--openstack_auth/tests/conf/policy.v3cloudsample.json195
-rw-r--r--openstack_auth/tests/data_v2.py143
-rw-r--r--openstack_auth/tests/data_v3.py361
-rw-r--r--openstack_auth/tests/models.py0
-rw-r--r--openstack_auth/tests/run_tests.py43
-rw-r--r--openstack_auth/tests/settings.py76
-rw-r--r--openstack_auth/tests/templates/auth/blank.html0
-rw-r--r--openstack_auth/tests/templates/auth/login.html11
-rw-r--r--openstack_auth/tests/tests.py1522
-rw-r--r--openstack_auth/tests/urls.py29
-rw-r--r--openstack_auth/urls.py36
-rw-r--r--openstack_auth/user.py454
-rw-r--r--openstack_auth/utils.py562
-rw-r--r--openstack_auth/views.py327
28 files changed, 5344 insertions, 0 deletions
diff --git a/openstack_auth/__init__.py b/openstack_auth/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/openstack_auth/__init__.py
diff --git a/openstack_auth/backend.py b/openstack_auth/backend.py
new file mode 100644
index 000000000..5648cc216
--- /dev/null
+++ b/openstack_auth/backend.py
@@ -0,0 +1,283 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Module defining the Django auth backend class for the Keystone API. """
+
+import datetime
+import logging
+
+import pytz
+
+from django.conf import settings
+from django.utils.module_loading import import_string
+from django.utils.translation import ugettext_lazy as _
+
+from openstack_auth import exceptions
+from openstack_auth import user as auth_user
+from openstack_auth import utils
+
+
+LOG = logging.getLogger(__name__)
+
+
+KEYSTONE_CLIENT_ATTR = "_keystoneclient"
+
+
+class KeystoneBackend(object):
+ """Django authentication backend for use with ``django.contrib.auth``."""
+
+ def __init__(self):
+ self._auth_plugins = None
+
+ @property
+ def auth_plugins(self):
+ if self._auth_plugins is None:
+ plugins = getattr(
+ settings,
+ 'AUTHENTICATION_PLUGINS',
+ ['openstack_auth.plugin.password.PasswordPlugin',
+ 'openstack_auth.plugin.token.TokenPlugin'])
+
+ self._auth_plugins = [import_string(p)() for p in plugins]
+
+ return self._auth_plugins
+
+ def check_auth_expiry(self, auth_ref, margin=None):
+ if not utils.is_token_valid(auth_ref, margin):
+ msg = _("The authentication token issued by the Identity service "
+ "has expired.")
+ LOG.warning("The authentication token issued by the Identity "
+ "service appears to have expired before it was "
+ "issued. This may indicate a problem with either your "
+ "server or client configuration.")
+ raise exceptions.KeystoneAuthException(msg)
+ return True
+
+ def get_user(self, user_id):
+ """Returns the current user from the session data.
+
+ If authenticated, this return the user object based on the user ID
+ and session data.
+
+ .. note::
+
+ This required monkey-patching the ``contrib.auth`` middleware
+ to make the ``request`` object available to the auth backend class.
+
+ """
+ if (hasattr(self, 'request') and
+ user_id == self.request.session["user_id"]):
+ token = self.request.session['token']
+ endpoint = self.request.session['region_endpoint']
+ services_region = self.request.session['services_region']
+ user = auth_user.create_user_from_token(self.request, token,
+ endpoint, services_region)
+ return user
+ else:
+ return None
+
+ def authenticate(self, auth_url=None, **kwargs):
+ """Authenticates a user via the Keystone Identity API."""
+ LOG.debug('Beginning user authentication')
+
+ if not auth_url:
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+
+ auth_url, url_fixed = utils.fix_auth_url_version_prefix(auth_url)
+ if url_fixed:
+ LOG.warning("The OPENSTACK_KEYSTONE_URL setting points to a v2.0 "
+ "Keystone endpoint, but v3 is specified as the API "
+ "version to use by Horizon. Using v3 endpoint for "
+ "authentication.")
+
+ for plugin in self.auth_plugins:
+ unscoped_auth = plugin.get_plugin(auth_url=auth_url, **kwargs)
+
+ if unscoped_auth:
+ break
+ else:
+ msg = _('No authentication backend could be determined to '
+ 'handle the provided credentials.')
+ LOG.warning('No authentication backend could be determined to '
+ 'handle the provided credentials. This is likely a '
+ 'configuration error that should be addressed.')
+ raise exceptions.KeystoneAuthException(msg)
+
+ # the recent project id a user might have set in a cookie
+ recent_project = None
+ request = kwargs.get('request')
+ if request:
+ # Grab recent_project found in the cookie, try to scope
+ # to the last project used.
+ recent_project = request.COOKIES.get('recent_project')
+ unscoped_auth_ref = plugin.get_access_info(unscoped_auth)
+
+ # Check expiry for our unscoped auth ref.
+ self.check_auth_expiry(unscoped_auth_ref)
+
+ domain_name = kwargs.get('user_domain_name', None)
+ domain_auth, domain_auth_ref = plugin.get_domain_scoped_auth(
+ unscoped_auth, unscoped_auth_ref, domain_name)
+ scoped_auth, scoped_auth_ref = plugin.get_project_scoped_auth(
+ unscoped_auth, unscoped_auth_ref, recent_project=recent_project)
+
+ # Abort if there are no projects for this user and a valid domain
+ # token has not been obtained
+ #
+ # The valid use cases for a user login are:
+ # Keystone v2: user must have a role on a project and be able
+ # to obtain a project scoped token
+ # Keystone v3: 1) user can obtain a domain scoped token (user
+ # has a role on the domain they authenticated to),
+ # only, no roles on a project
+ # 2) user can obtain a domain scoped token and has
+ # a role on a project in the domain they
+ # authenticated to (and can obtain a project scoped
+ # token)
+ # 3) user cannot obtain a domain scoped token, but can
+ # obtain a project scoped token
+ if not scoped_auth_ref and domain_auth_ref:
+ # if the user can't obtain a project scoped token, set the scoped
+ # token to be the domain token, if valid
+ scoped_auth = domain_auth
+ scoped_auth_ref = domain_auth_ref
+ elif not scoped_auth_ref and not domain_auth_ref:
+ msg = _('You are not authorized for any projects.')
+ if utils.get_keystone_version() >= 3:
+ msg = _('You are not authorized for any projects or domains.')
+ raise exceptions.KeystoneAuthException(msg)
+
+ # Check expiry for our new scoped token.
+ self.check_auth_expiry(scoped_auth_ref)
+
+ # We want to try to use the same region we just logged into
+ # which may or may not be the default depending upon the order
+ # keystone uses
+ region_name = None
+ id_endpoints = scoped_auth_ref.service_catalog.\
+ get_endpoints(service_type='identity')
+ for id_endpoint in [cat for cat in id_endpoints['identity']]:
+ if auth_url in id_endpoint.values():
+ region_name = id_endpoint['region']
+ break
+
+ interface = getattr(settings, 'OPENSTACK_ENDPOINT_TYPE', 'public')
+
+ endpoint, url_fixed = utils.fix_auth_url_version_prefix(
+ scoped_auth_ref.service_catalog.url_for(
+ service_type='identity',
+ interface=interface,
+ region_name=region_name))
+ if url_fixed:
+ LOG.warning("The Keystone URL in service catalog points to a v2.0 "
+ "Keystone endpoint, but v3 is specified as the API "
+ "version to use by Horizon. Using v3 endpoint for "
+ "authentication.")
+
+ # If we made it here we succeeded. Create our User!
+ unscoped_token = unscoped_auth_ref.auth_token
+
+ user = auth_user.create_user_from_token(
+ request,
+ auth_user.Token(scoped_auth_ref, unscoped_token=unscoped_token),
+ endpoint,
+ services_region=region_name)
+
+ if request is not None:
+ # if no k2k providers exist then the function returns quickly
+ utils.store_initial_k2k_session(auth_url, request, scoped_auth_ref,
+ unscoped_auth_ref)
+ request.session['unscoped_token'] = unscoped_token
+ if domain_auth_ref:
+ # check django session engine, if using cookies, this will not
+ # work, as it will overflow the cookie so don't add domain
+ # scoped token to the session and put error in the log
+ if utils.using_cookie_backed_sessions():
+ LOG.error('Using signed cookies as SESSION_ENGINE with '
+ 'OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT is '
+ 'enabled. This disables the ability to '
+ 'perform identity operations due to cookie size '
+ 'constraints.')
+ else:
+ request.session['domain_token'] = domain_auth_ref
+
+ request.user = user
+ timeout = getattr(settings, "SESSION_TIMEOUT", 3600)
+ token_life = user.token.expires - datetime.datetime.now(pytz.utc)
+ session_time = min(timeout, int(token_life.total_seconds()))
+ request.session.set_expiry(session_time)
+
+ keystone_client_class = utils.get_keystone_client().Client
+ session = utils.get_session()
+ scoped_client = keystone_client_class(session=session,
+ auth=scoped_auth)
+
+ # Support client caching to save on auth calls.
+ setattr(request, KEYSTONE_CLIENT_ATTR, scoped_client)
+
+ LOG.debug('Authentication completed.')
+ return user
+
+ def get_group_permissions(self, user, obj=None):
+ """Returns an empty set since Keystone doesn't support "groups"."""
+ # Keystone V3 added "groups". The Auth token response includes the
+ # roles from the user's Group assignment. It should be fine just
+ # returning an empty set here.
+ return set()
+
+ def get_all_permissions(self, user, obj=None):
+ """Returns a set of permission strings that the user has.
+
+ This permission available to the user is derived from the user's
+ Keystone "roles".
+
+ The permissions are returned as ``"openstack.{{ role.name }}"``.
+ """
+ if user.is_anonymous() or obj is not None:
+ return set()
+ # TODO(gabrielhurley): Integrate policy-driven RBAC
+ # when supported by Keystone.
+ role_perms = {utils.get_role_permission(role['name'])
+ for role in user.roles}
+
+ services = []
+ for service in user.service_catalog:
+ try:
+ service_type = service['type']
+ except KeyError:
+ continue
+ service_regions = [utils.get_endpoint_region(endpoint) for endpoint
+ in service.get('endpoints', [])]
+ if user.services_region in service_regions:
+ services.append(service_type.lower())
+ service_perms = {"openstack.services.%s" % service
+ for service in services}
+ return role_perms | service_perms
+
+ def has_perm(self, user, perm, obj=None):
+ """Returns True if the given user has the specified permission."""
+ if not user.is_active:
+ return False
+ return perm in self.get_all_permissions(user, obj)
+
+ def has_module_perms(self, user, app_label):
+ """Returns True if user has any permissions in the given app_label.
+
+ Currently this matches for the app_label ``"openstack"``.
+ """
+ if not user.is_active:
+ return False
+ for perm in self.get_all_permissions(user):
+ if perm[:perm.index('.')] == app_label:
+ return True
+ return False
diff --git a/openstack_auth/exceptions.py b/openstack_auth/exceptions.py
new file mode 100644
index 000000000..37bbaf5b1
--- /dev/null
+++ b/openstack_auth/exceptions.py
@@ -0,0 +1,17 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class KeystoneAuthException(Exception):
+ """Generic error class to identify and catch our own errors."""
+ pass
diff --git a/openstack_auth/forms.py b/openstack_auth/forms.py
new file mode 100644
index 000000000..c7d0c51cf
--- /dev/null
+++ b/openstack_auth/forms.py
@@ -0,0 +1,154 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import logging
+
+from django.conf import settings
+from django.contrib.auth import authenticate
+from django.contrib.auth import forms as django_auth_forms
+from django import forms
+from django.utils.translation import ugettext_lazy as _
+from django.views.decorators.debug import sensitive_variables
+
+from openstack_auth import exceptions
+from openstack_auth import utils
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Login(django_auth_forms.AuthenticationForm):
+ """Form used for logging in a user.
+
+ Handles authentication with Keystone by providing the domain name, username
+ and password. A scoped token is fetched after successful authentication.
+
+ A domain name is required if authenticating with Keystone V3 running
+ multi-domain configuration.
+
+ If the user authenticated has a default project set, the token will be
+ automatically scoped to their default project.
+
+ If the user authenticated has no default project set, the authentication
+ backend will try to scope to the projects returned from the user's assigned
+ projects. The first successful project scoped will be returned.
+
+ Inherits from the base ``django.contrib.auth.forms.AuthenticationForm``
+ class for added security features.
+ """
+ region = forms.ChoiceField(label=_("Region"), required=False)
+ username = forms.CharField(
+ label=_("User Name"),
+ widget=forms.TextInput(attrs={"autofocus": "autofocus"}))
+ password = forms.CharField(label=_("Password"),
+ widget=forms.PasswordInput(render_value=False))
+
+ def __init__(self, *args, **kwargs):
+ super(Login, self).__init__(*args, **kwargs)
+ fields_ordering = ['username', 'password', 'region']
+ if getattr(settings,
+ 'OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT',
+ False):
+ last_domain = self.request.COOKIES.get('login_domain', None)
+ if getattr(settings,
+ 'OPENSTACK_KEYSTONE_DOMAIN_DROPDOWN',
+ False):
+ self.fields['domain'] = forms.ChoiceField(
+ label=_("Domain"),
+ initial=last_domain,
+ required=True,
+ choices=getattr(settings,
+ 'OPENSTACK_KEYSTONE_DOMAIN_CHOICES',
+ ()))
+ else:
+ self.fields['domain'] = forms.CharField(
+ initial=last_domain,
+ label=_("Domain"),
+ required=True,
+ widget=forms.TextInput(attrs={"autofocus": "autofocus"}))
+ self.fields['username'].widget = forms.widgets.TextInput()
+ fields_ordering = ['domain', 'username', 'password', 'region']
+ self.fields['region'].choices = self.get_region_choices()
+ if len(self.fields['region'].choices) == 1:
+ self.fields['region'].initial = self.fields['region'].choices[0][0]
+ self.fields['region'].widget = forms.widgets.HiddenInput()
+ elif len(self.fields['region'].choices) > 1:
+ self.fields['region'].initial = self.request.COOKIES.get(
+ 'login_region')
+
+ # if websso is enabled and keystone version supported
+ # prepend the websso_choices select input to the form
+ if utils.is_websso_enabled():
+ initial = getattr(settings, 'WEBSSO_INITIAL_CHOICE', 'credentials')
+ self.fields['auth_type'] = forms.ChoiceField(
+ label=_("Authenticate using"),
+ choices=getattr(settings, 'WEBSSO_CHOICES', ()),
+ required=False,
+ initial=initial)
+ # add auth_type to the top of the list
+ fields_ordering.insert(0, 'auth_type')
+
+ # websso is enabled, but keystone version is not supported
+ elif getattr(settings, 'WEBSSO_ENABLED', False):
+ msg = ("Websso is enabled but horizon is not configured to work " +
+ "with keystone version 3 or above.")
+ LOG.warning(msg)
+ self.fields = collections.OrderedDict(
+ (key, self.fields[key]) for key in fields_ordering)
+
+ @staticmethod
+ def get_region_choices():
+ default_region = (settings.OPENSTACK_KEYSTONE_URL, "Default Region")
+ regions = getattr(settings, 'AVAILABLE_REGIONS', [])
+ if not regions:
+ regions = [default_region]
+ return regions
+
+ @sensitive_variables()
+ def clean(self):
+ default_domain = getattr(settings,
+ 'OPENSTACK_KEYSTONE_DEFAULT_DOMAIN',
+ 'Default')
+ username = self.cleaned_data.get('username')
+ password = self.cleaned_data.get('password')
+ region = self.cleaned_data.get('region')
+ domain = self.cleaned_data.get('domain', default_domain)
+
+ if not (username and password):
+ # Don't authenticate, just let the other validators handle it.
+ return self.cleaned_data
+
+ try:
+ self.user_cache = authenticate(request=self.request,
+ username=username,
+ password=password,
+ user_domain_name=domain,
+ auth_url=region)
+ msg = 'Login successful for user "%(username)s", remote address '\
+ '%(remote_ip)s.' % {
+ 'username': username,
+ 'remote_ip': utils.get_client_ip(self.request)
+ }
+ LOG.info(msg)
+ except exceptions.KeystoneAuthException as exc:
+ msg = 'Login failed for user "%(username)s", remote address '\
+ '%(remote_ip)s.' % {
+ 'username': username,
+ 'remote_ip': utils.get_client_ip(self.request)
+ }
+ LOG.warning(msg)
+ raise forms.ValidationError(exc)
+ if hasattr(self, 'check_for_test_cookie'): # Dropped in django 1.7
+ self.check_for_test_cookie()
+ return self.cleaned_data
diff --git a/openstack_auth/models.py b/openstack_auth/models.py
new file mode 100644
index 000000000..7316aacbe
--- /dev/null
+++ b/openstack_auth/models.py
@@ -0,0 +1,18 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# import the User model in here so Django can find it
+from openstack_auth.user import User
+
+
+__all__ = ['User']
diff --git a/openstack_auth/plugin/__init__.py b/openstack_auth/plugin/__init__.py
new file mode 100644
index 000000000..75e5e852e
--- /dev/null
+++ b/openstack_auth/plugin/__init__.py
@@ -0,0 +1,22 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from openstack_auth.plugin.base import BasePlugin
+from openstack_auth.plugin.k2k import K2KAuthPlugin
+from openstack_auth.plugin.password import PasswordPlugin
+from openstack_auth.plugin.token import TokenPlugin
+
+
+__all__ = ['BasePlugin',
+ 'PasswordPlugin',
+ 'TokenPlugin',
+ 'K2KAuthPlugin']
diff --git a/openstack_auth/plugin/base.py b/openstack_auth/plugin/base.py
new file mode 100644
index 000000000..829a403c4
--- /dev/null
+++ b/openstack_auth/plugin/base.py
@@ -0,0 +1,241 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import logging
+
+from django.utils.translation import ugettext_lazy as _
+from keystoneauth1 import exceptions as keystone_exceptions
+from keystoneclient.v2_0 import client as v2_client
+from keystoneclient.v3 import client as v3_client
+import six
+
+from openstack_auth import exceptions
+from openstack_auth import utils
+
+LOG = logging.getLogger(__name__)
+__all__ = ['BasePlugin']
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BasePlugin(object):
+ """Base plugin to provide ways to log in to dashboard.
+
+ Provides a framework for keystoneclient plugins that can be used with the
+ information provided to return an unscoped token.
+ """
+
+ @abc.abstractmethod
+ def get_plugin(self, auth_url=None, **kwargs):
+ """Create a new plugin to attempt to authenticate.
+
+ Given the information provided by the login providers attempt to create
+ an authentication plugin that can be used to authenticate the user.
+
+ If the provided login information does not contain enough information
+ for this plugin to proceed then it should return None.
+
+ :param str auth_url: The URL to authenticate against.
+
+ :returns: A plugin that will be used to authenticate or None if the
+ plugin cannot authenticate with the data provided.
+ :rtype: keystoneclient.auth.BaseAuthPlugin
+ """
+ return None
+
+ @property
+ def keystone_version(self):
+ """The Identity API version as specified in the settings file."""
+ return utils.get_keystone_version()
+
+ def list_projects(self, session, auth_plugin, auth_ref=None):
+ """List the projects that are accessible to this plugin.
+
+ Query the keystone server for all projects that this authentication
+ token can be rescoped to.
+
+ This function is overrideable by plugins if they use a non-standard
+ mechanism to determine projects.
+
+ :param session: A session object for communication:
+ :type session: keystoneclient.session.Session
+ :param auth_plugin: The auth plugin returned by :py:meth:`get_plugin`.
+ :type auth_plugin: keystoneclient.auth.BaseAuthPlugin
+ :param auth_ref: The current authentication data. This is optional as
+ future auth plugins may not have auth_ref data and all
+ the required information should be available via the
+ auth_plugin.
+ :type auth_ref: keystoneclient.access.AccessInfo` or None.
+
+ :raises: exceptions.KeystoneAuthException on lookup failure.
+
+ :returns: A list of projects. This currently accepts returning both v2
+ or v3 keystoneclient projects objects.
+ """
+ try:
+ if self.keystone_version >= 3:
+ client = v3_client.Client(session=session, auth=auth_plugin)
+ if auth_ref.is_federated:
+ return client.federation.projects.list()
+ else:
+ return client.projects.list(user=auth_ref.user_id)
+
+ else:
+ client = v2_client.Client(session=session, auth=auth_plugin)
+ return client.tenants.list()
+
+ except (keystone_exceptions.ClientException,
+ keystone_exceptions.AuthorizationFailure):
+ msg = _('Unable to retrieve authorized projects.')
+ raise exceptions.KeystoneAuthException(msg)
+
+ def list_domains(self, session, auth_plugin, auth_ref=None):
+ try:
+ if self.keystone_version >= 3:
+ client = v3_client.Client(session=session, auth=auth_plugin)
+ return client.auth.domains()
+ else:
+ return []
+ except (keystone_exceptions.ClientException,
+ keystone_exceptions.AuthorizationFailure):
+ msg = _('Unable to retrieve authorized domains.')
+ raise exceptions.KeystoneAuthException(msg)
+
+ def get_access_info(self, keystone_auth):
+ """Get the access info from an unscoped auth
+
+ This function provides the base functionality that the
+ plugins will use to authenticate and get the access info object.
+
+ :param keystone_auth: keystoneauth1 identity plugin
+ :raises: exceptions.KeystoneAuthException on auth failure
+ :returns: keystoneclient.access.AccessInfo
+ """
+ session = utils.get_session()
+
+ try:
+ unscoped_auth_ref = keystone_auth.get_access(session)
+ except keystone_exceptions.ConnectFailure as exc:
+ LOG.error(str(exc))
+ msg = _('Unable to establish connection to keystone endpoint.')
+ raise exceptions.KeystoneAuthException(msg)
+ except (keystone_exceptions.Unauthorized,
+ keystone_exceptions.Forbidden,
+ keystone_exceptions.NotFound) as exc:
+ LOG.debug(str(exc))
+ raise exceptions.KeystoneAuthException(_('Invalid credentials.'))
+ except (keystone_exceptions.ClientException,
+ keystone_exceptions.AuthorizationFailure) as exc:
+ msg = _("An error occurred authenticating. "
+ "Please try again later.")
+ LOG.debug(str(exc))
+ raise exceptions.KeystoneAuthException(msg)
+ return unscoped_auth_ref
+
+ def get_project_scoped_auth(self, unscoped_auth, unscoped_auth_ref,
+ recent_project=None):
+ """Get the project scoped keystone auth and access info
+
+ This function returns a project scoped keystone token plugin
+ and AccessInfo object.
+
+ :param unscoped_auth: keystone auth plugin
+ :param unscoped_auth_ref: keystoneclient.access.AccessInfo` or None.
+ :param recent_project: project that we should try to scope to
+ :return: keystone token auth plugin, AccessInfo object
+ """
+ auth_url = unscoped_auth.auth_url
+ session = utils.get_session()
+
+ projects = self.list_projects(
+ session, unscoped_auth, unscoped_auth_ref)
+ # Attempt to scope only to enabled projects
+ projects = [project for project in projects if project.enabled]
+
+ # if a most recent project was found, try using it first
+ if recent_project:
+ for pos, project in enumerate(projects):
+ if project.id == recent_project:
+ # move recent project to the beginning
+ projects.pop(pos)
+ projects.insert(0, project)
+ break
+
+ scoped_auth = None
+ scoped_auth_ref = None
+ for project in projects:
+ token = unscoped_auth_ref.auth_token
+ scoped_auth = utils.get_token_auth_plugin(auth_url,
+ token=token,
+ project_id=project.id)
+ try:
+ scoped_auth_ref = scoped_auth.get_access(session)
+ except (keystone_exceptions.ClientException,
+ keystone_exceptions.AuthorizationFailure):
+ LOG.info('Attempted scope to project %s failed, will attempt'
+ 'to scope to another project.', project.name)
+ pass
+ else:
+ break
+
+ return scoped_auth, scoped_auth_ref
+
+ def get_domain_scoped_auth(self, unscoped_auth, unscoped_auth_ref,
+ domain_name=None):
+ """Get the domain scoped keystone auth and access info
+
+ This function returns a domain scoped keystone token plugin
+ and AccessInfo object.
+
+ :param unscoped_auth: keystone auth plugin
+ :param unscoped_auth_ref: keystoneclient.access.AccessInfo` or None.
+ :param domain_name: domain that we should try to scope to
+ :return: keystone token auth plugin, AccessInfo object
+ """
+ session = utils.get_session()
+ auth_url = unscoped_auth.auth_url
+
+ if utils.get_keystone_version() < 3:
+ return None, None
+ if domain_name:
+ domains = [domain_name]
+ else:
+ domains = self.list_domains(session,
+ unscoped_auth,
+ unscoped_auth_ref)
+ domains = [domain.name for domain in domains if domain.enabled]
+
+ # domain support can require domain scoped tokens to perform
+ # identity operations depending on the policy files being used
+ # for keystone.
+ domain_auth = None
+ domain_auth_ref = None
+ for domain_name in domains:
+ token = unscoped_auth_ref.auth_token
+ domain_auth = utils.get_token_auth_plugin(
+ auth_url,
+ token,
+ domain_name=domain_name)
+ try:
+ domain_auth_ref = domain_auth.get_access(session)
+ except (keystone_exceptions.ClientException,
+ keystone_exceptions.AuthorizationFailure):
+ LOG.info('Attempted scope to domain %s failed, will attempt'
+ 'to scope to another domain.', domain_name)
+ pass
+ else:
+ if len(domains) > 1:
+ LOG.info("More than one valid domain found for user %s,"
+ " scoping to %s",
+ (unscoped_auth_ref.user_id, domain_name))
+ break
+ return domain_auth, domain_auth_ref
diff --git a/openstack_auth/plugin/k2k.py b/openstack_auth/plugin/k2k.py
new file mode 100644
index 000000000..ba4cbf6bc
--- /dev/null
+++ b/openstack_auth/plugin/k2k.py
@@ -0,0 +1,107 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from django.conf import settings
+from django.utils.translation import ugettext_lazy as _
+from keystoneauth1.identity import v3 as v3_auth
+
+from openstack_auth import exceptions
+from openstack_auth.plugin import base
+from openstack_auth import utils
+
+LOG = logging.getLogger(__name__)
+
+__all__ = ['K2KAuthPlugin']
+
+
+class K2KAuthPlugin(base.BasePlugin):
+
+ def get_plugin(self, service_provider=None, auth_url=None, plugins=None,
+ **kwargs):
+ """Authenticate using keystone to keystone federation.
+
+ This plugin uses other v3 plugins to authenticate a user to a
+ identity provider in order to authenticate the user to a service
+ provider
+
+ :param service_provider: service provider ID
+ :param auth_url: Keystone auth url
+ :param plugins: list of openstack_auth plugins to check
+ :returns Keystone2Keystone keystone auth plugin
+ """
+
+ # Avoid mutable default arg for plugins
+ plugins = plugins or []
+
+ # service_provider being None prevents infinite recursion
+ if utils.get_keystone_version() < 3 or not service_provider:
+ return None
+
+ keystone_idp_id = getattr(settings, 'KEYSTONE_PROVIDER_IDP_ID',
+ 'localkeystone')
+ if service_provider == keystone_idp_id:
+ return None
+
+ for plugin in plugins:
+ unscoped_idp_auth = plugin.get_plugin(plugins=plugins,
+ auth_url=auth_url, **kwargs)
+ if unscoped_idp_auth:
+ break
+ else:
+ LOG.debug('Could not find base authentication backend for '
+ 'K2K plugin with the provided credentials.')
+ return None
+
+ idp_exception = None
+ scoped_idp_auth = None
+ unscoped_auth_ref = base.BasePlugin.get_access_info(
+ self, unscoped_idp_auth)
+ try:
+ scoped_idp_auth, __ = self.get_project_scoped_auth(
+ unscoped_idp_auth, unscoped_auth_ref)
+ except exceptions.KeystoneAuthException as idp_excp:
+ idp_exception = idp_excp
+
+ if not scoped_idp_auth or idp_exception:
+ msg = 'Identity provider authentication Failed.'
+ raise exceptions.KeystoneAuthException(msg)
+
+ session = utils.get_session()
+
+ if scoped_idp_auth.get_sp_auth_url(session, service_provider) is None:
+ msg = _('Could not find service provider ID on Keystone.')
+ raise exceptions.KeystoneAuthException(msg)
+
+ unscoped_auth = v3_auth.Keystone2Keystone(
+ base_plugin=scoped_idp_auth,
+ service_provider=service_provider)
+ return unscoped_auth
+
+ def get_access_info(self, unscoped_auth):
+ """Get the access info object
+
+ We attempt to get the auth ref. If it fails and if the K2K auth plugin
+ was being used then we will prepend a message saying that the error was
+ on the service provider side.
+ :param: unscoped_auth: Keystone auth plugin for unscoped user
+ :returns: keystoneclient.access.AccessInfo object
+ """
+ try:
+ unscoped_auth_ref = base.BasePlugin.get_access_info(
+ self, unscoped_auth)
+ except exceptions.KeystoneAuthException as excp:
+ msg = _('Service provider authentication failed. %s')
+ raise exceptions.KeystoneAuthException(msg % str(excp))
+ return unscoped_auth_ref
diff --git a/openstack_auth/plugin/password.py b/openstack_auth/plugin/password.py
new file mode 100644
index 000000000..0e3100c29
--- /dev/null
+++ b/openstack_auth/plugin/password.py
@@ -0,0 +1,51 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from keystoneauth1.identity import v2 as v2_auth
+from keystoneauth1.identity import v3 as v3_auth
+
+from openstack_auth.plugin import base
+from openstack_auth import utils
+
+LOG = logging.getLogger(__name__)
+
+__all__ = ['PasswordPlugin']
+
+
+class PasswordPlugin(base.BasePlugin):
+ """Authenticate against keystone given a username and password.
+
+ This is the default login mechanism. Given a username and password inputted
+ from a login form returns a v2 or v3 keystone Password plugin for
+ authentication.
+ """
+
+ def get_plugin(self, auth_url=None, username=None, password=None,
+ user_domain_name=None, **kwargs):
+ if not all((auth_url, username, password)):
+ return None
+
+ LOG.debug('Attempting to authenticate for %s', username)
+
+ if utils.get_keystone_version() >= 3:
+ return v3_auth.Password(auth_url=auth_url,
+ username=username,
+ password=password,
+ user_domain_name=user_domain_name,
+ unscoped=True)
+
+ else:
+ return v2_auth.Password(auth_url=auth_url,
+ username=username,
+ password=password)
diff --git a/openstack_auth/plugin/token.py b/openstack_auth/plugin/token.py
new file mode 100644
index 000000000..b812b44fa
--- /dev/null
+++ b/openstack_auth/plugin/token.py
@@ -0,0 +1,41 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from keystoneauth1.identity import v2 as v2_auth
+from keystoneauth1.identity import v3 as v3_auth
+
+from openstack_auth.plugin import base
+from openstack_auth import utils
+
+
+__all__ = ['TokenPlugin']
+
+
+class TokenPlugin(base.BasePlugin):
+ """Authenticate against keystone with an existing token."""
+
+ def get_plugin(self, auth_url=None, token=None, project_id=None,
+ **kwargs):
+ if not all((auth_url, token)):
+ return None
+
+ if utils.get_keystone_version() >= 3:
+ return v3_auth.Token(auth_url=auth_url,
+ token=token,
+ project_id=project_id,
+ reauthenticate=False)
+
+ else:
+ return v2_auth.Token(auth_url=auth_url,
+ token=token,
+ tenant_id=project_id,
+ reauthenticate=False)
diff --git a/openstack_auth/policy.py b/openstack_auth/policy.py
new file mode 100644
index 000000000..74cc23a7d
--- /dev/null
+++ b/openstack_auth/policy.py
@@ -0,0 +1,231 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Policy engine for openstack_auth"""
+
+import logging
+import os.path
+
+from django.conf import settings
+from oslo_config import cfg
+from oslo_policy import opts as policy_opts
+from oslo_policy import policy
+
+from openstack_auth import user as auth_user
+from openstack_auth import utils as auth_utils
+
+LOG = logging.getLogger(__name__)
+
+_ENFORCER = None
+_BASE_PATH = getattr(settings, 'POLICY_FILES_PATH', '')
+
+
+def _get_policy_conf(policy_file, policy_dirs=None):
+ conf = cfg.ConfigOpts()
+ # Passing [] is required. Otherwise oslo.config looks up sys.argv.
+ conf([])
+ policy_opts.set_defaults(conf)
+ policy_file = os.path.join(_BASE_PATH, policy_file)
+ conf.set_default('policy_file', policy_file, 'oslo_policy')
+ # Policy Enforcer has been updated to take in a policy directory
+ # as a config option. However, the default value in is set to
+ # ['policy.d'] which causes the code to break. Set the default
+ # value to empty list for now.
+ if policy_dirs is None:
+ policy_dirs = []
+ policy_dirs = [os.path.join(_BASE_PATH, policy_dir)
+ for policy_dir in policy_dirs]
+ conf.set_default('policy_dirs', policy_dirs, 'oslo_policy')
+ return conf
+
+
+def _get_enforcer():
+ global _ENFORCER
+ if not _ENFORCER:
+ _ENFORCER = {}
+ policy_files = getattr(settings, 'POLICY_FILES', {})
+ policy_dirs = getattr(settings, 'POLICY_DIRS', {})
+ for service in policy_files.keys():
+ conf = _get_policy_conf(policy_file=policy_files[service],
+ policy_dirs=policy_dirs.get(service, []))
+ enforcer = policy.Enforcer(conf)
+ # Ensure enforcer.policy_path is populated.
+ enforcer.load_rules()
+ if os.path.isfile(enforcer.policy_path):
+ LOG.debug("adding enforcer for service: %s", service)
+ _ENFORCER[service] = enforcer
+ else:
+ LOG.warning("policy file for service: %s not found at %s",
+ (service, enforcer.policy_path))
+ return _ENFORCER
+
+
+def reset():
+ global _ENFORCER
+ _ENFORCER = None
+
+
+def check(actions, request, target=None):
+ """Check user permission.
+
+ Check if the user has permission to the action according
+ to policy setting.
+
+ :param actions: list of scope and action to do policy checks on,
+ the composition of which is (scope, action). Multiple actions
+ are treated as a logical AND.
+
+ * scope: service type managing the policy for action
+
+ * action: string representing the action to be checked
+
+ this should be colon separated for clarity.
+ i.e.
+
+ | compute:create_instance
+ | compute:attach_volume
+ | volume:attach_volume
+
+ for a policy action that requires a single action, actions
+ should look like
+
+ | "(("compute", "compute:create_instance"),)"
+
+ for a multiple action check, actions should look like
+ | "(("identity", "identity:list_users"),
+ | ("identity", "identity:list_roles"))"
+
+ :param request: django http request object. If not specified, credentials
+ must be passed.
+ :param target: dictionary representing the object of the action
+ for object creation this should be a dictionary
+ representing the location of the object e.g.
+ {'project_id': object.project_id}
+ :returns: boolean if the user has permission or not for the actions.
+ """
+ if target is None:
+ target = {}
+ user = auth_utils.get_user(request)
+
+ # Several service policy engines default to a project id check for
+ # ownership. Since the user is already scoped to a project, if a
+ # different project id has not been specified use the currently scoped
+ # project's id.
+ #
+ # The reason is the operator can edit the local copies of the service
+ # policy file. If a rule is removed, then the default rule is used. We
+ # don't want to block all actions because the operator did not fully
+ # understand the implication of editing the policy file. Additionally,
+ # the service APIs will correct us if we are too permissive.
+ if target.get('project_id') is None:
+ target['project_id'] = user.project_id
+ if target.get('tenant_id') is None:
+ target['tenant_id'] = target['project_id']
+ # same for user_id
+ if target.get('user_id') is None:
+ target['user_id'] = user.id
+
+ domain_id_keys = [
+ 'domain_id',
+ 'project.domain_id',
+ 'user.domain_id',
+ 'group.domain_id'
+ ]
+ # populates domain id keys with user's current domain id
+ for key in domain_id_keys:
+ if target.get(key) is None:
+ target[key] = user.user_domain_id
+
+ credentials = _user_to_credentials(user)
+ domain_credentials = _domain_to_credentials(request, user)
+ # if there is a domain token use the domain_id instead of the user's domain
+ if domain_credentials:
+ credentials['domain_id'] = domain_credentials.get('domain_id')
+
+ enforcer = _get_enforcer()
+
+ for action in actions:
+ scope, action = action[0], action[1]
+ if scope in enforcer:
+ # this is for handling the v3 policy file and will only be
+ # needed when a domain scoped token is present
+ if scope == 'identity' and domain_credentials:
+ # use domain credentials
+ if not _check_credentials(enforcer[scope],
+ action,
+ target,
+ domain_credentials):
+ return False
+
+ # use project credentials
+ if not _check_credentials(enforcer[scope],
+ action, target, credentials):
+ return False
+
+ # if no policy for scope, allow action, underlying API will
+ # ultimately block the action if not permitted, treat as though
+ # allowed
+ return True
+
+
+def _check_credentials(enforcer_scope, action, target, credentials):
+ is_valid = True
+ if not enforcer_scope.enforce(action, target, credentials):
+ # to match service implementations, if a rule is not found,
+ # use the default rule for that service policy
+ #
+ # waiting to make the check because the first call to
+ # enforce loads the rules
+ if action not in enforcer_scope.rules:
+ if not enforcer_scope.enforce('default', target, credentials):
+ is_valid = False
+ else:
+ is_valid = False
+ return is_valid
+
+
+def _user_to_credentials(user):
+ if not hasattr(user, "_credentials"):
+ roles = [role['name'] for role in user.roles]
+ user._credentials = {'user_id': user.id,
+ 'token': user.token,
+ 'username': user.username,
+ 'project_id': user.project_id,
+ 'tenant_id': user.project_id,
+ 'project_name': user.project_name,
+ 'domain_id': user.user_domain_id,
+ 'is_admin': user.is_superuser,
+ 'roles': roles}
+ return user._credentials
+
+
+def _domain_to_credentials(request, user):
+ if not hasattr(user, "_domain_credentials"):
+ try:
+ domain_auth_ref = request.session.get('domain_token')
+
+ # no domain role or not running on V3
+ if not domain_auth_ref:
+ return None
+ domain_user = auth_user.create_user_from_token(
+ request, auth_user.Token(domain_auth_ref),
+ domain_auth_ref.service_catalog.url_for(interface=None))
+ user._domain_credentials = _user_to_credentials(domain_user)
+
+ # uses the domain_id associated with the domain_user
+ user._domain_credentials['domain_id'] = domain_user.domain_id
+
+ except Exception:
+ LOG.warning("Failed to create user from domain scoped token.")
+ return None
+ return user._domain_credentials
diff --git a/openstack_auth/tests/__init__.py b/openstack_auth/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/openstack_auth/tests/__init__.py
diff --git a/openstack_auth/tests/conf/keystone_policy.json b/openstack_auth/tests/conf/keystone_policy.json
new file mode 100644
index 000000000..90ffdfd4b
--- /dev/null
+++ b/openstack_auth/tests/conf/keystone_policy.json
@@ -0,0 +1,146 @@
+{
+ "admin_required": "role:admin or is_admin:1",
+ "service_role": "role:service",
+ "service_or_admin": "rule:admin_required or rule:service_role",
+ "owner" : "user_id:%(user_id)s",
+ "admin_or_owner": "rule:admin_required or rule:owner",
+
+ "default": "rule:admin_required",
+
+ "identity:get_region": "",
+ "identity:list_regions": "",
+ "identity:create_region": "rule:admin_required",
+ "identity:update_region": "rule:admin_required",
+ "identity:delete_region": "rule:admin_required",
+
+ "identity:get_service": "rule:admin_required",
+ "identity:list_services": "rule:admin_required",
+ "identity:create_service": "rule:admin_required",
+ "identity:update_service": "rule:admin_required",
+ "identity:delete_service": "rule:admin_required",
+
+ "identity:get_endpoint": "rule:admin_required",
+ "identity:list_endpoints": "rule:admin_required",
+ "identity:create_endpoint": "rule:admin_required",
+ "identity:update_endpoint": "rule:admin_required",
+ "identity:delete_endpoint": "rule:admin_required",
+
+ "identity:get_catalog": "",
+
+ "identity:get_domain": "rule:admin_required",
+ "identity:list_domains": "rule:admin_required",
+ "identity:create_domain": "rule:admin_required",
+ "identity:update_domain": "rule:admin_required",
+ "identity:delete_domain": "rule:admin_required",
+
+ "identity:get_project": "rule:admin_required",
+ "identity:list_projects": "rule:admin_required",
+ "identity:list_user_projects": "rule:admin_or_owner",
+ "identity:create_project": "rule:admin_required",
+ "identity:update_project": "rule:admin_required",
+ "identity:delete_project": "rule:admin_required",
+
+ "identity:get_user": "rule:admin_required",
+ "identity:list_users": "rule:admin_required",
+ "identity:create_user": "rule:admin_required",
+ "identity:update_user": "rule:admin_required",
+ "identity:delete_user": "rule:admin_required",
+ "identity:change_password": "rule:admin_or_owner",
+
+ "identity:get_group": "rule:admin_required",
+ "identity:list_groups": "rule:admin_required",
+ "identity:list_groups_for_user": "rule:admin_or_owner",
+ "identity:create_group": "rule:admin_required",
+ "identity:update_group": "rule:admin_required",
+ "identity:delete_group": "rule:admin_required",
+ "identity:list_users_in_group": "rule:admin_required",
+ "identity:remove_user_from_group": "rule:admin_required",
+ "identity:check_user_in_group": "rule:admin_required",
+ "identity:add_user_to_group": "rule:admin_required",
+
+ "identity:get_credential": "rule:admin_required",
+ "identity:list_credentials": "rule:admin_required",
+ "identity:create_credential": "rule:admin_required",
+ "identity:update_credential": "rule:admin_required",
+ "identity:delete_credential": "rule:admin_required",
+
+ "identity:ec2_get_credential": "rule:admin_or_owner",
+ "identity:ec2_list_credentials": "rule:admin_or_owner",
+ "identity:ec2_create_credential": "rule:admin_or_owner",
+ "identity:ec2_delete_credential": "rule:admin_required or (rule:owner and user_id:%(target.credential.user_id)s)",
+
+ "identity:get_role": "rule:admin_required",
+ "identity:list_roles": "rule:admin_required",
+ "identity:create_role": "rule:admin_required",
+ "identity:update_role": "rule:admin_required",
+ "identity:delete_role": "rule:admin_required",
+
+ "identity:check_grant": "rule:admin_required",
+ "identity:list_grants": "rule:admin_required",
+ "identity:create_grant": "rule:admin_required",
+ "identity:revoke_grant": "rule:admin_required",
+
+ "identity:list_role_assignments": "rule:admin_required",
+
+ "identity:get_policy": "rule:admin_required",
+ "identity:list_policies": "rule:admin_required",
+ "identity:create_policy": "rule:admin_required",
+ "identity:update_policy": "rule:admin_required",
+ "identity:delete_policy": "rule:admin_required",
+
+ "identity:check_token": "rule:admin_required",
+ "identity:validate_token": "rule:service_or_admin",
+ "identity:validate_token_head": "rule:service_or_admin",
+ "identity:revocation_list": "rule:service_or_admin",
+ "identity:revoke_token": "rule:admin_or_owner",
+
+ "identity:create_trust": "user_id:%(trust.trustor_user_id)s",
+ "identity:get_trust": "rule:admin_or_owner",
+ "identity:list_trusts": "",
+ "identity:list_roles_for_trust": "",
+ "identity:check_role_for_trust": "",
+ "identity:get_role_for_trust": "",
+ "identity:delete_trust": "",
+
+ "identity:create_consumer": "rule:admin_required",
+ "identity:get_consumer": "rule:admin_required",
+ "identity:list_consumers": "rule:admin_required",
+ "identity:delete_consumer": "rule:admin_required",
+ "identity:update_consumer": "rule:admin_required",
+
+ "identity:authorize_request_token": "rule:admin_required",
+ "identity:list_access_token_roles": "rule:admin_required",
+ "identity:get_access_token_role": "rule:admin_required",
+ "identity:list_access_tokens": "rule:admin_required",
+ "identity:get_access_token": "rule:admin_required",
+ "identity:delete_access_token": "rule:admin_required",
+
+ "identity:list_projects_for_endpoint": "rule:admin_required",
+ "identity:add_endpoint_to_project": "rule:admin_required",
+ "identity:check_endpoint_in_project": "rule:admin_required",
+ "identity:list_endpoints_for_project": "rule:admin_required",
+ "identity:remove_endpoint_from_project": "rule:admin_required",
+
+ "identity:create_identity_provider": "rule:admin_required",
+ "identity:list_identity_providers": "rule:admin_required",
+ "identity:get_identity_providers": "rule:admin_required",
+ "identity:update_identity_provider": "rule:admin_required",
+ "identity:delete_identity_provider": "rule:admin_required",
+
+ "identity:create_protocol": "rule:admin_required",
+ "identity:update_protocol": "rule:admin_required",
+ "identity:get_protocol": "rule:admin_required",
+ "identity:list_protocols": "rule:admin_required",
+ "identity:delete_protocol": "rule:admin_required",
+
+ "identity:create_mapping": "rule:admin_required",
+ "identity:get_mapping": "rule:admin_required",
+ "identity:list_mappings": "rule:admin_required",
+ "identity:delete_mapping": "rule:admin_required",
+ "identity:update_mapping": "rule:admin_required",
+
+ "identity:list_projects_for_groups": "",
+ "identity:list_domains_for_groups": "",
+
+ "identity:list_revoke_events": ""
+}
diff --git a/openstack_auth/tests/conf/nova_policy.json b/openstack_auth/tests/conf/nova_policy.json
new file mode 100644
index 000000000..487e46c11
--- /dev/null
+++ b/openstack_auth/tests/conf/nova_policy.json
@@ -0,0 +1,274 @@
+{
+ "context_is_admin": "role:admin",
+ "admin_or_owner": "is_admin:True or project_id:%(project_id)s",
+ "default": "rule:admin_or_owner",
+
+ "cells_scheduler_filter:TargetCellFilter": "is_admin:True",
+
+ "compute:create": "",
+ "compute:create:attach_network": "",
+ "compute:create:attach_volume": "",
+ "compute:create:forced_host": "is_admin:True",
+ "compute:delete": "rule:default",
+ "compute:get_all": "",
+ "compute:get_all_tenants": "",
+ "compute:reboot": "rule:default",
+ "compute:rebuild": "rule:default",
+ "compute:snapshot": "rule:default",
+ "compute:start": "rule:default",
+ "compute:stop": "rule:default",
+ "compute:unlock_override": "rule:admin_api",
+ "compute:attach_volume" : "rule:default",
+ "compute:detach_volume" : "rule:default",
+ "compute:update": "rule:default",
+
+ "compute:resize": "rule:default",
+ "compute:confirm_resize": "rule:default",
+ "compute:revert_resize": "rule:default",
+
+ "compute:shelve": "",
+ "compute:shelve_offload": "",
+ "compute:unshelve": "",
+
+ "admin_api": "is_admin:True",
+ "compute_extension:accounts": "rule:admin_api",
+ "compute_extension:admin_actions": "rule:admin_api",
+ "compute_extension:admin_actions:pause": "rule:admin_or_owner",
+ "compute_extension:admin_actions:unpause": "rule:admin_or_owner",
+ "compute_extension:admin_actions:suspend": "rule:admin_or_owner",
+ "compute_extension:admin_actions:resume": "rule:admin_or_owner",
+ "compute_extension:admin_actions:lock": "rule:admin_or_owner",
+ "compute_extension:admin_actions:unlock": "rule:admin_or_owner",
+ "compute_extension:admin_actions:resetNetwork": "rule:admin_api",
+ "compute_extension:admin_actions:injectNetworkInfo": "rule:admin_api",
+ "compute_extension:admin_actions:createBackup": "rule:admin_or_owner",
+ "compute_extension:admin_actions:migrateLive": "rule:admin_api",
+ "compute_extension:admin_actions:resetState": "rule:admin_api",
+ "compute_extension:admin_actions:migrate": "rule:admin_api",
+ "compute_extension:v3:os-admin-actions": "rule:admin_api",
+ "compute_extension:v3:os-admin-actions:pause": "rule:admin_or_owner",
+ "compute_extension:v3:os-admin-actions:unpause": "rule:admin_or_owner",
+ "compute_extension:v3:os-admin-actions:suspend": "rule:admin_or_owner",
+ "compute_extension:v3:os-admin-actions:resume": "rule:admin_or_owner",
+ "compute_extension:v3:os-admin-actions:lock": "rule:admin_or_owner",
+ "compute_extension:v3:os-admin-actions:unlock": "rule:admin_or_owner",
+ "compute_extension:v3:os-admin-actions:reset_network": "rule:admin_api",
+ "compute_extension:v3:os-admin-actions:inject_network_info": "rule:admin_api",
+ "compute_extension:v3:os-admin-actions:create_backup": "rule:admin_or_owner",
+ "compute_extension:v3:os-admin-actions:migrate_live": "rule:admin_api",
+ "compute_extension:v3:os-admin-actions:reset_state": "rule:admin_api",
+ "compute_extension:v3:os-admin-actions:migrate": "rule:admin_api",
+ "compute_extension:v3:os-admin-password": "",
+ "compute_extension:aggregates": "rule:admin_api",
+ "compute_extension:v3:os-aggregates": "rule:admin_api",
+ "compute_extension:agents": "rule:admin_api",
+ "compute_extension:v3:os-agents": "rule:admin_api",
+ "compute_extension:attach_interfaces": "",
+ "compute_extension:v3:os-attach-interfaces": "",
+ "compute_extension:baremetal_nodes": "rule:admin_api",
+ "compute_extension:v3:os-baremetal-nodes": "rule:admin_api",
+ "compute_extension:cells": "rule:admin_api",
+ "compute_extension:v3:os-cells": "rule:admin_api",
+ "compute_extension:certificates": "",
+ "compute_extension:v3:os-certificates": "",
+ "compute_extension:cloudpipe": "rule:admin_api",
+ "compute_extension:cloudpipe_update": "rule:admin_api",
+ "compute_extension:console_output": "",
+ "compute_extension:v3:consoles:discoverable": "",
+ "compute_extension:v3:os-console-output": "",
+ "compute_extension:consoles": "",
+ "compute_extension:v3:os-remote-consoles": "",
+ "compute_extension:coverage_ext": "rule:admin_api",
+ "compute_extension:v3:os-coverage": "rule:admin_api",
+ "compute_extension:createserverext": "",
+ "compute_extension:deferred_delete": "",
+ "compute_extension:v3:os-deferred-delete": "",
+ "compute_extension:disk_config": "",
+ "compute_extension:evacuate": "rule:admin_api",
+ "compute_extension:v3:os-evacuate": "rule:admin_api",
+ "compute_extension:extended_server_attributes": "rule:admin_api",
+ "compute_extension:v3:os-extended-server-attributes": "rule:admin_api",
+ "compute_extension:extended_status": "",
+ "compute_extension:v3:os-extended-status": "",
+ "compute_extension:extended_availability_zone": "",
+ "compute_extension:v3:os-extended-availability-zone": "",
+ "compute_extension:extended_ips": "",
+ "compute_extension:extended_ips_mac": "",
+ "compute_extension:extended_vif_net": "",
+ "compute_extension:v3:extension_info:discoverable": "",
+ "compute_extension:extended_volumes": "",
+ "compute_extension:v3:os-extended-volumes": "",
+ "compute_extension:v3:os-extended-volumes:attach": "",
+ "compute_extension:v3:os-extended-volumes:detach": "",
+ "compute_extension:fixed_ips": "rule:admin_api",
+ "compute_extension:v3:os-fixed-ips:discoverable": "",
+ "compute_extension:v3:os-fixed-ips": "rule:admin_api",
+ "compute_extension:flavor_access": "",
+ "compute_extension:v3:os-flavor-access": "",
+ "compute_extension:flavor_disabled": "",
+ "compute_extension:v3:os-flavor-disabled": "",
+ "compute_extension:flavor_rxtx": "",
+ "compute_extension:v3:os-flavor-rxtx": "",
+ "compute_extension:flavor_swap": "",
+ "compute_extension:flavorextradata": "",
+ "compute_extension:flavorextraspecs:index": "",
+ "compute_extension:flavorextraspecs:show": "",
+ "compute_extension:flavorextraspecs:create": "rule:admin_api",
+ "compute_extension:flavorextraspecs:update": "rule:admin_api",
+ "compute_extension:flavorextraspecs:delete": "rule:admin_api",
+ "compute_extension:v3:flavor-extra-specs:index": "",
+ "compute_extension:v3:flavor-extra-specs:show": "",
+ "compute_extension:v3:flavor-extra-specs:create": "rule:admin_api",
+ "compute_extension:v3:flavor-extra-specs:update": "rule:admin_api",
+ "compute_extension:v3:flavor-extra-specs:delete": "rule:admin_api",
+ "compute_extension:flavormanage": "rule:admin_api",
+ "compute_extension:floating_ip_dns": "",
+ "compute_extension:floating_ip_pools": "",
+ "compute_extension:floating_ips": "",
+ "compute_extension:floating_ips_bulk": "rule:admin_api",
+ "compute_extension:fping": "",
+ "compute_extension:fping:all_tenants": "rule:admin_api",
+ "compute_extension:hide_server_addresses": "is_admin:False",
+ "compute_extension:v3:os-hide-server-addresses": "is_admin:False",
+ "compute_extension:hosts": "rule:admin_api",
+ "compute_extension:v3:os-hosts": "rule:admin_api",
+ "compute_extension:hypervisors": "rule:admin_api",
+ "compute_extension:v3:os-hypervisors": "rule:admin_api",
+ "compute_extension:image_size": "",
+ "compute_extension:v3:os-image-metadata": "",
+ "compute_extension:v3:os-images": "",
+ "compute_extension:instance_actions": "",
+ "compute_extension:v3:os-instance-actions": "",
+ "compute_extension:instance_actions:events": "rule:admin_api",
+ "compute_extension:v3:os-instance-actions:events": "rule:admin_api",
+ "compute_extension:instance_usage_audit_log": "rule:admin_api",
+ "compute_extension:v3:os-instance-usage-audit-log": "rule:admin_api",
+ "compute_extension:v3:ips:discoverable": "",
+ "compute_extension:keypairs": "",
+ "compute_extension:keypairs:index": "",
+ "compute_extension:keypairs:show": "",
+ "compute_extension:keypairs:create": "",
+ "compute_extension:keypairs:delete": "",
+ "compute_extension:v3:os-keypairs:discoverable": "",
+ "compute_extension:v3:os-keypairs": "",
+ "compute_extension:v3:os-keypairs:index": "",
+ "compute_extension:v3:os-keypairs:show": "",
+ "compute_extension:v3:os-keypairs:create": "",
+ "compute_extension:v3:os-keypairs:delete": "",
+ "compute_extension:multinic": "",
+ "compute_extension:v3:os-multinic": "",
+ "compute_extension:networks": "rule:admin_api",
+ "compute_extension:networks:view": "",
+ "compute_extension:networks_associate": "rule:admin_api",
+ "compute_extension:quotas:show": "",
+ "compute_extension:quotas:update": "rule:admin_api",
+ "compute_extension:quotas:delete": "rule:admin_api",
+ "compute_extension:v3:os-quota-sets:show": "",
+ "compute_extension:v3:os-quota-sets:update": "rule:admin_api",
+ "compute_extension:v3:os-quota-sets:delete": "rule:admin_api",
+ "compute_extension:quota_classes": "",
+ "compute_extension:v3:os-quota-class-sets": "",
+ "compute_extension:rescue": "",
+ "compute_extension:v3:os-rescue": "",
+ "compute_extension:security_group_default_rules": "rule:admin_api",
+ "compute_extension:security_groups": "",
+ "compute_extension:v3:os-security-groups": "",
+ "compute_extension:server_diagnostics": "rule:admin_api",
+ "compute_extension:v3:os-server-diagnostics": "rule:admin_api",
+ "compute_extension:server_password": "",
+ "compute_extension:v3:os-server-password": "",
+ "compute_extension:server_usage": "",
+ "compute_extension:v3:os-server-usage": "",
+ "compute_extension:services": "rule:admin_api",
+ "compute_extension:v3:os-services": "rule:admin_api",
+ "compute_extension:v3:servers:discoverable": "",
+ "compute_extension:shelve": "",
+ "compute_extension:shelveOffload": "rule:admin_api",
+ "compute_extension:v3:os-shelve:shelve": "",
+ "compute_extension:v3:os-shelve:shelve_offload": "rule:admin_api",
+ "compute_extension:simple_tenant_usage:show": "rule:admin_or_owner",
+ "compute_extension:v3:os-simple-tenant-usage:show": "rule:admin_or_owner",
+ "compute_extension:simple_tenant_usage:list": "rule:admin_api",
+ "compute_extension:v3:os-simple-tenant-usage:list": "rule:admin_api",
+ "compute_extension:unshelve": "",
+ "compute_extension:v3:os-shelve:unshelve": "",
+ "compute_extension:users": "rule:admin_api",
+ "compute_extension:virtual_interfaces": "",
+ "compute_extension:virtual_storage_arrays": "",
+ "compute_extension:volumes": "",
+ "compute_extension:volume_attachments:index": "",
+ "compute_extension:volume_attachments:show": "",
+ "compute_extension:volume_attachments:create": "",
+ "compute_extension:volume_attachments:update": "",
+ "compute_extension:volume_attachments:delete": "",
+ "compute_extension:volumetypes": "",
+ "compute_extension:availability_zone:list": "",
+ "compute_extension:v3:os-availability-zone:list": "",
+ "compute_extension:availability_zone:detail": "rule:admin_api",
+ "compute_extension:v3:os-availability-zone:detail": "rule:admin_api",
+ "compute_extension:used_limits_for_admin": "rule:admin_api",
+ "compute_extension:v3:os-used-limits": "",
+ "compute_extension:v3:os-used-limits:tenant": "rule:admin_api",
+ "compute_extension:migrations:index": "rule:admin_api",
+ "compute_extension:v3:os-migrations:index": "rule:admin_api",
+
+
+ "volume:create": "",
+ "volume:get_all": "",
+ "volume:get_volume_metadata": "",
+ "volume:get_snapshot": "",
+ "volume:get_all_snapshots": "",
+
+
+ "volume_extension:types_manage": "rule:admin_api",
+ "volume_extension:types_extra_specs": "rule:admin_api",
+ "volume_extension:volume_admin_actions:reset_status": "rule:admin_api",
+ "volume_extension:snapshot_admin_actions:reset_status": "rule:admin_api",
+ "volume_extension:volume_admin_actions:force_delete": "rule:admin_api",
+
+
+ "network:get_all": "",
+ "network:get": "",
+ "network:create": "",
+ "network:delete": "",
+ "network:associate": "",
+ "network:disassociate": "",
+ "network:get_vifs_by_instance": "",
+ "network:allocate_for_instance": "",
+ "network:deallocate_for_instance": "",
+ "network:validate_networks": "",
+ "network:get_instance_uuids_by_ip_filter": "",
+ "network:get_instance_id_by_floating_address": "",
+ "network:setup_networks_on_host": "",
+ "network:get_backdoor_port": "",
+
+ "network:get_floating_ip": "",
+ "network:get_floating_ip_pools": "",
+ "network:get_floating_ip_by_address": "",
+ "network:get_floating_ips_by_project": "",
+ "network:get_floating_ips_by_fixed_address": "",
+ "network:allocate_floating_ip": "",
+ "network:deallocate_floating_ip": "",
+ "network:associate_floating_ip": "",
+ "network:disassociate_floating_ip": "",
+ "network:release_floating_ip": "",
+ "network:migrate_instance_start": "",
+ "network:migrate_instance_finish": "",
+
+ "network:get_fixed_ip": "",
+ "network:get_fixed_ip_by_address": "",
+ "network:add_fixed_ip_to_instance": "",
+ "network:remove_fixed_ip_from_instance": "",
+ "network:add_network_to_project": "",
+ "network:get_instance_nw_info": "",
+
+ "network:get_dns_domains": "",
+ "network:add_dns_entry": "",
+ "network:modify_dns_entry": "",
+ "network:delete_dns_entry": "",
+ "network:get_dns_entries_by_address": "",
+ "network:get_dns_entries_by_name": "",
+ "network:create_private_dns_domain": "",
+ "network:create_public_dns_domain": "",
+ "network:delete_dns_domain": ""
+}
diff --git a/openstack_auth/tests/conf/policy.v3cloudsample.json b/openstack_auth/tests/conf/policy.v3cloudsample.json
new file mode 100644
index 000000000..a96996c68
--- /dev/null
+++ b/openstack_auth/tests/conf/policy.v3cloudsample.json
@@ -0,0 +1,195 @@
+{
+ "admin_required": "role:admin",
+ "cloud_admin": "rule:admin_required and domain_id:admin_domain_id",
+ "service_role": "role:service",
+ "service_or_admin": "rule:admin_required or rule:service_role",
+ "owner" : "user_id:%(user_id)s or user_id:%(target.token.user_id)s",
+ "admin_or_owner": "(rule:admin_required and domain_id:%(target.token.user.domain.id)s) or rule:owner",
+ "admin_or_cloud_admin": "rule:admin_required or rule:cloud_admin",
+ "admin_and_matching_domain_id": "rule:admin_required and domain_id:%(domain_id)s",
+ "service_admin_or_owner": "rule:service_or_admin or rule:owner",
+
+ "default": "rule:admin_required",
+
+ "identity:get_region": "",
+ "identity:list_regions": "",
+ "identity:create_region": "rule:cloud_admin",
+ "identity:update_region": "rule:cloud_admin",
+ "identity:delete_region": "rule:cloud_admin",
+
+ "identity:get_service": "rule:admin_or_cloud_admin",
+ "identity:list_services": "rule:admin_or_cloud_admin",
+ "identity:create_service": "rule:cloud_admin",
+ "identity:update_service": "rule:cloud_admin",
+ "identity:delete_service": "rule:cloud_admin",
+
+ "identity:get_endpoint": "rule:admin_or_cloud_admin",
+ "identity:list_endpoints": "rule:admin_or_cloud_admin",
+ "identity:create_endpoint": "rule:cloud_admin",
+ "identity:update_endpoint": "rule:cloud_admin",
+ "identity:delete_endpoint": "rule:cloud_admin",
+
+ "identity:get_domain": "rule:cloud_admin or rule:admin_and_matching_domain_id",
+ "identity:list_domains": "rule:cloud_admin",
+ "identity:create_domain": "rule:cloud_admin",
+ "identity:update_domain": "rule:cloud_admin",
+ "identity:delete_domain": "rule:cloud_admin",
+
+ "admin_and_matching_target_project_domain_id": "rule:admin_required and domain_id:%(target.project.domain_id)s",
+ "admin_and_matching_project_domain_id": "rule:admin_required and domain_id:%(project.domain_id)s",
+ "identity:get_project": "rule:cloud_admin or rule:admin_and_matching_target_project_domain_id",
+ "identity:list_projects": "rule:cloud_admin or rule:admin_and_matching_domain_id",
+ "identity:list_user_projects": "rule:owner or rule:admin_and_matching_domain_id",
+ "identity:create_project": "rule:cloud_admin or rule:admin_and_matching_project_domain_id",
+ "identity:update_project": "rule:cloud_admin or rule:admin_and_matching_target_project_domain_id",
+ "identity:delete_project": "rule:cloud_admin or rule:admin_and_matching_target_project_domain_id",
+
+ "admin_and_matching_target_user_domain_id": "rule:admin_required and domain_id:%(target.user.domain_id)s",
+ "admin_and_matching_user_domain_id": "rule:admin_required and domain_id:%(user.domain_id)s",
+ "identity:get_user": "rule:cloud_admin or rule:admin_and_matching_target_user_domain_id",
+ "identity:list_users": "rule:cloud_admin or rule:admin_and_matching_domain_id",
+ "identity:create_user": "rule:cloud_admin or rule:admin_and_matching_user_domain_id",
+ "identity:update_user": "rule:cloud_admin or rule:admin_and_matching_target_user_domain_id",
+ "identity:delete_user": "rule:cloud_admin or rule:admin_and_matching_target_user_domain_id",
+
+ "admin_and_matching_target_group_domain_id": "rule:admin_required and domain_id:%(target.group.domain_id)s",
+ "admin_and_matching_group_domain_id": "rule:admin_required and domain_id:%(group.domain_id)s",
+ "identity:get_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",
+ "identity:list_groups": "rule:cloud_admin or rule:admin_and_matching_domain_id",
+ "identity:list_groups_for_user": "rule:owner or rule:admin_and_matching_domain_id",
+ "identity:create_group": "rule:cloud_admin or rule:admin_and_matching_group_domain_id",
+ "identity:update_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",
+ "identity:delete_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",
+ "identity:list_users_in_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",
+ "identity:remove_user_from_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",
+ "identity:check_user_in_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",
+ "identity:add_user_to_group": "rule:cloud_admin or rule:admin_and_matching_target_group_domain_id",
+
+ "identity:get_credential": "rule:admin_required",
+ "identity:list_credentials": "rule:admin_required or user_id:%(user_id)s",
+ "identity:create_credential": "rule:admin_required",
+ "identity:update_credential": "rule:admin_required",
+ "identity:delete_credential": "rule:admin_required",
+
+ "identity:ec2_get_credential": "rule:admin_or_cloud_admin or (rule:owner and user_id:%(target.credential.user_id)s)",
+ "identity:ec2_list_credentials": "rule:admin_or_cloud_admin or rule:owner",
+ "identity:ec2_create_credential": "rule:admin_or_cloud_admin or rule:owner",
+ "identity:ec2_delete_credential": "rule:admin_or_cloud_admin or (rule:owner and user_id:%(target.credential.user_id)s)",
+
+ "identity:get_role": "rule:admin_or_cloud_admin",
+ "identity:list_roles": "rule:admin_or_cloud_admin",
+ "identity:create_role": "rule:cloud_admin",
+ "identity:update_role": "rule:cloud_admin",
+ "identity:delete_role": "rule:cloud_admin",
+
+ "domain_admin_for_grants": "rule:admin_required and (domain_id:%(domain_id)s or domain_id:%(target.project.domain_id)s)",
+ "project_admin_for_grants": "rule:admin_required and project_id:%(project_id)s",
+ "identity:check_grant": "rule:cloud_admin or rule:domain_admin_for_grants or rule:project_admin_for_grants",
+ "identity:list_grants": "rule:cloud_admin or rule:domain_admin_for_grants or rule:project_admin_for_grants",
+ "identity:create_grant": "rule:cloud_admin or rule:domain_admin_for_grants or rule:project_admin_for_grants",
+ "identity:revoke_grant": "rule:cloud_admin or rule:domain_admin_for_grants or rule:project_admin_for_grants",
+
+ "admin_on_domain_filter" : "rule:admin_required and domain_id:%(scope.domain.id)s",
+ "admin_on_project_filter" : "rule:admin_required and project_id:%(scope.project.id)s",
+ "identity:list_role_assignments": "rule:cloud_admin or rule:admin_on_domain_filter or rule:admin_on_project_filter",
+
+ "identity:get_policy": "rule:cloud_admin",
+ "identity:list_policies": "rule:cloud_admin",
+ "identity:create_policy": "rule:cloud_admin",
+ "identity:update_policy": "rule:cloud_admin",
+ "identity:delete_policy": "rule:cloud_admin",
+
+ "identity:change_password": "rule:owner",
+ "identity:check_token": "rule:admin_or_owner",
+ "identity:validate_token": "rule:service_admin_or_owner",
+ "identity:validate_token_head": "rule:service_or_admin",
+ "identity:revocation_list": "rule:service_or_admin",
+ "identity:revoke_token": "rule:admin_or_owner",
+
+ "identity:create_trust": "user_id:%(trust.trustor_user_id)s",
+ "identity:list_trusts": "",
+ "identity:list_roles_for_trust": "",
+ "identity:get_role_for_trust": "",
+ "identity:delete_trust": "",
+
+ "identity:create_consumer": "rule:admin_required",
+ "identity:get_consumer": "rule:admin_required",
+ "identity:list_consumers": "rule:admin_required",
+ "identity:delete_consumer": "rule:admin_required",
+ "identity:update_consumer": "rule:admin_required",
+
+ "identity:authorize_request_token": "rule:admin_required",
+ "identity:list_access_token_roles": "rule:admin_required",
+ "identity:get_access_token_role": "rule:admin_required",
+ "identity:list_access_tokens": "rule:admin_required",
+ "identity:get_access_token": "rule:admin_required",
+ "identity:delete_access_token": "rule:admin_required",
+
+ "identity:list_projects_for_endpoint": "rule:admin_required",
+ "identity:add_endpoint_to_project": "rule:admin_required",
+ "identity:check_endpoint_in_project": "rule:admin_required",
+ "identity:list_endpoints_for_project": "rule:admin_required",
+ "identity:remove_endpoint_from_project": "rule:admin_required",
+
+ "identity:create_endpoint_group": "rule:admin_required",
+ "identity:list_endpoint_groups": "rule:admin_required",
+ "identity:get_endpoint_group": "rule:admin_required",
+ "identity:update_endpoint_group": "rule:admin_required",
+ "identity:delete_endpoint_group": "rule:admin_required",
+ "identity:list_projects_associated_with_endpoint_group": "rule:admin_required",
+ "identity:list_endpoints_associated_with_endpoint_group": "rule:admin_required",
+ "identity:get_endpoint_group_in_project": "rule:admin_required",
+ "identity:list_endpoint_groups_for_project": "rule:admin_required",
+ "identity:add_endpoint_group_to_project": "rule:admin_required",
+ "identity:remove_endpoint_group_from_project": "rule:admin_required",
+
+ "identity:create_identity_provider": "rule:cloud_admin",
+ "identity:list_identity_providers": "rule:cloud_admin",
+ "identity:get_identity_providers": "rule:cloud_admin",
+ "identity:update_identity_provider": "rule:cloud_admin",
+ "identity:delete_identity_provider": "rule:cloud_admin",
+
+ "identity:create_protocol": "rule:cloud_admin",
+ "identity:update_protocol": "rule:cloud_admin",
+ "identity:get_protocol": "rule:cloud_admin",
+ "identity:list_protocols": "rule:cloud_admin",
+ "identity:delete_protocol": "rule:cloud_admin",
+
+ "identity:create_mapping": "rule:cloud_admin",
+ "identity:get_mapping": "rule:cloud_admin",
+ "identity:list_mappings": "rule:cloud_admin",
+ "identity:delete_mapping": "rule:cloud_admin",
+ "identity:update_mapping": "rule:cloud_admin",
+
+ "identity:create_service_provider": "rule:cloud_admin",
+ "identity:list_service_providers": "rule:cloud_admin",
+ "identity:get_service_provider": "rule:cloud_admin",
+ "identity:update_service_provider": "rule:cloud_admin",
+ "identity:delete_service_provider": "rule:cloud_admin",
+
+ "identity:get_auth_catalog": "",
+ "identity:get_auth_projects": "",
+ "identity:get_auth_domains": "",
+
+ "identity:list_projects_for_groups": "",
+ "identity:list_domains_for_groups": "",
+
+ "identity:list_revoke_events": "",
+
+ "identity:create_policy_association_for_endpoint": "rule:cloud_admin",
+ "identity:check_policy_association_for_endpoint": "rule:cloud_admin",
+ "identity:delete_policy_association_for_endpoint": "rule:cloud_admin",
+ "identity:create_policy_association_for_service": "rule:cloud_admin",
+ "identity:check_policy_association_for_service": "rule:cloud_admin",
+ "identity:delete_policy_association_for_service": "rule:cloud_admin",
+ "identity:create_policy_association_for_region_and_service": "rule:cloud_admin",
+ "identity:check_policy_association_for_region_and_service": "rule:cloud_admin",
+ "identity:delete_policy_association_for_region_and_service": "rule:cloud_admin",
+ "identity:get_policy_for_endpoint": "rule:cloud_admin",
+ "identity:list_endpoints_for_policy": "rule:cloud_admin",
+
+ "identity:create_domain_config": "rule:cloud_admin",
+ "identity:get_domain_config": "rule:cloud_admin",
+ "identity:update_domain_config": "rule:cloud_admin",
+ "identity:delete_domain_config": "rule:cloud_admin"
+}
diff --git a/openstack_auth/tests/data_v2.py b/openstack_auth/tests/data_v2.py
new file mode 100644
index 000000000..1f3a914a2
--- /dev/null
+++ b/openstack_auth/tests/data_v2.py
@@ -0,0 +1,143 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import uuid
+
+from django.utils import datetime_safe
+from keystoneauth1.access import access
+from keystoneauth1.access import service_catalog
+from keystoneclient.v2_0 import roles
+from keystoneclient.v2_0 import tenants
+from keystoneclient.v2_0 import users
+
+
+class TestDataContainer(object):
+ """Arbitrary holder for test data in an object-oriented fashion."""
+ pass
+
+
+def generate_test_data():
+ '''Builds a set of test_data data as returned by Keystone V2.'''
+ test_data = TestDataContainer()
+
+ keystone_service = {
+ 'type': 'identity',
+ 'name': 'keystone',
+ 'endpoints_links': [],
+ 'endpoints': [
+ {
+ 'region': 'RegionOne',
+ 'adminURL': 'http://admin.localhost:35357/v2.0',
+ 'internalURL': 'http://internal.localhost:5000/v2.0',
+ 'publicURL': 'http://public.localhost:5000/v2.0'
+ }
+ ]
+ }
+
+ # Users
+ user_dict = {'id': uuid.uuid4().hex,
+ 'name': 'gabriel',
+ 'email': 'gabriel@example.com',
+ 'password': 'swordfish',
+ 'token': '',
+ 'enabled': True}
+ test_data.user = users.User(None, user_dict, loaded=True)
+
+ # Tenants
+ tenant_dict_1 = {'id': uuid.uuid4().hex,
+ 'name': 'tenant_one',
+ 'description': '',
+ 'enabled': True}
+ tenant_dict_2 = {'id': uuid.uuid4().hex,
+ 'name': 'tenant_two',
+ 'description': '',
+ 'enabled': False}
+ test_data.tenant_one = tenants.Tenant(None, tenant_dict_1, loaded=True)
+ test_data.tenant_two = tenants.Tenant(None, tenant_dict_2, loaded=True)
+
+ nova_service = {
+ 'type': 'compute',
+ 'name': 'nova',
+ 'endpoint_links': [],
+ 'endpoints': [
+ {
+ 'region': 'RegionOne',
+ 'adminURL': ('http://nova-admin.localhost:8774/v2.0/%s'
+ % (tenant_dict_1['id'])),
+ 'internalURL': ('http://nova-internal.localhost:8774/v2.0/%s'
+ % (tenant_dict_1['id'])),
+ 'publicURL': ('http://nova-public.localhost:8774/v2.0/%s'
+ % (tenant_dict_1['id']))
+ },
+ {
+ 'region': 'RegionTwo',
+ 'adminURL': ('http://nova2-admin.localhost:8774/v2.0/%s'
+ % (tenant_dict_1['id'])),
+ 'internalURL': ('http://nova2-internal.localhost:8774/v2.0/%s'
+ % (tenant_dict_1['id'])),
+ 'publicURL': ('http://nova2-public.localhost:8774/v2.0/%s'
+ % (tenant_dict_1['id']))
+ }
+ ]
+ }
+
+ # Roles
+ role_dict = {'id': uuid.uuid4().hex,
+ 'name': 'Member'}
+ test_data.role = roles.Role(roles.RoleManager, role_dict)
+
+ # Tokens
+ tomorrow = datetime_safe.datetime.now() + datetime.timedelta(days=1)
+ expiration = datetime_safe.datetime.isoformat(tomorrow)
+
+ scoped_token_dict = {
+ 'access': {
+ 'token': {
+ 'id': uuid.uuid4().hex,
+ 'expires': expiration,
+ 'tenant': tenant_dict_1,
+ 'tenants': [tenant_dict_1, tenant_dict_2]},
+ 'user': {
+ 'id': user_dict['id'],
+ 'name': user_dict['name'],
+ 'roles': [role_dict]},
+ 'serviceCatalog': [keystone_service, nova_service]
+ }
+ }
+
+ test_data.scoped_access_info = access.create(
+ resp=None,
+ body=scoped_token_dict)
+
+ unscoped_token_dict = {
+ 'access': {
+ 'token': {
+ 'id': uuid.uuid4().hex,
+ 'expires': expiration},
+ 'user': {
+ 'id': user_dict['id'],
+ 'name': user_dict['name'],
+ 'roles': [role_dict]},
+ 'serviceCatalog': [keystone_service]
+ }
+ }
+ test_data.unscoped_access_info = access.create(
+ resp=None,
+ body=unscoped_token_dict)
+
+ # Service Catalog
+ test_data.service_catalog = service_catalog.ServiceCatalogV2(
+ [keystone_service, nova_service])
+
+ return test_data
diff --git a/openstack_auth/tests/data_v3.py b/openstack_auth/tests/data_v3.py
new file mode 100644
index 000000000..1d2ef82b1
--- /dev/null
+++ b/openstack_auth/tests/data_v3.py
@@ -0,0 +1,361 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import uuid
+
+from django.utils import datetime_safe
+from keystoneauth1.access import access
+from keystoneauth1.access import service_catalog
+from keystoneclient.common import cms
+from keystoneclient.v3 import domains
+from keystoneclient.v3 import projects
+from keystoneclient.v3 import roles
+from keystoneclient.v3 import users
+import requests
+
+
+class TestDataContainer(object):
+ """Arbitrary holder for test data in an object-oriented fashion."""
+ pass
+
+
+class TestResponse(requests.Response):
+ """Class used to wrap requests.Response.
+
+ It also provides some convenience to initialize with a dict.
+ """
+
+ def __init__(self, data):
+ self._text = None
+ super(TestResponse, self).__init__()
+ if isinstance(data, dict):
+ self.status_code = data.get('status_code', 200)
+ self.headers = data.get('headers', None)
+ # Fake the text attribute to streamline Response creation
+ self._text = data.get('text', None)
+ else:
+ self.status_code = data
+
+ def __eq__(self, other):
+ return self.__dict__ == other.__dict__
+
+ @property
+ def text(self):
+ return self._text
+
+
+def generate_test_data(pki=False, service_providers=False,
+ endpoint='localhost'):
+ '''Builds a set of test_data data as returned by Keystone V2.'''
+ test_data = TestDataContainer()
+
+ keystone_service = {
+ 'type': 'identity',
+ 'id': uuid.uuid4().hex,
+ 'endpoints': [
+ {
+ 'url': 'http://admin.%s:35357/v3' % endpoint,
+ 'region': 'RegionOne',
+ 'interface': 'admin',
+ 'id': uuid.uuid4().hex,
+ },
+ {
+ 'url': 'http://internal.%s:5000/v3' % endpoint,
+ 'region': 'RegionOne',
+ 'interface': 'internal',
+ 'id': uuid.uuid4().hex
+ },
+ {
+ 'url': 'http://public.%s:5000/v3' % endpoint,
+ 'region': 'RegionOne',
+ 'interface': 'public',
+ 'id': uuid.uuid4().hex
+ }
+ ]
+ }
+
+ # Domains
+ domain_dict = {'id': uuid.uuid4().hex,
+ 'name': 'domain',
+ 'description': '',
+ 'enabled': True}
+ test_data.domain = domains.Domain(domains.DomainManager(None),
+ domain_dict, loaded=True)
+
+ # Users
+ user_dict = {'id': uuid.uuid4().hex,
+ 'name': 'gabriel',
+ 'email': 'gabriel@example.com',
+ 'password': 'swordfish',
+ 'domain_id': domain_dict['id'],
+ 'token': '',
+ 'enabled': True}
+ test_data.user = users.User(users.UserManager(None),
+ user_dict, loaded=True)
+
+ # Projects
+ project_dict_1 = {'id': uuid.uuid4().hex,
+ 'name': 'tenant_one',
+ 'description': '',
+ 'domain_id': domain_dict['id'],
+ 'enabled': True}
+ project_dict_2 = {'id': uuid.uuid4().hex,
+ 'name': 'tenant_two',
+ 'description': '',
+ 'domain_id': domain_dict['id'],
+ 'enabled': False}
+ test_data.project_one = projects.Project(projects.ProjectManager(None),
+ project_dict_1,
+ loaded=True)
+ test_data.project_two = projects.Project(projects.ProjectManager(None),
+ project_dict_2,
+ loaded=True)
+
+ # Roles
+ role_dict = {'id': uuid.uuid4().hex,
+ 'name': 'Member'}
+ test_data.role = roles.Role(roles.RoleManager, role_dict)
+
+ nova_service = {
+ 'type': 'compute',
+ 'id': uuid.uuid4().hex,
+ 'endpoints': [
+ {
+ 'url': ('http://nova-admin.%s:8774/v2.0/%s'
+ % (endpoint, project_dict_1['id'])),
+ 'region': 'RegionOne',
+ 'interface': 'admin',
+ 'id': uuid.uuid4().hex,
+ },
+ {
+ 'url': ('http://nova-internal.%s:8774/v2.0/%s'
+ % (endpoint, project_dict_1['id'])),
+ 'region': 'RegionOne',
+ 'interface': 'internal',
+ 'id': uuid.uuid4().hex
+ },
+ {
+ 'url': ('http://nova-public.%s:8774/v2.0/%s'
+ % (endpoint, project_dict_1['id'])),
+ 'region': 'RegionOne',
+ 'interface': 'public',
+ 'id': uuid.uuid4().hex
+ },
+ {
+ 'url': ('http://nova2-admin.%s:8774/v2.0/%s'
+ % (endpoint, project_dict_1['id'])),
+ 'region': 'RegionTwo',
+ 'interface': 'admin',
+ 'id': uuid.uuid4().hex,
+ },
+ {
+ 'url': ('http://nova2-internal.%s:8774/v2.0/%s'
+ % (endpoint, project_dict_1['id'])),
+ 'region': 'RegionTwo',
+ 'interface': 'internal',
+ 'id': uuid.uuid4().hex
+ },
+ {
+ 'url': ('http://nova2-public.%s:8774/v2.0/%s'
+ % (endpoint, project_dict_1['id'])),
+ 'region': 'RegionTwo',
+ 'interface': 'public',
+ 'id': uuid.uuid4().hex
+ }
+ ]
+ }
+
+ # Tokens
+ tomorrow = datetime_safe.datetime.now() + datetime.timedelta(days=1)
+ expiration = datetime_safe.datetime.isoformat(tomorrow)
+ if pki:
+ # We don't need a real PKI token, but just the prefix to make the
+ # keystone client treat it as a PKI token
+ auth_token = cms.PKI_ASN1_PREFIX + uuid.uuid4().hex
+ else:
+ auth_token = uuid.uuid4().hex
+
+ auth_response_headers = {
+ 'X-Subject-Token': auth_token
+ }
+
+ auth_response = TestResponse({
+ "headers": auth_response_headers
+ })
+
+ scoped_token_dict = {
+ 'token': {
+ 'methods': ['password'],
+ 'expires_at': expiration,
+ 'project': {
+ 'id': project_dict_1['id'],
+ 'name': project_dict_1['name'],
+ 'domain': {
+ 'id': domain_dict['id'],
+ 'name': domain_dict['name']
+ }
+ },
+ 'user': {
+ 'id': user_dict['id'],
+ 'name': user_dict['name'],
+ 'domain': {
+ 'id': domain_dict['id'],
+ 'name': domain_dict['name']
+ }
+ },
+ 'roles': [role_dict],
+ 'catalog': [keystone_service, nova_service]
+ }
+ }
+
+ sp_list = None
+ if service_providers:
+ test_data.sp_auth_url = 'http://service_provider_endp:5000/v3'
+ test_data.service_provider_id = 'k2kserviceprovider'
+ # The access info for the identity provider
+ # should return a list of service providers
+ sp_list = [
+ {'auth_url': test_data.sp_auth_url,
+ 'id': test_data.service_provider_id,
+ 'sp_url': 'https://k2kserviceprovider/sp_url'}
+ ]
+ scoped_token_dict['token']['service_providers'] = sp_list
+
+ test_data.scoped_access_info = access.create(
+ resp=auth_response,
+ body=scoped_token_dict
+ )
+
+ domain_token_dict = {
+ 'token': {
+ 'methods': ['password'],
+ 'expires_at': expiration,
+ 'domain': {
+ 'id': domain_dict['id'],
+ 'name': domain_dict['name'],
+ },
+ 'user': {
+ 'id': user_dict['id'],
+ 'name': user_dict['name'],
+ 'domain': {
+ 'id': domain_dict['id'],
+ 'name': domain_dict['name']
+ }
+ },
+ 'roles': [role_dict],
+ 'catalog': [keystone_service, nova_service]
+ }
+ }
+ test_data.domain_scoped_access_info = access.create(
+ resp=auth_response,
+ body=domain_token_dict
+ )
+
+ unscoped_token_dict = {
+ 'token': {
+ 'methods': ['password'],
+ 'expires_at': expiration,
+ 'user': {
+ 'id': user_dict['id'],
+ 'name': user_dict['name'],
+ 'domain': {
+ 'id': domain_dict['id'],
+ 'name': domain_dict['name']
+ }
+ },
+ 'catalog': [keystone_service]
+ }
+ }
+
+ if service_providers:
+ unscoped_token_dict['token']['service_providers'] = sp_list
+
+ test_data.unscoped_access_info = access.create(
+ resp=auth_response,
+ body=unscoped_token_dict
+ )
+
+ # Service Catalog
+ test_data.service_catalog = service_catalog.ServiceCatalogV3(
+ [keystone_service, nova_service])
+
+ # federated user
+ federated_scoped_token_dict = {
+ 'token': {
+ 'methods': ['password'],
+ 'expires_at': expiration,
+ 'project': {
+ 'id': project_dict_1['id'],
+ 'name': project_dict_1['name'],
+ 'domain': {
+ 'id': domain_dict['id'],
+ 'name': domain_dict['name']
+ }
+ },
+ 'user': {
+ 'id': user_dict['id'],
+ 'name': user_dict['name'],
+ 'domain': {
+ 'id': domain_dict['id'],
+ 'name': domain_dict['name']
+ },
+ 'OS-FEDERATION': {
+ 'identity_provider': 'ACME',
+ 'protocol': 'OIDC',
+ 'groups': [
+ {'id': uuid.uuid4().hex},
+ {'id': uuid.uuid4().hex}
+ ]
+ }
+ },
+ 'roles': [role_dict],
+ 'catalog': [keystone_service, nova_service]
+ }
+ }
+
+ test_data.federated_scoped_access_info = access.create(
+ resp=auth_response,
+ body=federated_scoped_token_dict
+ )
+
+ federated_unscoped_token_dict = {
+ 'token': {
+ 'methods': ['password'],
+ 'expires_at': expiration,
+ 'user': {
+ 'id': user_dict['id'],
+ 'name': user_dict['name'],
+ 'domain': {
+ 'id': domain_dict['id'],
+ 'name': domain_dict['name']
+ },
+ 'OS-FEDERATION': {
+ 'identity_provider': 'ACME',
+ 'protocol': 'OIDC',
+ 'groups': [
+ {'id': uuid.uuid4().hex},
+ {'id': uuid.uuid4().hex}
+ ]
+ }
+ },
+ 'catalog': [keystone_service]
+ }
+ }
+
+ test_data.federated_unscoped_access_info = access.create(
+ resp=auth_response,
+ body=federated_unscoped_token_dict
+ )
+
+ return test_data
diff --git a/openstack_auth/tests/models.py b/openstack_auth/tests/models.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/openstack_auth/tests/models.py
diff --git a/openstack_auth/tests/run_tests.py b/openstack_auth/tests/run_tests.py
new file mode 100644
index 000000000..c25449101
--- /dev/null
+++ b/openstack_auth/tests/run_tests.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+
+import django
+from django.test.runner import DiscoverRunner as test_runner
+
+
+os.environ['DJANGO_SETTINGS_MODULE'] = 'openstack_auth.tests.settings'
+
+if hasattr(django, 'setup'):
+ django.setup()
+
+
+def run(*test_args):
+ if not test_args:
+ test_args = ['tests']
+ parent = os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ "..",
+ "..",
+ )
+ sys.path.insert(0, parent)
+ failures = test_runner().run_tests(test_args)
+ sys.exit(failures)
+
+
+if __name__ == '__main__':
+ run(*sys.argv[1:])
diff --git a/openstack_auth/tests/settings.py b/openstack_auth/tests/settings.py
new file mode 100644
index 000000000..1e750a278
--- /dev/null
+++ b/openstack_auth/tests/settings.py
@@ -0,0 +1,76 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+ALLOWED_HOSTS = ['*']
+
+DATABASES = {'default': {'ENGINE': 'django.db.backends.sqlite3'}}
+
+INSTALLED_APPS = [
+ 'django',
+ 'django.contrib.contenttypes',
+ 'django.contrib.auth',
+ 'django.contrib.sessions',
+ 'django.contrib.messages',
+ 'openstack_auth',
+ 'openstack_auth.tests'
+]
+
+MIDDLEWARE_CLASSES = [
+ 'django.middleware.common.CommonMiddleware',
+ 'django.middleware.csrf.CsrfViewMiddleware',
+ 'django.contrib.sessions.middleware.SessionMiddleware',
+ 'django.contrib.auth.middleware.AuthenticationMiddleware',
+ 'django.contrib.messages.middleware.MessageMiddleware'
+]
+
+AUTHENTICATION_BACKENDS = ['openstack_auth.backend.KeystoneBackend']
+
+OPENSTACK_KEYSTONE_URL = "http://localhost:5000/v3"
+
+ROOT_URLCONF = 'openstack_auth.tests.urls'
+
+LOGIN_REDIRECT_URL = '/'
+
+SECRET_KEY = 'badcafe'
+
+OPENSTACK_API_VERSIONS = {
+ "identity": 3
+}
+
+USE_TZ = True
+
+OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT = False
+
+OPENSTACK_KEYSTONE_DEFAULT_DOMAIN = 'domain'
+
+# NOTE(saschpe): The openstack_auth.user.Token object isn't
+# JSON-serializable ATM
+SESSION_SERIALIZER = 'django.contrib.sessions.serializers.PickleSerializer'
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+POLICY_FILES_PATH = os.path.join(TEST_DIR, "conf")
+POLICY_FILES = {
+ 'identity': 'keystone_policy.json',
+ 'compute': 'nova_policy.json'
+}
+
+TEMPLATES = [
+ {
+ 'BACKEND': 'django.template.backends.django.DjangoTemplates',
+ 'APP_DIRS': True,
+ },
+]
+
+AUTH_USER_MODEL = 'openstack_auth.User'
diff --git a/openstack_auth/tests/templates/auth/blank.html b/openstack_auth/tests/templates/auth/blank.html
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/openstack_auth/tests/templates/auth/blank.html
diff --git a/openstack_auth/tests/templates/auth/login.html b/openstack_auth/tests/templates/auth/login.html
new file mode 100644
index 000000000..eac9a9c1f
--- /dev/null
+++ b/openstack_auth/tests/templates/auth/login.html
@@ -0,0 +1,11 @@
+<!DOCTYPE html>
+<html>
+ <head>
+ <title>Login</title>
+ </head>
+ <body>
+ <form action="." method="POST">{{ csrf_token }}
+ {{ form.as_p }}
+ </form>
+ </body>
+</html> \ No newline at end of file
diff --git a/openstack_auth/tests/tests.py b/openstack_auth/tests/tests.py
new file mode 100644
index 000000000..78edc8fab
--- /dev/null
+++ b/openstack_auth/tests/tests.py
@@ -0,0 +1,1522 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+import django
+from django.conf import settings
+from django.contrib import auth
+from django.core.urlresolvers import reverse
+from django import http
+from django import test
+from django.test.utils import override_settings
+from keystoneauth1 import exceptions as keystone_exceptions
+from keystoneauth1.identity import v2 as v2_auth
+from keystoneauth1.identity import v3 as v3_auth
+from keystoneauth1 import session
+from keystoneauth1 import token_endpoint
+from keystoneclient.v2_0 import client as client_v2
+from keystoneclient.v3 import client as client_v3
+import mock
+from mox3 import mox
+from testscenarios import load_tests_apply_scenarios
+
+from openstack_auth import policy
+from openstack_auth.tests import data_v2
+from openstack_auth.tests import data_v3
+from openstack_auth import user
+from openstack_auth import utils
+
+
+DEFAULT_DOMAIN = settings.OPENSTACK_KEYSTONE_DEFAULT_DOMAIN
+
+
+class OpenStackAuthTestsMixin(object):
+ '''Common functions for version specific tests.'''
+
+ scenarios = [
+ ('pure', {'interface': None}),
+ ('public', {'interface': 'publicURL'}),
+ ('internal', {'interface': 'internalURL'}),
+ ('admin', {'interface': 'adminURL'})
+ ]
+
+ def _mock_unscoped_client(self, user):
+ plugin = self._create_password_auth()
+ plugin.get_access(mox.IsA(session.Session)). \
+ AndReturn(self.data.unscoped_access_info)
+ plugin.auth_url = settings.OPENSTACK_KEYSTONE_URL
+ return self.ks_client_module.Client(session=mox.IsA(session.Session),
+ auth=plugin)
+
+ def _mock_unscoped_client_with_token(self, user, unscoped):
+ plugin = token_endpoint.Token(settings.OPENSTACK_KEYSTONE_URL,
+ unscoped.auth_token)
+ return self.ks_client_module.Client(session=mox.IsA(session.Session),
+ auth=plugin)
+
+ def _mock_client_token_auth_failure(self, unscoped, tenant_id):
+ plugin = self._create_token_auth(tenant_id, unscoped.auth_token)
+ plugin.get_access(mox.IsA(session.Session)). \
+ AndRaise(keystone_exceptions.AuthorizationFailure)
+
+ def _mock_client_password_auth_failure(self, username, password, exc):
+ plugin = self._create_password_auth(username=username,
+ password=password)
+ plugin.get_access(mox.IsA(session.Session)).AndRaise(exc)
+
+ def _mock_scoped_client_for_tenant(self, auth_ref, tenant_id, url=None,
+ client=True, token=None):
+ if url is None:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ if not token:
+ token = self.data.unscoped_access_info.auth_token
+
+ plugin = self._create_token_auth(
+ tenant_id,
+ token=token,
+ url=url)
+ self.scoped_token_auth = plugin
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(auth_ref)
+ if client:
+ return self.ks_client_module.Client(
+ session=mox.IsA(session.Session),
+ auth=plugin)
+
+ def get_form_data(self, user):
+ return {'region': settings.OPENSTACK_KEYSTONE_URL,
+ 'domain': DEFAULT_DOMAIN,
+ 'password': user.password,
+ 'username': user.name}
+
+
+class OpenStackAuthFederatedTestsMixin(object):
+ """Common functions for federation"""
+ def _mock_unscoped_federated_list_projects(self, client, projects):
+ client.federation = self.mox.CreateMockAnything()
+ client.federation.projects = self.mox.CreateMockAnything()
+ client.federation.projects.list().AndReturn(projects)
+
+ def _mock_unscoped_list_domains(self, client, domains):
+ client.auth = self.mox.CreateMockAnything()
+ client.auth.domains().AndReturn(domains)
+
+ def _mock_unscoped_token_client(self, unscoped, auth_url=None,
+ client=True, plugin=None):
+ if not auth_url:
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ if unscoped and not plugin:
+ plugin = self._create_token_auth(
+ None,
+ token=unscoped.auth_token,
+ url=auth_url)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(unscoped)
+ plugin.auth_url = auth_url
+ if client:
+ return self.ks_client_module.Client(
+ session=mox.IsA(session.Session),
+ auth=plugin)
+
+ def _mock_plugin(self, unscoped, auth_url=None):
+ if not auth_url:
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ plugin = self._create_token_auth(
+ None,
+ token=unscoped.auth_token,
+ url=auth_url)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(unscoped)
+ plugin.auth_url = settings.OPENSTACK_KEYSTONE_URL
+ return plugin
+
+ def _mock_federated_client_list_projects(self, unscoped_auth, projects):
+ client = self._mock_unscoped_token_client(None, plugin=unscoped_auth)
+ self._mock_unscoped_federated_list_projects(client, projects)
+
+ def _mock_federated_client_list_domains(self, unscoped_auth, domains):
+ client = self._mock_unscoped_token_client(None, plugin=unscoped_auth)
+ self._mock_unscoped_list_domains(client, domains)
+
+
+class OpenStackAuthTestsV2(OpenStackAuthTestsMixin, test.TestCase):
+
+ def setUp(self):
+ super(OpenStackAuthTestsV2, self).setUp()
+
+ if getattr(self, 'interface', None):
+ override = self.settings(OPENSTACK_ENDPOINT_TYPE=self.interface)
+ override.enable()
+ self.addCleanup(override.disable)
+
+ self.mox = mox.Mox()
+ self.addCleanup(self.mox.VerifyAll)
+ self.addCleanup(self.mox.UnsetStubs)
+
+ self.data = data_v2.generate_test_data()
+ self.ks_client_module = client_v2
+
+ settings.OPENSTACK_API_VERSIONS['identity'] = 2.0
+ settings.OPENSTACK_KEYSTONE_URL = "http://localhost:5000/v2.0"
+
+ self.mox.StubOutClassWithMocks(token_endpoint, 'Token')
+ self.mox.StubOutClassWithMocks(v2_auth, 'Token')
+ self.mox.StubOutClassWithMocks(v2_auth, 'Password')
+ self.mox.StubOutClassWithMocks(client_v2, 'Client')
+
+ def _mock_unscoped_list_tenants(self, client, tenants):
+ client.tenants = self.mox.CreateMockAnything()
+ client.tenants.list().AndReturn(tenants)
+
+ def _mock_unscoped_client_list_tenants(self, user, tenants):
+ client = self._mock_unscoped_client(user)
+ self._mock_unscoped_list_tenants(client, tenants)
+
+ def _create_password_auth(self, username=None, password=None, url=None):
+ if not username:
+ username = self.data.user.name
+
+ if not password:
+ password = self.data.user.password
+
+ if not url:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ return v2_auth.Password(auth_url=url,
+ password=password,
+ username=username)
+
+ def _create_token_auth(self, project_id, token=None, url=None):
+ if not token:
+ token = self.data.unscoped_access_info.auth_token
+
+ if not url:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ return v2_auth.Token(auth_url=url,
+ token=token,
+ tenant_id=project_id,
+ reauthenticate=False)
+
+ def _login(self):
+ tenants = [self.data.tenant_one, self.data.tenant_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_client_list_tenants(user, tenants)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.tenant_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_login(self):
+ self._login()
+
+ def test_login_with_disabled_tenant(self):
+ # Test to validate that authentication will not try to get
+ # scoped token for disabled project.
+ tenants = [self.data.tenant_two, self.data.tenant_one]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_client_list_tenants(user, tenants)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.tenant_one.id)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_login_w_bad_region_cookie(self):
+ self.client.cookies['services_region'] = "bad_region"
+ self._login()
+ self.assertNotEqual("bad_region",
+ self.client.session['services_region'])
+ self.assertEqual("RegionOne",
+ self.client.session['services_region'])
+
+ def test_no_enabled_tenants(self):
+ tenants = [self.data.tenant_two]
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_client_list_tenants(user, tenants)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response,
+ 'You are not authorized for any projects.')
+
+ def test_no_tenants(self):
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_client_list_tenants(user, [])
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response,
+ 'You are not authorized for any projects.')
+
+ def test_invalid_credentials(self):
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+ form_data['password'] = "invalid"
+
+ exc = keystone_exceptions.Unauthorized(401)
+ self._mock_client_password_auth_failure(user.name, "invalid", exc)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response, "Invalid credentials.")
+
+ def test_exception(self):
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+ exc = keystone_exceptions.ClientException(500)
+ self._mock_client_password_auth_failure(user.name, user.password, exc)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response,
+ ("An error occurred authenticating. Please try "
+ "again later."))
+
+ def test_redirect_when_already_logged_in(self):
+ self._login()
+
+ response = self.client.get(reverse('login'))
+ self.assertEqual(response.status_code, 302)
+ self.assertNotIn(reverse('login'), response['location'])
+
+ def test_dont_redirect_when_already_logged_in_if_next_is_set(self):
+ self._login()
+
+ expected_url = "%s?%s=/%s/" % (reverse('login'),
+ auth.REDIRECT_FIELD_NAME,
+ 'special')
+
+ response = self.client.get(expected_url)
+ self.assertEqual(response.status_code, 200)
+ self.assertTemplateUsed(response, 'auth/login.html')
+
+ def test_switch(self, next=None):
+ tenant = self.data.tenant_two
+ tenants = [self.data.tenant_one, self.data.tenant_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+ scoped = self.data.scoped_access_info
+ sc = self.data.service_catalog
+ et = getattr(settings, 'OPENSTACK_ENDPOINT_TYPE', 'publicURL')
+ endpoint = sc.url_for(service_type='identity', interface=et)
+
+ form_data = self.get_form_data(user)
+
+ self._mock_unscoped_client_list_tenants(user, tenants)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.tenant_one.id)
+ self._mock_scoped_client_for_tenant(scoped, tenant.id, url=endpoint,
+ client=False)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ url = reverse('switch_tenants', args=[tenant.id])
+
+ scoped._token['tenant']['id'] = self.data.tenant_two.id
+
+ if next:
+ form_data.update({auth.REDIRECT_FIELD_NAME: next})
+
+ response = self.client.get(url, form_data)
+
+ if next:
+ if django.VERSION >= (1, 9):
+ expected_url = next
+ else:
+ expected_url = 'http://testserver%s' % next
+ self.assertEqual(response['location'], expected_url)
+ else:
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ self.assertEqual(self.client.session['token'].tenant['id'],
+ scoped.tenant_id)
+
+ def test_switch_with_next(self):
+ self.test_switch(next='/next_url')
+
+ def test_switch_region(self, next=None):
+ tenants = [self.data.tenant_one, self.data.tenant_two]
+ user = self.data.user
+ scoped = self.data.scoped_access_info
+ sc = self.data.service_catalog
+
+ form_data = self.get_form_data(user)
+
+ self._mock_unscoped_client_list_tenants(user, tenants)
+ self._mock_scoped_client_for_tenant(scoped, self.data.tenant_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ old_region = sc.get_endpoints()['compute'][0]['region']
+ self.assertEqual(self.client.session['services_region'], old_region)
+
+ region = sc.get_endpoints()['compute'][1]['region']
+ url = reverse('switch_services_region', args=[region])
+
+ form_data['region_name'] = region
+
+ if next:
+ form_data.update({auth.REDIRECT_FIELD_NAME: next})
+
+ response = self.client.get(url, form_data)
+
+ if next:
+ if django.VERSION >= (1, 9):
+ expected_url = next
+ else:
+ expected_url = 'http://testserver%s' % next
+ self.assertEqual(response['location'], expected_url)
+ else:
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ self.assertEqual(self.client.session['services_region'], region)
+ self.assertEqual(self.client.cookies['services_region'].value, region)
+
+ def test_switch_region_with_next(self, next=None):
+ self.test_switch_region(next='/next_url')
+
+ def test_tenant_sorting(self):
+ tenants = [self.data.tenant_two, self.data.tenant_one]
+ expected_tenants = [self.data.tenant_one, self.data.tenant_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ client = self._mock_unscoped_client_with_token(user, unscoped)
+ self._mock_unscoped_list_tenants(client, tenants)
+
+ self.mox.ReplayAll()
+
+ tenant_list = utils.get_project_list(
+ user_id=user.id,
+ auth_url=settings.OPENSTACK_KEYSTONE_URL,
+ token=unscoped.auth_token)
+ self.assertEqual(tenant_list, expected_tenants)
+
+
+class OpenStackAuthTestsV3(OpenStackAuthTestsMixin,
+ OpenStackAuthFederatedTestsMixin,
+ test.TestCase):
+
+ def _mock_unscoped_client_list_projects(self, user, projects):
+ client = self._mock_unscoped_client(user)
+ self._mock_unscoped_list_projects(client, user, projects)
+
+ def _mock_unscoped_list_projects(self, client, user, projects):
+ client.projects = self.mox.CreateMockAnything()
+ client.projects.list(user=user.id).AndReturn(projects)
+
+ def _mock_unscoped_client_list_projects_fail(self, user):
+ client = self._mock_unscoped_client(user)
+ self._mock_unscoped_list_projects_fail(client, user)
+
+ def _mock_unscoped_list_projects_fail(self, client, user):
+ plugin = self._create_token_auth(
+ project_id=None,
+ domain_name=DEFAULT_DOMAIN,
+ token=self.data.unscoped_access_info.auth_token,
+ url=settings.OPENSTACK_KEYSTONE_URL)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(
+ self.data.domain_scoped_access_info)
+ client.projects = self.mox.CreateMockAnything()
+ client.projects.list(user=user.id).AndRaise(
+ keystone_exceptions.AuthorizationFailure)
+
+ def _mock_unscoped_and_domain_list_projects(self, user, projects):
+ client = self._mock_unscoped_client(user)
+ self._mock_scoped_for_domain(projects)
+ self._mock_unscoped_list_projects(client, user, projects)
+
+ def _mock_scoped_for_domain(self, projects):
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ plugin = self._create_token_auth(
+ project_id=None,
+ domain_name=DEFAULT_DOMAIN,
+ token=self.data.unscoped_access_info.auth_token,
+ url=url)
+
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(
+ self.data.domain_scoped_access_info)
+
+ # if no projects or no enabled projects for user, but domain scoped
+ # token client auth gets set to domain scoped auth otherwise it's set
+ # to the project scoped auth and that happens in a different mock
+ enabled_projects = [project for project in projects if project.enabled]
+ if not projects or not enabled_projects:
+ return self.ks_client_module.Client(
+ session=mox.IsA(session.Session),
+ auth=plugin)
+
+ def _create_password_auth(self, username=None, password=None, url=None):
+ if not username:
+ username = self.data.user.name
+
+ if not password:
+ password = self.data.user.password
+
+ if not url:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ return v3_auth.Password(auth_url=url,
+ password=password,
+ username=username,
+ user_domain_name=DEFAULT_DOMAIN,
+ unscoped=True)
+
+ def _create_token_auth(self, project_id, token=None, url=None,
+ domain_name=None):
+ if not token:
+ token = self.data.unscoped_access_info.auth_token
+
+ if not url:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ if domain_name:
+ return v3_auth.Token(auth_url=url,
+ token=token,
+ domain_name=domain_name,
+ reauthenticate=False)
+ else:
+ return v3_auth.Token(auth_url=url,
+ token=token,
+ project_id=project_id,
+ reauthenticate=False)
+
+ def setUp(self):
+ super(OpenStackAuthTestsV3, self).setUp()
+
+ if getattr(self, 'interface', None):
+ override = self.settings(OPENSTACK_ENDPOINT_TYPE=self.interface)
+ override.enable()
+ self.addCleanup(override.disable)
+
+ self.mox = mox.Mox()
+ self.addCleanup(self.mox.VerifyAll)
+ self.addCleanup(self.mox.UnsetStubs)
+
+ self.data = data_v3.generate_test_data()
+ self.ks_client_module = client_v3
+ settings.OPENSTACK_API_VERSIONS['identity'] = 3
+ settings.OPENSTACK_KEYSTONE_URL = "http://localhost:5000/v3"
+
+ self.mox.StubOutClassWithMocks(token_endpoint, 'Token')
+ self.mox.StubOutClassWithMocks(v3_auth, 'Token')
+ self.mox.StubOutClassWithMocks(v3_auth, 'Password')
+ self.mox.StubOutClassWithMocks(client_v3, 'Client')
+ self.mox.StubOutClassWithMocks(v3_auth, 'Keystone2Keystone')
+
+ def test_login(self):
+ projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_login_with_disabled_project(self):
+ # Test to validate that authentication will not try to get
+ # scoped token for disabled project.
+ projects = [self.data.project_two, self.data.project_one]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_no_enabled_projects(self):
+ projects = [self.data.project_two]
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_no_projects(self):
+ user = self.data.user
+ form_data = self.get_form_data(user)
+
+ self._mock_unscoped_and_domain_list_projects(user, [])
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_fail_projects(self):
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_client_list_projects_fail(user)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response,
+ 'Unable to retrieve authorized projects.')
+
+ def test_invalid_credentials(self):
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+
+ form_data['password'] = "invalid"
+
+ exc = keystone_exceptions.Unauthorized(401)
+ self._mock_client_password_auth_failure(user.name, "invalid", exc)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response, "Invalid credentials.")
+
+ def test_exception(self):
+ user = self.data.user
+ form_data = self.get_form_data(user)
+ exc = keystone_exceptions.ClientException(500)
+ self._mock_client_password_auth_failure(user.name, user.password, exc)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response,
+ ("An error occurred authenticating. Please try "
+ "again later."))
+
+ def test_switch(self, next=None):
+ project = self.data.project_two
+ projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ scoped = self.data.scoped_access_info
+ sc = self.data.service_catalog
+ et = getattr(settings, 'OPENSTACK_ENDPOINT_TYPE', 'publicURL')
+
+ form_data = self.get_form_data(user)
+
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(scoped, self.data.project_one.id)
+ self._mock_scoped_client_for_tenant(
+ scoped,
+ project.id,
+ url=sc.url_for(service_type='identity', interface=et),
+ client=False)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ url = reverse('switch_tenants', args=[project.id])
+
+ scoped._project['id'] = self.data.project_two.id
+
+ if next:
+ form_data.update({auth.REDIRECT_FIELD_NAME: next})
+
+ response = self.client.get(url, form_data)
+
+ if next:
+ if django.VERSION >= (1, 9):
+ expected_url = next
+ else:
+ expected_url = 'http://testserver%s' % next
+ self.assertEqual(response['location'], expected_url)
+ else:
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ self.assertEqual(self.client.session['token'].project['id'],
+ scoped.project_id)
+
+ def test_switch_with_next(self):
+ self.test_switch(next='/next_url')
+
+ def test_switch_region(self, next=None):
+ projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ scoped = self.data.unscoped_access_info
+ sc = self.data.service_catalog
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(scoped, self.data.project_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ old_region = sc.get_endpoints()['compute'][0]['region']
+ self.assertEqual(self.client.session['services_region'], old_region)
+
+ region = sc.get_endpoints()['compute'][1]['region']
+ url = reverse('switch_services_region', args=[region])
+
+ form_data['region_name'] = region
+
+ if next:
+ form_data.update({auth.REDIRECT_FIELD_NAME: next})
+
+ response = self.client.get(url, form_data)
+
+ if next:
+ if django.VERSION >= (1, 9):
+ expected_url = next
+ else:
+ expected_url = 'http://testserver%s' % next
+ self.assertEqual(response['location'], expected_url)
+ else:
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ self.assertEqual(self.client.session['services_region'], region)
+
+ def test_switch_region_with_next(self, next=None):
+ self.test_switch_region(next='/next_url')
+
+ def test_switch_keystone_provider_remote_fail(self):
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ target_provider = 'k2kserviceprovider'
+ self.data = data_v3.generate_test_data(service_providers=True)
+ self.sp_data = data_v3.generate_test_data(endpoint='http://sp2')
+ projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+ form_data = self.get_form_data(user)
+
+ # mock authenticate
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ # mock switch
+ plugin = v3_auth.Token(auth_url=auth_url,
+ token=unscoped.auth_token,
+ project_id=None,
+ reauthenticate=False)
+ plugin.get_access(mox.IsA(session.Session)
+ ).AndReturn(self.data.unscoped_access_info)
+ plugin.auth_url = auth_url
+ client = self.ks_client_module.Client(session=mox.IsA(session.Session),
+ auth=plugin)
+
+ self._mock_unscoped_list_projects(client, user, projects)
+ plugin = self._create_token_auth(
+ self.data.project_one.id,
+ token=self.data.unscoped_access_info.auth_token,
+ url=settings.OPENSTACK_KEYSTONE_URL)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(
+ settings.OPENSTACK_KEYSTONE_URL)
+ plugin.get_sp_auth_url(
+ mox.IsA(session.Session), target_provider
+ ).AndReturn('https://k2kserviceprovider/sp_url')
+
+ # let the K2K plugin fail when logging in
+ plugin = v3_auth.Keystone2Keystone(
+ base_plugin=plugin, service_provider=target_provider)
+ plugin.get_access(mox.IsA(session.Session)).AndRaise(
+ keystone_exceptions.AuthorizationFailure)
+ self.mox.ReplayAll()
+
+ # Log in
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Switch
+ url = reverse('switch_keystone_provider', args=[target_provider])
+ form_data['keystone_provider'] = target_provider
+ response = self.client.get(url, form_data, follow=True)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Assert that provider has not changed because of failure
+ self.assertEqual(self.client.session['keystone_provider_id'],
+ 'localkeystone')
+ # These should never change
+ self.assertEqual(self.client.session['k2k_base_unscoped_token'],
+ unscoped.auth_token)
+ self.assertEqual(self.client.session['k2k_auth_url'], auth_url)
+
+ def test_switch_keystone_provider_remote(self):
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ target_provider = 'k2kserviceprovider'
+ self.data = data_v3.generate_test_data(service_providers=True)
+ self.sp_data = data_v3.generate_test_data(endpoint='http://sp2')
+ projects = [self.data.project_one, self.data.project_two]
+ domains = []
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+ form_data = self.get_form_data(user)
+
+ # mock authenticate
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ # mock switch
+ plugin = v3_auth.Token(auth_url=auth_url,
+ token=unscoped.auth_token,
+ project_id=None,
+ reauthenticate=False)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(
+ self.data.unscoped_access_info)
+
+ plugin.auth_url = auth_url
+ client = self.ks_client_module.Client(session=mox.IsA(session.Session),
+ auth=plugin)
+
+ self._mock_unscoped_list_projects(client, user, projects)
+ plugin = self._create_token_auth(
+ self.data.project_one.id,
+ token=self.data.unscoped_access_info.auth_token,
+ url=settings.OPENSTACK_KEYSTONE_URL)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(
+ settings.OPENSTACK_KEYSTONE_URL)
+
+ plugin.get_sp_auth_url(
+ mox.IsA(session.Session), target_provider
+ ).AndReturn('https://k2kserviceprovider/sp_url')
+ plugin = v3_auth.Keystone2Keystone(base_plugin=plugin,
+ service_provider=target_provider)
+ plugin.get_access(mox.IsA(session.Session)). \
+ AndReturn(self.sp_data.unscoped_access_info)
+ plugin.auth_url = 'http://service_provider_endp:5000/v3'
+
+ # mock authenticate for service provider
+ sp_projects = [self.sp_data.project_one, self.sp_data.project_two]
+ sp_unscoped = self.sp_data.federated_unscoped_access_info
+ sp_unscoped_auth = self._mock_plugin(sp_unscoped,
+ auth_url=plugin.auth_url)
+ client = self._mock_unscoped_token_client(None, plugin.auth_url,
+ plugin=sp_unscoped_auth)
+ self._mock_unscoped_list_domains(client, domains)
+ client = self._mock_unscoped_token_client(None, plugin.auth_url,
+ plugin=sp_unscoped_auth)
+ self._mock_unscoped_federated_list_projects(client, sp_projects)
+ self._mock_scoped_client_for_tenant(sp_unscoped,
+ self.sp_data.project_one.id,
+ url=plugin.auth_url,
+ token=sp_unscoped.auth_token)
+
+ self.mox.ReplayAll()
+
+ # Log in
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Switch
+ url = reverse('switch_keystone_provider', args=[target_provider])
+ form_data['keystone_provider'] = target_provider
+ response = self.client.get(url, form_data, follow=True)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Assert keystone provider has changed
+ self.assertEqual(self.client.session['keystone_provider_id'],
+ target_provider)
+ # These should not change
+ self.assertEqual(self.client.session['k2k_base_unscoped_token'],
+ unscoped.auth_token)
+ self.assertEqual(self.client.session['k2k_auth_url'], auth_url)
+
+ def test_switch_keystone_provider_local(self):
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ self.data = data_v3.generate_test_data(service_providers=True)
+ keystone_provider = 'localkeystone'
+ projects = [self.data.project_one, self.data.project_two]
+ domains = []
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+ form_data = self.get_form_data(user)
+
+ # mock authenticate
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+ self._mock_unscoped_token_client(unscoped,
+ auth_url=auth_url,
+ client=False)
+ unscoped_auth = self._mock_plugin(unscoped)
+ client = self._mock_unscoped_token_client(None, auth_url=auth_url,
+ plugin=unscoped_auth)
+ self._mock_unscoped_list_domains(client, domains)
+ client = self._mock_unscoped_token_client(None, auth_url=auth_url,
+ plugin=unscoped_auth)
+ self._mock_unscoped_list_projects(client, user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ self.mox.ReplayAll()
+
+ # Log in
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Switch
+ url = reverse('switch_keystone_provider', args=[keystone_provider])
+ form_data['keystone_provider'] = keystone_provider
+ response = self.client.get(url, form_data, follow=True)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Assert nothing has changed since we are going from local to local
+ self.assertEqual(self.client.session['keystone_provider_id'],
+ keystone_provider)
+ self.assertEqual(self.client.session['k2k_base_unscoped_token'],
+ unscoped.auth_token)
+ self.assertEqual(self.client.session['k2k_auth_url'], auth_url)
+
+ def test_switch_keystone_provider_local_fail(self):
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ self.data = data_v3.generate_test_data(service_providers=True)
+ keystone_provider = 'localkeystone'
+ projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+ form_data = self.get_form_data(user)
+
+ # mock authenticate
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ # Let using the base token for logging in fail
+ plugin = v3_auth.Token(auth_url=auth_url,
+ token=unscoped.auth_token,
+ project_id=None,
+ reauthenticate=False)
+ plugin.get_access(mox.IsA(session.Session)). \
+ AndRaise(keystone_exceptions.AuthorizationFailure)
+ plugin.auth_url = auth_url
+ self.mox.ReplayAll()
+
+ # Log in
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Switch
+ url = reverse('switch_keystone_provider', args=[keystone_provider])
+ form_data['keystone_provider'] = keystone_provider
+ response = self.client.get(url, form_data, follow=True)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Assert
+ self.assertEqual(self.client.session['keystone_provider_id'],
+ keystone_provider)
+ self.assertEqual(self.client.session['k2k_base_unscoped_token'],
+ unscoped.auth_token)
+ self.assertEqual(self.client.session['k2k_auth_url'], auth_url)
+
+ def test_tenant_sorting(self):
+ projects = [self.data.project_two, self.data.project_one]
+ expected_projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ client = self._mock_unscoped_client_with_token(user, unscoped)
+ self._mock_unscoped_list_projects(client, user, projects)
+ self.mox.ReplayAll()
+
+ project_list = utils.get_project_list(
+ user_id=user.id,
+ auth_url=settings.OPENSTACK_KEYSTONE_URL,
+ token=unscoped.auth_token)
+ self.assertEqual(project_list, expected_projects)
+
+ def test_login_form_multidomain(self):
+ override = self.settings(OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT=True)
+ override.enable()
+ self.addCleanup(override.disable)
+
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, 'id="id_domain"')
+ self.assertContains(response, 'name="domain"')
+
+ def test_login_form_multidomain_dropdown(self):
+ override = self.settings(OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT=True,
+ OPENSTACK_KEYSTONE_DOMAIN_DROPDOWN=True,
+ OPENSTACK_KEYSTONE_DOMAIN_CHOICES=(
+ ('Default', 'Default'),)
+ )
+ override.enable()
+ self.addCleanup(override.disable)
+
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, 'id="id_domain"')
+ self.assertContains(response, 'name="domain"')
+ self.assertContains(response, 'option value="Default"')
+ settings.OPENSTACK_KEYSTONE_DOMAIN_DROPDOWN = False
+
+
+class OpenStackAuthTestsWebSSO(OpenStackAuthTestsMixin,
+ OpenStackAuthFederatedTestsMixin,
+ test.TestCase):
+
+ def _create_token_auth(self, project_id=None, token=None, url=None):
+ if not token:
+ token = self.data.federated_unscoped_access_info.auth_token
+
+ if not url:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ return v3_auth.Token(auth_url=url,
+ token=token,
+ project_id=project_id,
+ reauthenticate=False)
+
+ def setUp(self):
+ super(OpenStackAuthTestsWebSSO, self).setUp()
+
+ self.mox = mox.Mox()
+ self.addCleanup(self.mox.VerifyAll)
+ self.addCleanup(self.mox.UnsetStubs)
+
+ self.data = data_v3.generate_test_data()
+ self.ks_client_module = client_v3
+
+ self.idp_id = uuid.uuid4().hex
+ self.idp_oidc_id = uuid.uuid4().hex
+ self.idp_saml2_id = uuid.uuid4().hex
+
+ settings.OPENSTACK_API_VERSIONS['identity'] = 3
+ settings.OPENSTACK_KEYSTONE_URL = 'http://localhost:5000/v3'
+ settings.WEBSSO_ENABLED = True
+ settings.WEBSSO_CHOICES = (
+ ('credentials', 'Keystone Credentials'),
+ ('oidc', 'OpenID Connect'),
+ ('saml2', 'Security Assertion Markup Language'),
+ (self.idp_oidc_id, 'IDP OIDC'),
+ (self.idp_saml2_id, 'IDP SAML2')
+ )
+ settings.WEBSSO_IDP_MAPPING = {
+ self.idp_oidc_id: (self.idp_id, 'oidc'),
+ self.idp_saml2_id: (self.idp_id, 'saml2')
+ }
+
+ self.mox.StubOutClassWithMocks(token_endpoint, 'Token')
+ self.mox.StubOutClassWithMocks(v3_auth, 'Token')
+ self.mox.StubOutClassWithMocks(v3_auth, 'Password')
+ self.mox.StubOutClassWithMocks(client_v3, 'Client')
+
+ def test_login_form(self):
+ url = reverse('login')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, 'credentials')
+ self.assertContains(response, 'oidc')
+ self.assertContains(response, 'saml2')
+ self.assertContains(response, self.idp_oidc_id)
+ self.assertContains(response, self.idp_saml2_id)
+
+ def test_websso_redirect_by_protocol(self):
+ origin = 'http://testserver/auth/websso/'
+ protocol = 'oidc'
+ redirect_url = ('%s/auth/OS-FEDERATION/websso/%s?origin=%s' %
+ (settings.OPENSTACK_KEYSTONE_URL, protocol, origin))
+
+ form_data = {'auth_type': protocol,
+ 'region': settings.OPENSTACK_KEYSTONE_URL}
+ url = reverse('login')
+
+ # POST to the page and redirect to keystone.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, redirect_url, status_code=302,
+ target_status_code=404)
+
+ def test_websso_redirect_by_idp(self):
+ origin = 'http://testserver/auth/websso/'
+ protocol = 'oidc'
+ redirect_url = ('%s/auth/OS-FEDERATION/identity_providers/%s'
+ '/protocols/%s/websso?origin=%s' %
+ (settings.OPENSTACK_KEYSTONE_URL, self.idp_id,
+ protocol, origin))
+
+ form_data = {'auth_type': self.idp_oidc_id,
+ 'region': settings.OPENSTACK_KEYSTONE_URL}
+ url = reverse('login')
+
+ # POST to the page and redirect to keystone.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, redirect_url, status_code=302,
+ target_status_code=404)
+
+ def test_websso_login(self):
+ projects = [self.data.project_one, self.data.project_two]
+ domains = []
+ unscoped = self.data.federated_unscoped_access_info
+ token = unscoped.auth_token
+ unscoped_auth = self._mock_plugin(unscoped)
+
+ form_data = {'token': token}
+ self._mock_federated_client_list_domains(unscoped_auth, domains)
+ self._mock_federated_client_list_projects(unscoped_auth, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('websso')
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_websso_login_with_auth_in_url(self):
+ settings.OPENSTACK_KEYSTONE_URL = 'http://auth.openstack.org:5000/v3'
+
+ projects = [self.data.project_one, self.data.project_two]
+ domains = []
+ unscoped = self.data.federated_unscoped_access_info
+ token = unscoped.auth_token
+ unscoped_auth = self._mock_plugin(unscoped)
+
+ form_data = {'token': token}
+ self._mock_federated_client_list_domains(unscoped_auth, domains)
+ self._mock_federated_client_list_projects(unscoped_auth, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('websso')
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+load_tests = load_tests_apply_scenarios
+
+
+class PolicyLoaderTestCase(test.TestCase):
+ def test_policy_file_load(self):
+ policy.reset()
+ enforcer = policy._get_enforcer()
+ self.assertEqual(2, len(enforcer))
+ self.assertIn('identity', enforcer)
+ self.assertIn('compute', enforcer)
+
+ def test_policy_reset(self):
+ policy._get_enforcer()
+ self.assertEqual(2, len(policy._ENFORCER))
+ policy.reset()
+ self.assertIsNone(policy._ENFORCER)
+
+
+class PermTestCase(test.TestCase):
+ def test_has_perms(self):
+ testuser = user.User(id=1, roles=[])
+
+ def has_perm(perm, obj=None):
+ return perm in ('perm1', 'perm3')
+
+ with mock.patch.object(testuser, 'has_perm', side_effect=has_perm):
+ self.assertFalse(testuser.has_perms(['perm2']))
+
+ # perm1 AND perm3
+ self.assertFalse(testuser.has_perms(['perm1', 'perm2']))
+
+ # perm1 AND perm3
+ self.assertTrue(testuser.has_perms(['perm1', 'perm3']))
+
+ # perm1 AND (perm2 OR perm3)
+ perm_list = ['perm1', ('perm2', 'perm3')]
+ self.assertTrue(testuser.has_perms(perm_list))
+
+
+class PolicyTestCase(test.TestCase):
+ _roles = []
+
+ def setUp(self):
+ mock_user = user.User(id=1, roles=self._roles)
+ patcher = mock.patch('openstack_auth.utils.get_user',
+ return_value=mock_user)
+ self.MockClass = patcher.start()
+ self.addCleanup(patcher.stop)
+ self.request = http.HttpRequest()
+
+
+class PolicyTestCaseNonAdmin(PolicyTestCase):
+ _roles = [{'id': '1', 'name': 'member'}]
+
+ def test_check_admin_required_false(self):
+ policy.reset()
+ value = policy.check((("identity", "admin_required"),),
+ request=self.request)
+ self.assertFalse(value)
+
+ def test_check_identity_rule_not_found_false(self):
+ policy.reset()
+ value = policy.check((("identity", "i_dont_exist"),),
+ request=self.request)
+ # this should fail because the default check for
+ # identity is admin_required
+ self.assertFalse(value)
+
+ def test_check_nova_context_is_admin_false(self):
+ policy.reset()
+ value = policy.check((("compute", "context_is_admin"),),
+ request=self.request)
+ self.assertFalse(value)
+
+ def test_compound_check_false(self):
+ policy.reset()
+ value = policy.check((("identity", "admin_required"),
+ ("identity", "identity:default"),),
+ request=self.request)
+ self.assertFalse(value)
+
+ def test_scope_not_found(self):
+ policy.reset()
+ value = policy.check((("dummy", "default"),),
+ request=self.request)
+ self.assertTrue(value)
+
+
+class PolicyTestCaseAdmin(PolicyTestCase):
+ _roles = [{'id': '1', 'name': 'admin'}]
+
+ def test_check_admin_required_true(self):
+ policy.reset()
+ value = policy.check((("identity", "admin_required"),),
+ request=self.request)
+ self.assertTrue(value)
+
+ def test_check_identity_rule_not_found_true(self):
+ policy.reset()
+ value = policy.check((("identity", "i_dont_exist"),),
+ request=self.request)
+ # this should succeed because the default check for
+ # identity is admin_required
+ self.assertTrue(value)
+
+ def test_compound_check_true(self):
+ policy.reset()
+ value = policy.check((("identity", "admin_required"),
+ ("identity", "identity:default"),),
+ request=self.request)
+ self.assertTrue(value)
+
+ def test_check_nova_context_is_admin_true(self):
+ policy.reset()
+ value = policy.check((("compute", "context_is_admin"),),
+ request=self.request)
+ self.assertTrue(value)
+
+
+class PolicyTestCaseV3Admin(PolicyTestCase):
+ _roles = [{'id': '1', 'name': 'admin'}]
+
+ def setUp(self):
+ policy_files = {
+ 'identity': 'policy.v3cloudsample.json',
+ 'compute': 'nova_policy.json'}
+
+ override = self.settings(POLICY_FILES=policy_files)
+ override.enable()
+ self.addCleanup(override.disable)
+
+ mock_user = user.User(id=1, roles=self._roles,
+ user_domain_id='admin_domain_id')
+ patcher = mock.patch('openstack_auth.utils.get_user',
+ return_value=mock_user)
+ self.MockClass = patcher.start()
+ self.addCleanup(patcher.stop)
+ self.request = http.HttpRequest()
+
+ def test_check_cloud_admin_required_true(self):
+ policy.reset()
+ value = policy.check((("identity", "cloud_admin"),),
+ request=self.request)
+ self.assertTrue(value)
+
+ def test_check_domain_admin_required_true(self):
+ policy.reset()
+ value = policy.check((
+ ("identity", "admin_and_matching_domain_id"),),
+ request=self.request)
+ self.assertTrue(value)
+
+ def test_check_any_admin_required_true(self):
+ policy.reset()
+ value = policy.check((("identity", "admin_or_cloud_admin"),),
+ request=self.request)
+ self.assertTrue(value)
+
+
+class RoleTestCaseAdmin(test.TestCase):
+
+ def test_get_admin_roles_with_default_value(self):
+ admin_roles = utils.get_admin_roles()
+ self.assertSetEqual({'admin'}, admin_roles)
+
+ @override_settings(OPENSTACK_KEYSTONE_ADMIN_ROLES=['foO', 'BAR', 'admin'])
+ def test_get_admin_roles(self):
+ admin_roles = utils.get_admin_roles()
+ self.assertSetEqual({'foo', 'bar', 'admin'}, admin_roles)
+
+ @override_settings(OPENSTACK_KEYSTONE_ADMIN_ROLES=['foO', 'BAR', 'admin'])
+ def test_get_admin_permissions(self):
+ admin_permissions = utils.get_admin_permissions()
+ self.assertSetEqual({'openstack.roles.foo',
+ 'openstack.roles.bar',
+ 'openstack.roles.admin'}, admin_permissions)
+
+
+class UtilsTestCase(test.TestCase):
+
+ def test_fix_auth_url_version_v20(self):
+ settings.OPENSTACK_API_VERSIONS['identity'] = 2.0
+ test_urls = [
+ ("http://a/", ("http://a/v2.0", False)),
+ ("http://a", ("http://a/v2.0", False)),
+ ("http://a:8080/", ("http://a:8080/v2.0", False)),
+ ("http://a/v2.0", ("http://a/v2.0", False)),
+ ("http://a/v2.0/", ("http://a/v2.0/", False)),
+ ("http://a/identity", ("http://a/identity/v2.0", False)),
+ ("http://a/identity/", ("http://a/identity/v2.0", False)),
+ ("http://a:5000/identity/v2.0",
+ ("http://a:5000/identity/v2.0", False)),
+ ("http://a/identity/v2.0/", ("http://a/identity/v2.0/", False))
+ ]
+ for src, expected in test_urls:
+ self.assertEqual(expected, utils.fix_auth_url_version_prefix(src))
+
+ def test_fix_auth_url_version_v3(self):
+ settings.OPENSTACK_API_VERSIONS['identity'] = 3
+ test_urls = [
+ ("http://a/", ("http://a/v3", False)),
+ ("http://a", ("http://a/v3", False)),
+ ("http://a:8080/", ("http://a:8080/v3", False)),
+ ("http://a/v3", ("http://a/v3", False)),
+ ("http://a/v3/", ("http://a/v3/", False)),
+ ("http://a/v2.0/", ("http://a/v3/", True)),
+ ("http://a/v2.0", ("http://a/v3", True)),
+ ("http://a/identity", ("http://a/identity/v3", False)),
+ ("http://a:5000/identity/", ("http://a:5000/identity/v3", False)),
+ ("http://a/identity/v3", ("http://a/identity/v3", False)),
+ ("http://a/identity/v3/", ("http://a/identity/v3/", False))
+ ]
+ for src, expected in test_urls:
+ self.assertEqual(expected, utils.fix_auth_url_version_prefix(src))
+
+
+class UserTestCase(test.TestCase):
+
+ def setUp(self):
+ self.data = data_v3.generate_test_data(pki=True)
+
+ def test_unscoped_token_is_none(self):
+ created_token = user.Token(self.data.domain_scoped_access_info,
+ unscoped_token=None)
+ self.assertTrue(created_token._is_pki_token(
+ self.data.domain_scoped_access_info.auth_token))
+ self.assertFalse(created_token._is_pki_token(None))
+
+
+class BehindProxyTestCase(test.TestCase):
+
+ def setUp(self):
+ self.request = http.HttpRequest()
+
+ def test_without_proxy(self):
+ self.request.META['REMOTE_ADDR'] = '10.111.111.2'
+ from openstack_auth.utils import get_client_ip
+ self.assertEqual('10.111.111.2', get_client_ip(self.request))
+
+ def test_with_proxy_no_settings(self):
+ from openstack_auth.utils import get_client_ip
+ self.request.META['REMOTE_ADDR'] = '10.111.111.2'
+ self.request.META['HTTP_X_REAL_IP'] = '192.168.15.33'
+ self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2'
+ self.assertEqual('10.111.111.2', get_client_ip(self.request))
+
+ def test_with_settings_without_proxy(self):
+ from openstack_auth.utils import get_client_ip
+ self.request.META['REMOTE_ADDR'] = '10.111.111.2'
+ self.assertEqual('10.111.111.2', get_client_ip(self.request))
+
+ @override_settings(SECURE_PROXY_ADDR_HEADER='HTTP_X_FORWARDED_FOR')
+ def test_with_settings_with_proxy_forwardfor(self):
+ from openstack_auth.utils import get_client_ip
+ self.request.META['REMOTE_ADDR'] = '10.111.111.2'
+ self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2'
+ self.assertEqual('172.18.0.2', get_client_ip(self.request))
+
+ @override_settings(SECURE_PROXY_ADDR_HEADER='HTTP_X_REAL_IP')
+ def test_with_settings_with_proxy_real_ip(self):
+ from openstack_auth.utils import get_client_ip
+ self.request.META['REMOTE_ADDR'] = '10.111.111.2'
+ self.request.META['HTTP_X_REAL_IP'] = '192.168.15.33'
+ self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2'
+ self.assertEqual('192.168.15.33', get_client_ip(self.request))
diff --git a/openstack_auth/tests/urls.py b/openstack_auth/tests/urls.py
new file mode 100644
index 000000000..b93725a2a
--- /dev/null
+++ b/openstack_auth/tests/urls.py
@@ -0,0 +1,29 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from django.conf.urls import include
+from django.conf.urls import url
+from django.views import generic
+
+from openstack_auth import utils
+from openstack_auth import views
+
+
+utils.patch_middleware_get_user()
+
+
+urlpatterns = [
+ url(r"", include('openstack_auth.urls')),
+ url(r"^websso/$", views.websso, name='websso'),
+ url(r"^$", generic.TemplateView.as_view(template_name="auth/blank.html"))
+]
diff --git a/openstack_auth/urls.py b/openstack_auth/urls.py
new file mode 100644
index 000000000..12d7cfb0b
--- /dev/null
+++ b/openstack_auth/urls.py
@@ -0,0 +1,36 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from django.conf.urls import url
+
+from openstack_auth import utils
+from openstack_auth import views
+
+utils.patch_middleware_get_user()
+
+
+urlpatterns = [
+ url(r"^login/$", views.login, name='login'),
+ url(r"^logout/$", views.logout, name='logout'),
+ url(r'^switch/(?P<tenant_id>[^/]+)/$', views.switch,
+ name='switch_tenants'),
+ url(r'^switch_services_region/(?P<region_name>[^/]+)/$',
+ views.switch_region,
+ name='switch_services_region'),
+ url(r'^switch_keystone_provider/(?P<keystone_provider>[^/]+)/$',
+ views.switch_keystone_provider,
+ name='switch_keystone_provider')
+]
+
+if utils.is_websso_enabled():
+ urlpatterns.append(url(r"^websso/$", views.websso, name='websso'))
diff --git a/openstack_auth/user.py b/openstack_auth/user.py
new file mode 100644
index 000000000..063648bf8
--- /dev/null
+++ b/openstack_auth/user.py
@@ -0,0 +1,454 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import hashlib
+import logging
+
+import django
+from django.conf import settings
+from django.contrib.auth import models
+from django.db import models as db_models
+from django.utils import deprecation
+from keystoneauth1 import exceptions as keystone_exceptions
+from keystoneclient.common import cms as keystone_cms
+import six
+
+from openstack_auth import utils
+
+
+LOG = logging.getLogger(__name__)
+_TOKEN_HASH_ENABLED = getattr(settings, 'OPENSTACK_TOKEN_HASH_ENABLED', True)
+
+
+def set_session_from_user(request, user):
+ request.session['token'] = user.token
+ request.session['user_id'] = user.id
+ request.session['region_endpoint'] = user.endpoint
+ request.session['services_region'] = user.services_region
+ # Update the user object cached in the request
+ request._cached_user = user
+ request.user = user
+
+
+def create_user_from_token(request, token, endpoint, services_region=None):
+ # if the region is provided, use that, otherwise use the preferred region
+ default_service_regions = getattr(settings, 'DEFAULT_SERVICE_REGIONS', {})
+ default_service_region = default_service_regions.get(endpoint)
+ svc_region = services_region or \
+ utils.default_services_region(token.serviceCatalog, request,
+ selected_region=default_service_region)
+ return User(id=token.user['id'],
+ token=token,
+ user=token.user['name'],
+ password_expires_at=token.user['password_expires_at'],
+ user_domain_id=token.user_domain_id,
+ # We need to consider already logged-in users with an old
+ # version of Token without user_domain_name.
+ user_domain_name=getattr(token, 'user_domain_name', None),
+ project_id=token.project['id'],
+ project_name=token.project['name'],
+ domain_id=token.domain['id'],
+ domain_name=token.domain['name'],
+ enabled=True,
+ service_catalog=token.serviceCatalog,
+ roles=token.roles,
+ endpoint=endpoint,
+ services_region=svc_region,
+ is_federated=getattr(token, 'is_federated', False),
+ unscoped_token=getattr(token, 'unscoped_token',
+ request.session.get('unscoped_token')))
+
+
+class Token(object):
+ """Encapsulates the AccessInfo object from keystoneclient.
+
+ Token object provides a consistent interface for accessing the keystone
+ token information and service catalog.
+
+ Added for maintaining backward compatibility with horizon that expects
+ Token object in the user object.
+ """
+ def __init__(self, auth_ref, unscoped_token=None):
+ # User-related attributes
+ user = {'id': auth_ref.user_id, 'name': auth_ref.username}
+ data = getattr(auth_ref, '_data', {})
+ expiration_date = data.get('token', {}).get('user', {})\
+ .get('password_expires_at')
+ user['password_expires_at'] = expiration_date
+ self.user = user
+ self.user_domain_id = auth_ref.user_domain_id
+ self.user_domain_name = auth_ref.user_domain_name
+
+ # Token-related attributes
+ self.id = auth_ref.auth_token
+ self.unscoped_token = unscoped_token
+ if _TOKEN_HASH_ENABLED and self._is_pki_token(self.id):
+ algorithm = getattr(settings, 'OPENSTACK_TOKEN_HASH_ALGORITHM',
+ 'md5')
+ hasher = hashlib.new(algorithm)
+ hasher.update(self.id.encode('utf-8'))
+ self.id = hasher.hexdigest()
+ # Only hash unscoped token if needed
+ if self._is_pki_token(self.unscoped_token):
+ hasher = hashlib.new(algorithm)
+ hasher.update(self.unscoped_token.encode('utf-8'))
+ self.unscoped_token = hasher.hexdigest()
+ self.expires = auth_ref.expires
+
+ # Project-related attributes
+ project = {}
+ project['id'] = auth_ref.project_id
+ project['name'] = auth_ref.project_name
+ project['is_admin_project'] = getattr(auth_ref, 'is_admin_project',
+ False)
+ project['domain_id'] = getattr(auth_ref, 'project_domain_id', None)
+ self.project = project
+ self.tenant = self.project
+
+ # Domain-related attributes
+ domain = {}
+ domain['id'] = auth_ref.domain_id
+ domain['name'] = auth_ref.domain_name
+ self.domain = domain
+
+ # Federation-related attributes
+ self.is_federated = auth_ref.is_federated
+ self.roles = [{'name': role} for role in auth_ref.role_names]
+ self.serviceCatalog = auth_ref.service_catalog.catalog
+
+ def _is_pki_token(self, token):
+ """Determines if this is a pki-based token (pki or pkiz)"""
+ if token is None:
+ return False
+ return (keystone_cms.is_ans1_token(token)
+ or keystone_cms.is_pkiz(token))
+
+
+class User(models.AbstractBaseUser, models.AnonymousUser):
+ """A User class with some extra special sauce for Keystone.
+
+ In addition to the standard Django user attributes, this class also has
+ the following:
+
+ .. attribute:: token
+
+ The Keystone token object associated with the current user/tenant.
+
+ The token object is deprecated, user auth_ref instead.
+
+ .. attribute:: tenant_id
+
+ The id of the Keystone tenant for the current user/token.
+
+ The tenant_id keyword argument is deprecated, use project_id instead.
+
+ .. attribute:: tenant_name
+
+ The name of the Keystone tenant for the current user/token.
+
+ The tenant_name keyword argument is deprecated, use project_name
+ instead.
+
+ .. attribute:: project_id
+
+ The id of the Keystone project for the current user/token.
+
+ .. attribute:: project_name
+
+ The name of the Keystone project for the current user/token.
+
+ .. attribute:: service_catalog
+
+ The ``ServiceCatalog`` data returned by Keystone.
+
+ .. attribute:: roles
+
+ A list of dictionaries containing role names and ids as returned
+ by Keystone.
+
+ .. attribute:: services_region
+
+ A list of non-identity service endpoint regions extracted from the
+ service catalog.
+
+ .. attribute:: user_domain_id
+
+ The domain id of the current user.
+
+ .. attribute:: user_domain_name
+
+ The domain name of the current user.
+
+ .. attribute:: domain_id
+
+ The id of the Keystone domain scoped for the current user/token.
+
+ .. attribute:: is_federated
+
+ Whether user is federated Keystone user. (Boolean)
+
+ .. attribute:: unscoped_token
+
+ Unscoped Keystone token.
+
+ .. attribute:: password_expires_at
+
+ Password expiration date. This attribute could be None when using
+ keystone version < 3.0 or if the feature is not enabled in keystone.
+
+ """
+
+ keystone_user_id = db_models.CharField(primary_key=True, max_length=255)
+ USERNAME_FIELD = 'keystone_user_id'
+
+ def __init__(self, id=None, token=None, user=None, tenant_id=None,
+ service_catalog=None, tenant_name=None, roles=None,
+ authorized_tenants=None, endpoint=None, enabled=False,
+ services_region=None, user_domain_id=None,
+ user_domain_name=None, domain_id=None, domain_name=None,
+ project_id=None, project_name=None, is_federated=False,
+ unscoped_token=None, password=None, password_expires_at=None):
+ self.id = id
+ self.pk = id
+ self.token = token
+ self.keystone_user_id = id
+ self.username = user
+ self.user_domain_id = user_domain_id
+ self.user_domain_name = user_domain_name
+ self.domain_id = domain_id
+ self.domain_name = domain_name
+ self.project_id = project_id or tenant_id
+ self.project_name = project_name or tenant_name
+ self.service_catalog = service_catalog
+ self._services_region = (
+ services_region
+ or utils.default_services_region(service_catalog)
+ )
+ self.roles = roles or []
+ self.endpoint = endpoint
+ self.enabled = enabled
+ self._authorized_tenants = authorized_tenants
+ self.is_federated = is_federated
+ self.password_expires_at = password_expires_at
+
+ # Unscoped token is used for listing user's project that works
+ # for both federated and keystone user.
+ self.unscoped_token = unscoped_token
+
+ # List of variables to be deprecated.
+ self.tenant_id = self.project_id
+ self.tenant_name = self.project_name
+
+ # Required by AbstractBaseUser
+ self.password = None
+
+ def __unicode__(self):
+ return self.username
+
+ def __repr__(self):
+ return "<%s: %s>" % (self.__class__.__name__, self.username)
+
+ def is_token_expired(self, margin=None):
+ """Determine if the token is expired.
+
+ :returns:
+ ``True`` if the token is expired, ``False`` if not, and
+ ``None`` if there is no token set.
+
+ :param margin:
+ A security time margin in seconds before real expiration.
+ Will return ``True`` if the token expires in less than ``margin``
+ seconds of time.
+ A default margin can be set by the TOKEN_TIMEOUT_MARGIN in the
+ django settings.
+
+ """
+ if self.token is None:
+ return None
+ return not utils.is_token_valid(self.token, margin)
+
+ if django.VERSION >= (1, 10):
+ @property
+ def is_authenticated(self):
+ """Checks for a valid authentication."""
+ if (self.token is not None and utils.is_token_valid(self.token)):
+ return deprecation.CallableTrue
+ else:
+ return deprecation.CallableFalse
+
+ @property
+ def is_anonymous(self):
+ """Return if the user is not authenticated.
+
+ :returns: ``True`` if not authenticated,``False`` otherwise.
+ """
+ return deprecation.CallableBool(not self.is_authenticated)
+ else:
+ def is_authenticated(self, margin=None):
+ """Checks for a valid authentication.
+
+ :param margin:
+ A security time margin in seconds before end of authentication.
+ Will return ``False`` if authentication ends in less than
+ ``margin`` seconds of time.
+ A default margin can be set by the TOKEN_TIMEOUT_MARGIN in the
+ django settings.
+ """
+ return (self.token is not None and
+ utils.is_token_valid(self.token, margin))
+
+ def is_anonymous(self, margin=None):
+ """Return if the user is not authenticated.
+
+ :returns: ``True`` if not authenticated,``False`` otherwise.
+
+ :param margin:
+ A security time margin in seconds before end of an eventual
+ authentication.
+ Will return ``True`` even if authenticated but that
+ authentication ends in less than ``margin`` seconds of time.
+ A default margin can be set by the TOKEN_TIMEOUT_MARGIN in the
+ django settings.
+ """
+ return not self.is_authenticated(margin)
+
+ @property
+ def is_active(self):
+ return self.enabled
+
+ @property
+ def is_superuser(self):
+ """Evaluates whether this user has admin privileges.
+
+ :returns: ``True`` or ``False``.
+ """
+ admin_roles = utils.get_admin_roles()
+ user_roles = {role['name'].lower() for role in self.roles}
+ return not admin_roles.isdisjoint(user_roles)
+
+ @property
+ def authorized_tenants(self):
+ """Returns a memoized list of tenants this user may access."""
+ if self.is_authenticated() and self._authorized_tenants is None:
+ endpoint = self.endpoint
+ try:
+ self._authorized_tenants = utils.get_project_list(
+ user_id=self.id,
+ auth_url=endpoint,
+ token=self.unscoped_token,
+ is_federated=self.is_federated)
+ except (keystone_exceptions.ClientException,
+ keystone_exceptions.AuthorizationFailure):
+ LOG.exception('Unable to retrieve project list.')
+ return self._authorized_tenants or []
+
+ @authorized_tenants.setter
+ def authorized_tenants(self, tenant_list):
+ self._authorized_tenants = tenant_list
+
+ @property
+ def services_region(self):
+ return self._services_region
+
+ @services_region.setter
+ def services_region(self, region):
+ self._services_region = region
+
+ @property
+ def available_services_regions(self):
+ """Returns list of unique region name values in service catalog."""
+ regions = []
+ if self.service_catalog:
+ for service in self.service_catalog:
+ service_type = service.get('type')
+ if service_type is None or service_type == 'identity':
+ continue
+ for endpoint in service.get('endpoints', []):
+ region = utils.get_endpoint_region(endpoint)
+ if region not in regions:
+ regions.append(region)
+ return regions
+
+ def save(*args, **kwargs):
+ # Presume we can't write to Keystone.
+ pass
+
+ def delete(*args, **kwargs):
+ # Presume we can't write to Keystone.
+ pass
+
+ # Check for OR'd permission rules, check that user has one of the
+ # required permission.
+ def has_a_matching_perm(self, perm_list, obj=None):
+ """Returns True if the user has one of the specified permissions.
+
+ If object is passed, it checks if the user has any of the required
+ perms for this object.
+ """
+ # If there are no permissions to check, just return true
+ if not perm_list:
+ return True
+ # Check that user has at least one of the required permissions.
+ for perm in perm_list:
+ if self.has_perm(perm, obj):
+ return True
+ return False
+
+ # Override the default has_perms method. Allowing for more
+ # complex combinations of permissions. Will check for logical AND of
+ # all top level permissions. Will use logical OR for all first level
+ # tuples (check that use has one permissions in the tuple)
+ #
+ # Examples:
+ # Checks for all required permissions
+ # ('openstack.roles.admin', 'openstack.roles.L3-support')
+ #
+ # Checks for admin AND (L2 or L3)
+ # ('openstack.roles.admin', ('openstack.roles.L3-support',
+ # 'openstack.roles.L2-support'),)
+ def has_perms(self, perm_list, obj=None):
+ """Returns True if the user has all of the specified permissions.
+
+ Tuples in the list will possess the required permissions if
+ the user has a permissions matching one of the elements of
+ that tuple
+ """
+ # If there are no permissions to check, just return true
+ if not perm_list:
+ return True
+ for perm in perm_list:
+ if isinstance(perm, six.string_types):
+ # check that the permission matches
+ if not self.has_perm(perm, obj):
+ return False
+ else:
+ # check that a permission in the tuple matches
+ if not self.has_a_matching_perm(perm, obj):
+ return False
+ return True
+
+ def time_until_expiration(self):
+ """Returns the number of remaining days until user's password expires.
+
+ Calculates the number days until the user must change their password,
+ once the password expires the user will not able to log in until an
+ admin changes its password.
+ """
+ if self.password_expires_at is not None:
+ expiration_date = datetime.datetime.strptime(
+ self.password_expires_at, "%Y-%m-%dT%H:%M:%S.%f")
+ return expiration_date - datetime.datetime.now()
+
+ class Meta(object):
+ app_label = 'openstack_auth'
diff --git a/openstack_auth/utils.py b/openstack_auth/utils.py
new file mode 100644
index 000000000..cac0d7a3f
--- /dev/null
+++ b/openstack_auth/utils.py
@@ -0,0 +1,562 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import logging
+import re
+
+from django.conf import settings
+from django.contrib import auth
+from django.contrib.auth import models
+from django.utils import timezone
+from keystoneauth1.identity import v2 as v2_auth
+from keystoneauth1.identity import v3 as v3_auth
+from keystoneauth1 import session
+from keystoneauth1 import token_endpoint
+from keystoneclient.v2_0 import client as client_v2
+from keystoneclient.v3 import client as client_v3
+from six.moves.urllib import parse as urlparse
+
+
+LOG = logging.getLogger(__name__)
+
+_TOKEN_TIMEOUT_MARGIN = getattr(settings, 'TOKEN_TIMEOUT_MARGIN', 0)
+
+"""
+We need the request object to get the user, so we'll slightly modify the
+existing django.contrib.auth.get_user method. To do so we update the
+auth middleware to point to our overridden method.
+
+Calling the "patch_middleware_get_user" method somewhere like our urls.py
+file takes care of hooking it in appropriately.
+"""
+
+
+def middleware_get_user(request):
+ if not hasattr(request, '_cached_user'):
+ request._cached_user = get_user(request)
+ return request._cached_user
+
+
+def get_user(request):
+ try:
+ user_id = request.session[auth.SESSION_KEY]
+ backend_path = request.session[auth.BACKEND_SESSION_KEY]
+ backend = auth.load_backend(backend_path)
+ backend.request = request
+ user = backend.get_user(user_id) or models.AnonymousUser()
+ except KeyError:
+ user = models.AnonymousUser()
+ return user
+
+
+def patch_middleware_get_user():
+ # NOTE(adriant): We can't import middleware until our customer user model
+ # is actually registered, otherwise a call to get_user_model within the
+ # middleware module will fail.
+ from django.contrib.auth import middleware
+ middleware.get_user = middleware_get_user
+ auth.get_user = get_user
+
+
+""" End Monkey-Patching. """
+
+
+def is_token_valid(token, margin=None):
+ """Timezone-aware checking of the auth token's expiration timestamp.
+
+ Returns ``True`` if the token has not yet expired, otherwise ``False``.
+
+ :param token: The openstack_auth.user.Token instance to check
+
+ :param margin:
+ A time margin in seconds to subtract from the real token's validity.
+ An example usage is that the token can be valid once the middleware
+ passed, and invalid (timed-out) during a view rendering and this
+ generates authorization errors during the view rendering.
+ A default margin can be set by the TOKEN_TIMEOUT_MARGIN in the
+ django settings.
+ """
+ expiration = token.expires
+ # In case we get an unparseable expiration timestamp, return False
+ # so you can't have a "forever" token just by breaking the expires param.
+ if expiration is None:
+ return False
+ if margin is None:
+ margin = getattr(settings, 'TOKEN_TIMEOUT_MARGIN', 0)
+ expiration = expiration - datetime.timedelta(seconds=margin)
+ if settings.USE_TZ and timezone.is_naive(expiration):
+ # Presumes that the Keystone is using UTC.
+ expiration = timezone.make_aware(expiration, timezone.utc)
+ return expiration > timezone.now()
+
+
+# From django.contrib.auth.views
+# Added in Django 1.4.3, 1.5b2
+# Vendored here for compatibility with old Django versions.
+def is_safe_url(url, host=None):
+ """Return ``True`` if the url is a safe redirection.
+
+ The safe redirection means that it doesn't point to a different host.
+ Always returns ``False`` on an empty url.
+ """
+ if not url:
+ return False
+ netloc = urlparse.urlparse(url)[1]
+ return not netloc or netloc == host
+
+
+# DEPRECATED -- Mitaka
+# This method definition is included to prevent breaking backward compatibility
+# The original functionality was problematic and has been removed.
+def remove_project_cache(token):
+ pass
+
+
+# Helper for figuring out keystone version
+# Implementation will change when API version discovery is available
+def get_keystone_version():
+ return getattr(settings, 'OPENSTACK_API_VERSIONS', {}).get('identity', 2.0)
+
+
+def get_session():
+ insecure = getattr(settings, 'OPENSTACK_SSL_NO_VERIFY', False)
+ verify = getattr(settings, 'OPENSTACK_SSL_CACERT', True)
+
+ if insecure:
+ verify = False
+
+ return session.Session(verify=verify)
+
+
+def get_keystone_client():
+ if get_keystone_version() < 3:
+ return client_v2
+ else:
+ return client_v3
+
+
+def is_token_deletion_disabled():
+ LOG.warning("Deprecated TOKEN_DELETION_DISABLED setting is no longer used")
+ return getattr(settings, 'TOKEN_DELETION_DISABLED', False)
+
+
+def is_websso_enabled():
+ """Websso is supported in Keystone version 3."""
+ websso_enabled = getattr(settings, 'WEBSSO_ENABLED', False)
+ keystonev3_plus = (get_keystone_version() >= 3)
+ return websso_enabled and keystonev3_plus
+
+
+def build_absolute_uri(request, relative_url):
+ """Ensure absolute_uri are relative to WEBROOT."""
+ webroot = getattr(settings, 'WEBROOT', '')
+ if webroot.endswith("/") and relative_url.startswith("/"):
+ webroot = webroot[:-1]
+
+ return request.build_absolute_uri(webroot + relative_url)
+
+
+def get_websso_url(request, auth_url, websso_auth):
+ """Return the keystone endpoint for initiating WebSSO.
+
+ Generate the keystone WebSSO endpoint that will redirect the user
+ to the login page of the federated identity provider.
+
+ Based on the authentication type selected by the user in the login
+ form, it will construct the keystone WebSSO endpoint.
+
+ :param request: Django http request object.
+ :type request: django.http.HttpRequest
+ :param auth_url: Keystone endpoint configured in the horizon setting.
+ The value is derived from:
+ - OPENSTACK_KEYSTONE_URL
+ - AVAILABLE_REGIONS
+ :type auth_url: string
+ :param websso_auth: Authentication type selected by the user from the
+ login form. The value is derived from the horizon
+ setting WEBSSO_CHOICES.
+ :type websso_auth: string
+
+ Example of horizon WebSSO setting::
+
+ WEBSSO_CHOICES = (
+ ("credentials", "Keystone Credentials"),
+ ("oidc", "OpenID Connect"),
+ ("saml2", "Security Assertion Markup Language"),
+ ("acme_oidc", "ACME - OpenID Connect"),
+ ("acme_saml2", "ACME - SAML2")
+ )
+
+ WEBSSO_IDP_MAPPING = {
+ "acme_oidc": ("acme", "oidc"),
+ "acme_saml2": ("acme", "saml2")
+ }
+ }
+
+ The value of websso_auth will be looked up in the WEBSSO_IDP_MAPPING
+ dictionary, if a match is found it will return a IdP specific WebSSO
+ endpoint using the values found in the mapping.
+
+ The value in WEBSSO_IDP_MAPPING is expected to be a tuple formatted as
+ (<idp_id>, <protocol_id>). Using the values found, a IdP/protocol
+ specific URL will be constructed:
+
+ /auth/OS-FEDERATION/identity_providers/<idp_id>
+ /protocols/<protocol_id>/websso
+
+ If no value is found from the WEBSSO_IDP_MAPPING dictionary, it will
+ treat the value as the global WebSSO protocol <protocol_id> and
+ construct the WebSSO URL by:
+
+ /auth/OS-FEDERATION/websso/<protocol_id>
+
+ :returns: Keystone WebSSO endpoint.
+ :rtype: string
+
+ """
+ origin = build_absolute_uri(request, '/auth/websso/')
+ idp_mapping = getattr(settings, 'WEBSSO_IDP_MAPPING', {})
+ idp_id, protocol_id = idp_mapping.get(websso_auth,
+ (None, websso_auth))
+
+ if idp_id:
+ # Use the IDP specific WebSSO endpoint
+ url = ('%s/auth/OS-FEDERATION/identity_providers/%s'
+ '/protocols/%s/websso?origin=%s' %
+ (auth_url, idp_id, protocol_id, origin))
+ else:
+ # If no IDP mapping found for the identifier,
+ # perform WebSSO by protocol.
+ url = ('%s/auth/OS-FEDERATION/websso/%s?origin=%s' %
+ (auth_url, protocol_id, origin))
+
+ return url
+
+
+def has_in_url_path(url, subs):
+ """Test if any of `subs` strings is present in the `url` path."""
+ scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
+ return any([sub in path for sub in subs])
+
+
+def url_path_replace(url, old, new, count=None):
+ """Return a copy of url with replaced path.
+
+ Return a copy of url with all occurrences of old replaced by new in the url
+ path. If the optional argument count is given, only the first count
+ occurrences are replaced.
+ """
+ args = []
+ scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
+ if count is not None:
+ args.append(count)
+ return urlparse.urlunsplit((
+ scheme, netloc, path.replace(old, new, *args), query, fragment))
+
+
+def url_path_append(url, suffix):
+ scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
+ path = (path + suffix).replace('//', '/')
+ return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
+
+
+def _augment_url_with_version(auth_url):
+ """Optionally augment auth_url path with version suffix.
+
+ Check if path component already contains version suffix and if it does
+ not, append version suffix to the end of path, not erasing the previous
+ path contents, since keystone web endpoint (like /identity) could be
+ there. Keystone version needs to be added to endpoint because as of Kilo,
+ the identity URLs returned by Keystone might no longer contain API
+ versions, leaving the version choice up to the user.
+ """
+ if has_in_url_path(auth_url, ["/v2.0", "/v3"]):
+ return auth_url
+
+ if get_keystone_version() >= 3:
+ return url_path_append(auth_url, "/v3")
+ else:
+ return url_path_append(auth_url, "/v2.0")
+
+
+# TODO(tsufiev): remove this legacy version as soon as Horizon switches to
+# the new fix_auth_url_version_prefix() call
+def fix_auth_url_version(auth_url):
+ """Fix up the auth url if an invalid or no version prefix was given.
+
+ People still give a v2 auth_url even when they specify that they want v3
+ authentication. Fix the URL to say v3 in this case and add version if it is
+ missing entirely. This should be smarter and use discovery.
+ """
+ auth_url = _augment_url_with_version(auth_url)
+
+ if get_keystone_version() >= 3 and has_in_url_path(auth_url, ["/v2.0"]):
+ LOG.warning("The Keystone URL (either in Horizon settings or in "
+ "service catalog) points to a v2.0 Keystone endpoint, "
+ "but v3 is specified as the API version to use by "
+ "Horizon. Using v3 endpoint for authentication.")
+ auth_url = url_path_replace(auth_url, "/v2.0", "/v3", 1)
+
+ return auth_url
+
+
+def fix_auth_url_version_prefix(auth_url):
+ """Fix up the auth url if an invalid or no version prefix was given.
+
+ People still give a v2 auth_url even when they specify that they want v3
+ authentication. Fix the URL to say v3 in this case and add version if it is
+ missing entirely. This should be smarter and use discovery.
+ """
+ auth_url = _augment_url_with_version(auth_url)
+
+ url_fixed = False
+ if get_keystone_version() >= 3 and has_in_url_path(auth_url, ["/v2.0"]):
+ url_fixed = True
+ auth_url = url_path_replace(auth_url, "/v2.0", "/v3", 1)
+
+ return auth_url, url_fixed
+
+
+def clean_up_auth_url(auth_url):
+ """Clean up the auth url to extract the exact Keystone URL"""
+
+ # NOTE(mnaser): This drops the query and fragment because we're only
+ # trying to extract the Keystone URL.
+ scheme, netloc, path, query, fragment = urlparse.urlsplit(auth_url)
+ return urlparse.urlunsplit((
+ scheme, netloc, re.sub(r'/auth.*', '', path), '', ''))
+
+
+def get_token_auth_plugin(auth_url, token, project_id=None, domain_name=None):
+ if get_keystone_version() >= 3:
+ if domain_name:
+ return v3_auth.Token(auth_url=auth_url,
+ token=token,
+ domain_name=domain_name,
+ reauthenticate=False)
+ else:
+ return v3_auth.Token(auth_url=auth_url,
+ token=token,
+ project_id=project_id,
+ reauthenticate=False)
+ else:
+ return v2_auth.Token(auth_url=auth_url,
+ token=token,
+ tenant_id=project_id,
+ reauthenticate=False)
+
+
+def get_project_list(*args, **kwargs):
+ is_federated = kwargs.get('is_federated', False)
+ sess = kwargs.get('session') or get_session()
+ auth_url, _ = fix_auth_url_version_prefix(kwargs['auth_url'])
+ auth = token_endpoint.Token(auth_url, kwargs['token'])
+ client = get_keystone_client().Client(session=sess, auth=auth)
+
+ if get_keystone_version() < 3:
+ projects = client.tenants.list()
+ elif is_federated:
+ projects = client.federation.projects.list()
+ else:
+ projects = client.projects.list(user=kwargs.get('user_id'))
+
+ projects.sort(key=lambda project: project.name.lower())
+ return projects
+
+
+def default_services_region(service_catalog, request=None,
+ selected_region=None):
+ """Returns the first endpoint region for first non-identity service.
+
+ Extracted from the service catalog.
+ """
+ if service_catalog:
+ available_regions = [get_endpoint_region(endpoint) for service
+ in service_catalog for endpoint
+ in service.get('endpoints', [])
+ if (service.get('type') is not None
+ and service.get('type') != 'identity')]
+ if not available_regions:
+ # this is very likely an incomplete keystone setup
+ LOG.warning('No regions could be found excluding identity.')
+ available_regions = [get_endpoint_region(endpoint) for service
+ in service_catalog for endpoint
+ in service.get('endpoints', [])]
+
+ if not available_regions:
+ # if there are no region setup for any service endpoint,
+ # this is a critical problem and it's not clear how this occurs
+ LOG.error('No regions can be found in the service catalog.')
+ return None
+
+ if request and selected_region is None:
+ selected_region = request.COOKIES.get('services_region',
+ available_regions[0])
+ if selected_region not in available_regions:
+ selected_region = available_regions[0]
+ return selected_region
+ return None
+
+
+def set_response_cookie(response, cookie_name, cookie_value):
+ """Common function for setting the cookie in the response.
+
+ Provides a common policy of setting cookies for last used project
+ and region, can be reused in other locations.
+
+ This method will set the cookie to expire in 365 days.
+ """
+ now = timezone.now()
+ expire_date = now + datetime.timedelta(days=365)
+ response.set_cookie(cookie_name, cookie_value, expires=expire_date)
+
+
+def get_endpoint_region(endpoint):
+ """Common function for getting the region from endpoint.
+
+ In Keystone V3, region has been deprecated in favor of
+ region_id.
+
+ This method provides a way to get region that works for both
+ Keystone V2 and V3.
+ """
+ return endpoint.get('region_id') or endpoint.get('region')
+
+
+def using_cookie_backed_sessions():
+ engine = getattr(settings, 'SESSION_ENGINE', '')
+ return "signed_cookies" in engine
+
+
+def get_admin_roles():
+ """Common function for getting the admin roles from settings
+
+ :return:
+ Set object including all admin roles.
+ If there is no role, this will return empty::
+
+ {
+ "foo", "bar", "admin"
+ }
+
+ """
+ admin_roles = {role.lower() for role
+ in getattr(settings, 'OPENSTACK_KEYSTONE_ADMIN_ROLES',
+ ['admin'])}
+ return admin_roles
+
+
+def get_role_permission(role):
+ """Common function for getting the permission froms arg
+
+ This format is 'openstack.roles.xxx' and 'xxx' is a real role name.
+
+ :returns:
+ String like "openstack.roles.admin"
+ If role is None, this will return None.
+
+ """
+ return "openstack.roles.%s" % role.lower()
+
+
+def get_admin_permissions():
+ """Common function for getting the admin permissions from settings
+
+ This format is 'openstack.roles.xxx' and 'xxx' is a real role name.
+
+ :returns:
+ Set object including all admin permission.
+ If there is no permission, this will return empty::
+
+ {
+ "openstack.roles.foo",
+ "openstack.roles.bar",
+ "openstack.roles.admin"
+ }
+
+ """
+ return {get_role_permission(role) for role in get_admin_roles()}
+
+
+def get_client_ip(request):
+ """Return client ip address using SECURE_PROXY_ADDR_HEADER variable.
+
+ If not present or not defined on settings then REMOTE_ADDR is used.
+
+ :param request: Django http request object.
+ :type request: django.http.HttpRequest
+
+ :returns: Possible client ip address
+ :rtype: string
+ """
+ _SECURE_PROXY_ADDR_HEADER = getattr(
+ settings, 'SECURE_PROXY_ADDR_HEADER', False
+ )
+ if _SECURE_PROXY_ADDR_HEADER:
+ return request.META.get(
+ _SECURE_PROXY_ADDR_HEADER,
+ request.META.get('REMOTE_ADDR')
+ )
+ return request.META.get('REMOTE_ADDR')
+
+
+def store_initial_k2k_session(auth_url, request, scoped_auth_ref,
+ unscoped_auth_ref):
+ """Stores session variables if there are k2k service providers
+
+ This stores variables related to Keystone2Keystone federation. This
+ function gets skipped if there are no Keystone service providers.
+ An unscoped token to the identity provider keystone gets stored
+ so that it can be used to do federated login into the service
+ providers when switching keystone providers.
+ The settings file can be configured to set the display name
+ of the local (identity provider) keystone by setting
+ KEYSTONE_PROVIDER_IDP_NAME. The KEYSTONE_PROVIDER_IDP_ID settings
+ variable is used for comparison against the service providers.
+ It should not conflict with any of the service provider ids.
+
+ :param auth_url: base token auth url
+ :param request: Django http request object
+ :param scoped_auth_ref: Scoped Keystone access info object
+ :param unscoped_auth_ref: Unscoped Keystone access info object
+ """
+ keystone_provider_id = request.session.get('keystone_provider_id', None)
+ if keystone_provider_id:
+ return None
+
+ providers = getattr(scoped_auth_ref, 'service_providers', None)
+ if providers:
+ providers = getattr(providers, '_service_providers', None)
+
+ if providers:
+ keystone_idp_name = getattr(settings, 'KEYSTONE_PROVIDER_IDP_NAME',
+ 'Local Keystone')
+ keystone_idp_id = getattr(
+ settings, 'KEYSTONE_PROVIDER_IDP_ID', 'localkeystone')
+ keystone_identity_provider = {'name': keystone_idp_name,
+ 'id': keystone_idp_id}
+ # (edtubill) We will use the IDs as the display names
+ # We may want to be able to set display names in the future.
+ keystone_providers = [
+ {'name': provider_id, 'id': provider_id}
+ for provider_id in providers]
+
+ keystone_providers.append(keystone_identity_provider)
+
+ # We treat the Keystone idp ID as None
+ request.session['keystone_provider_id'] = keystone_idp_id
+ request.session['keystone_providers'] = keystone_providers
+ request.session['k2k_base_unscoped_token'] =\
+ unscoped_auth_ref.auth_token
+ request.session['k2k_auth_url'] = auth_url
diff --git a/openstack_auth/views.py b/openstack_auth/views.py
new file mode 100644
index 000000000..5e7fad4f6
--- /dev/null
+++ b/openstack_auth/views.py
@@ -0,0 +1,327 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from django.conf import settings
+from django.contrib import auth
+from django.contrib.auth.decorators import login_required
+from django.contrib.auth import views as django_auth_views
+from django.contrib import messages
+from django import http as django_http
+from django import shortcuts
+from django.utils import functional
+from django.utils import http
+from django.utils.translation import ugettext_lazy as _
+from django.views.decorators.cache import never_cache
+from django.views.decorators.csrf import csrf_exempt
+from django.views.decorators.csrf import csrf_protect
+from django.views.decorators.debug import sensitive_post_parameters
+from keystoneauth1 import exceptions as keystone_exceptions
+import six
+
+from openstack_auth import exceptions
+from openstack_auth import forms
+from openstack_auth import plugin
+
+# This is historic and is added back in to not break older versions of
+# Horizon, fix to Horizon to remove this requirement was committed in
+# Juno
+from openstack_auth.forms import Login # noqa:F401
+from openstack_auth import user as auth_user
+from openstack_auth import utils
+
+try:
+ is_safe_url = http.is_safe_url
+except AttributeError:
+ is_safe_url = utils.is_safe_url
+
+
+LOG = logging.getLogger(__name__)
+
+
+@sensitive_post_parameters()
+@csrf_protect
+@never_cache
+def login(request, template_name=None, extra_context=None, **kwargs):
+ """Logs a user in using the :class:`~openstack_auth.forms.Login` form."""
+
+ # If the user enabled websso and selects default protocol
+ # from the dropdown, We need to redirect user to the websso url
+ if request.method == 'POST':
+ auth_type = request.POST.get('auth_type', 'credentials')
+ if utils.is_websso_enabled() and auth_type != 'credentials':
+ auth_url = request.POST.get('region')
+ url = utils.get_websso_url(request, auth_url, auth_type)
+ return shortcuts.redirect(url)
+
+ if not request.is_ajax():
+ # If the user is already authenticated, redirect them to the
+ # dashboard straight away, unless the 'next' parameter is set as it
+ # usually indicates requesting access to a page that requires different
+ # permissions.
+ if (request.user.is_authenticated() and
+ auth.REDIRECT_FIELD_NAME not in request.GET and
+ auth.REDIRECT_FIELD_NAME not in request.POST):
+ return shortcuts.redirect(settings.LOGIN_REDIRECT_URL)
+
+ # Get our initial region for the form.
+ initial = {}
+ current_region = request.session.get('region_endpoint', None)
+ requested_region = request.GET.get('region', None)
+ regions = dict(getattr(settings, "AVAILABLE_REGIONS", []))
+ if requested_region in regions and requested_region != current_region:
+ initial.update({'region': requested_region})
+
+ if request.method == "POST":
+ form = functional.curry(forms.Login)
+ else:
+ form = functional.curry(forms.Login, initial=initial)
+
+ if extra_context is None:
+ extra_context = {'redirect_field_name': auth.REDIRECT_FIELD_NAME}
+
+ if not template_name:
+ if request.is_ajax():
+ template_name = 'auth/_login.html'
+ extra_context['hide'] = True
+ else:
+ template_name = 'auth/login.html'
+
+ res = django_auth_views.login(request,
+ template_name=template_name,
+ authentication_form=form,
+ extra_context=extra_context,
+ **kwargs)
+ # Save the region in the cookie, this is used as the default
+ # selected region next time the Login form loads.
+ if request.method == "POST":
+ utils.set_response_cookie(res, 'login_region',
+ request.POST.get('region', ''))
+ utils.set_response_cookie(res, 'login_domain',
+ request.POST.get('domain', ''))
+
+ # Set the session data here because django's session key rotation
+ # will erase it if we set it earlier.
+ if request.user.is_authenticated():
+ auth_user.set_session_from_user(request, request.user)
+ regions = dict(forms.Login.get_region_choices())
+ region = request.user.endpoint
+ login_region = request.POST.get('region')
+ region_name = regions.get(login_region)
+ request.session['region_endpoint'] = region
+ request.session['region_name'] = region_name
+ expiration_time = request.user.time_until_expiration()
+ threshold_days = getattr(
+ settings, 'PASSWORD_EXPIRES_WARNING_THRESHOLD_DAYS', -1)
+ if expiration_time is not None and \
+ expiration_time.days <= threshold_days:
+ expiration_time = str(expiration_time).rsplit(':', 1)[0]
+ msg = (_('Please consider changing your password, it will expire'
+ ' in %s minutes') %
+ expiration_time).replace(':', ' Hours and ')
+ messages.warning(request, msg)
+ return res
+
+
+@sensitive_post_parameters()
+@csrf_exempt
+@never_cache
+def websso(request):
+ """Logs a user in using a token from Keystone's POST."""
+ referer = request.META.get('HTTP_REFERER', settings.OPENSTACK_KEYSTONE_URL)
+ auth_url = utils.clean_up_auth_url(referer)
+ token = request.POST.get('token')
+ try:
+ request.user = auth.authenticate(request=request, auth_url=auth_url,
+ token=token)
+ except exceptions.KeystoneAuthException as exc:
+ msg = 'Login failed: %s' % six.text_type(exc)
+ res = django_http.HttpResponseRedirect(settings.LOGIN_URL)
+ res.set_cookie('logout_reason', msg, max_age=10)
+ return res
+
+ auth_user.set_session_from_user(request, request.user)
+ auth.login(request, request.user)
+ if request.session.test_cookie_worked():
+ request.session.delete_test_cookie()
+ return django_http.HttpResponseRedirect(settings.LOGIN_REDIRECT_URL)
+
+
+def logout(request, login_url=None, **kwargs):
+ """Logs out the user if he is logged in. Then redirects to the log-in page.
+
+ :param login_url:
+ Once logged out, defines the URL where to redirect after login
+
+ :param kwargs:
+ see django.contrib.auth.views.logout_then_login extra parameters.
+
+ """
+ msg = 'Logging out user "%(username)s".' % \
+ {'username': request.user.username}
+ LOG.info(msg)
+
+ """ Securely logs a user out. """
+ return django_auth_views.logout_then_login(request, login_url=login_url,
+ **kwargs)
+
+
+def delete_token(endpoint, token_id):
+ """Delete a token."""
+ LOG.warning("The delete_token method is deprecated and now does nothing")
+
+
+@login_required
+def switch(request, tenant_id, redirect_field_name=auth.REDIRECT_FIELD_NAME):
+ """Switches an authenticated user from one project to another."""
+ LOG.debug('Switching to tenant %s for user "%s".',
+ (tenant_id, request.user.username))
+
+ endpoint, __ = utils.fix_auth_url_version_prefix(request.user.endpoint)
+ session = utils.get_session()
+ # Keystone can be configured to prevent exchanging a scoped token for
+ # another token. Always use the unscoped token for requesting a
+ # scoped token.
+ unscoped_token = request.user.unscoped_token
+ auth = utils.get_token_auth_plugin(auth_url=endpoint,
+ token=unscoped_token,
+ project_id=tenant_id)
+
+ try:
+ auth_ref = auth.get_access(session)
+ msg = 'Project switch successful for user "%(username)s".' % \
+ {'username': request.user.username}
+ LOG.info(msg)
+ except keystone_exceptions.ClientException:
+ msg = (
+ _('Project switch failed for user "%(username)s".') %
+ {'username': request.user.username})
+ messages.error(request, msg)
+ auth_ref = None
+ LOG.exception('An error occurred while switching sessions.')
+
+ # Ensure the user-originating redirection url is safe.
+ # Taken from django.contrib.auth.views.login()
+ redirect_to = request.GET.get(redirect_field_name, '')
+ if not is_safe_url(url=redirect_to, host=request.get_host()):
+ redirect_to = settings.LOGIN_REDIRECT_URL
+
+ if auth_ref:
+ user = auth_user.create_user_from_token(
+ request,
+ auth_user.Token(auth_ref, unscoped_token=unscoped_token),
+ endpoint)
+ auth_user.set_session_from_user(request, user)
+ message = (
+ _('Switch to project "%(project_name)s" successful.') %
+ {'project_name': request.user.project_name})
+ messages.success(request, message)
+ response = shortcuts.redirect(redirect_to)
+ utils.set_response_cookie(response, 'recent_project',
+ request.user.project_id)
+ return response
+
+
+@login_required
+def switch_region(request, region_name,
+ redirect_field_name=auth.REDIRECT_FIELD_NAME):
+ """Switches the user's region for all services except Identity service.
+
+ The region will be switched if the given region is one of the regions
+ available for the scoped project. Otherwise the region is not switched.
+ """
+ if region_name in request.user.available_services_regions:
+ request.session['services_region'] = region_name
+ LOG.debug('Switching services region to %s for user "%s".',
+ (region_name, request.user.username))
+
+ redirect_to = request.GET.get(redirect_field_name, '')
+ if not is_safe_url(url=redirect_to, host=request.get_host()):
+ redirect_to = settings.LOGIN_REDIRECT_URL
+
+ response = shortcuts.redirect(redirect_to)
+ utils.set_response_cookie(response, 'services_region',
+ request.session['services_region'])
+ return response
+
+
+@login_required
+def switch_keystone_provider(request, keystone_provider=None,
+ redirect_field_name=auth.REDIRECT_FIELD_NAME):
+ """Switches the user's keystone provider using K2K Federation
+
+ If keystone_provider is given then we switch the user to
+ the keystone provider using K2K federation. Otherwise if keystone_provider
+ is None then we switch the user back to the Identity Provider Keystone
+ which a non federated token auth will be used.
+ """
+ base_token = request.session.get('k2k_base_unscoped_token', None)
+ k2k_auth_url = request.session.get('k2k_auth_url', None)
+ keystone_providers = request.session.get('keystone_providers', None)
+
+ if not base_token or not k2k_auth_url:
+ msg = _('K2K Federation not setup for this session')
+ raise exceptions.KeystoneAuthException(msg)
+
+ redirect_to = request.GET.get(redirect_field_name, '')
+ if not is_safe_url(url=redirect_to, host=request.get_host()):
+ redirect_to = settings.LOGIN_REDIRECT_URL
+
+ unscoped_auth_ref = None
+ keystone_idp_id = getattr(
+ settings, 'KEYSTONE_PROVIDER_IDP_ID', 'localkeystone')
+
+ if keystone_provider == keystone_idp_id:
+ current_plugin = plugin.TokenPlugin()
+ unscoped_auth = current_plugin.get_plugin(auth_url=k2k_auth_url,
+ token=base_token)
+ else:
+ # Switch to service provider using K2K federation
+ plugins = [plugin.TokenPlugin()]
+ current_plugin = plugin.K2KAuthPlugin()
+
+ unscoped_auth = current_plugin.get_plugin(
+ auth_url=k2k_auth_url, service_provider=keystone_provider,
+ plugins=plugins, token=base_token)
+
+ try:
+ # Switch to identity provider using token auth
+ unscoped_auth_ref = current_plugin.get_access_info(unscoped_auth)
+ except exceptions.KeystoneAuthException as exc:
+ msg = 'Switching to Keystone Provider %s has failed. %s' \
+ % (keystone_provider, (six.text_type(exc)))
+ messages.error(request, msg)
+
+ if unscoped_auth_ref:
+ try:
+ request.user = auth.authenticate(
+ request=request, auth_url=unscoped_auth.auth_url,
+ token=unscoped_auth_ref.auth_token)
+ except exceptions.KeystoneAuthException as exc:
+ msg = 'Keystone provider switch failed: %s' % six.text_type(exc)
+ res = django_http.HttpResponseRedirect(settings.LOGIN_URL)
+ res.set_cookie('logout_reason', msg, max_age=10)
+ return res
+ auth.login(request, request.user)
+ auth_user.set_session_from_user(request, request.user)
+ request.session['keystone_provider_id'] = keystone_provider
+ request.session['keystone_providers'] = keystone_providers
+ request.session['k2k_base_unscoped_token'] = base_token
+ request.session['k2k_auth_url'] = k2k_auth_url
+ message = (
+ _('Switch to Keystone Provider "%(keystone_provider)s"'
+ 'successful.') % {'keystone_provider': keystone_provider})
+ messages.success(request, message)
+
+ response = shortcuts.redirect(redirect_to)
+ return response