summaryrefslogtreecommitdiff
path: root/openstack_auth/tests/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'openstack_auth/tests/tests.py')
-rw-r--r--openstack_auth/tests/tests.py1522
1 files changed, 1522 insertions, 0 deletions
diff --git a/openstack_auth/tests/tests.py b/openstack_auth/tests/tests.py
new file mode 100644
index 000000000..78edc8fab
--- /dev/null
+++ b/openstack_auth/tests/tests.py
@@ -0,0 +1,1522 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uuid
+
+import django
+from django.conf import settings
+from django.contrib import auth
+from django.core.urlresolvers import reverse
+from django import http
+from django import test
+from django.test.utils import override_settings
+from keystoneauth1 import exceptions as keystone_exceptions
+from keystoneauth1.identity import v2 as v2_auth
+from keystoneauth1.identity import v3 as v3_auth
+from keystoneauth1 import session
+from keystoneauth1 import token_endpoint
+from keystoneclient.v2_0 import client as client_v2
+from keystoneclient.v3 import client as client_v3
+import mock
+from mox3 import mox
+from testscenarios import load_tests_apply_scenarios
+
+from openstack_auth import policy
+from openstack_auth.tests import data_v2
+from openstack_auth.tests import data_v3
+from openstack_auth import user
+from openstack_auth import utils
+
+
+DEFAULT_DOMAIN = settings.OPENSTACK_KEYSTONE_DEFAULT_DOMAIN
+
+
+class OpenStackAuthTestsMixin(object):
+ '''Common functions for version specific tests.'''
+
+ scenarios = [
+ ('pure', {'interface': None}),
+ ('public', {'interface': 'publicURL'}),
+ ('internal', {'interface': 'internalURL'}),
+ ('admin', {'interface': 'adminURL'})
+ ]
+
+ def _mock_unscoped_client(self, user):
+ plugin = self._create_password_auth()
+ plugin.get_access(mox.IsA(session.Session)). \
+ AndReturn(self.data.unscoped_access_info)
+ plugin.auth_url = settings.OPENSTACK_KEYSTONE_URL
+ return self.ks_client_module.Client(session=mox.IsA(session.Session),
+ auth=plugin)
+
+ def _mock_unscoped_client_with_token(self, user, unscoped):
+ plugin = token_endpoint.Token(settings.OPENSTACK_KEYSTONE_URL,
+ unscoped.auth_token)
+ return self.ks_client_module.Client(session=mox.IsA(session.Session),
+ auth=plugin)
+
+ def _mock_client_token_auth_failure(self, unscoped, tenant_id):
+ plugin = self._create_token_auth(tenant_id, unscoped.auth_token)
+ plugin.get_access(mox.IsA(session.Session)). \
+ AndRaise(keystone_exceptions.AuthorizationFailure)
+
+ def _mock_client_password_auth_failure(self, username, password, exc):
+ plugin = self._create_password_auth(username=username,
+ password=password)
+ plugin.get_access(mox.IsA(session.Session)).AndRaise(exc)
+
+ def _mock_scoped_client_for_tenant(self, auth_ref, tenant_id, url=None,
+ client=True, token=None):
+ if url is None:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ if not token:
+ token = self.data.unscoped_access_info.auth_token
+
+ plugin = self._create_token_auth(
+ tenant_id,
+ token=token,
+ url=url)
+ self.scoped_token_auth = plugin
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(auth_ref)
+ if client:
+ return self.ks_client_module.Client(
+ session=mox.IsA(session.Session),
+ auth=plugin)
+
+ def get_form_data(self, user):
+ return {'region': settings.OPENSTACK_KEYSTONE_URL,
+ 'domain': DEFAULT_DOMAIN,
+ 'password': user.password,
+ 'username': user.name}
+
+
+class OpenStackAuthFederatedTestsMixin(object):
+ """Common functions for federation"""
+ def _mock_unscoped_federated_list_projects(self, client, projects):
+ client.federation = self.mox.CreateMockAnything()
+ client.federation.projects = self.mox.CreateMockAnything()
+ client.federation.projects.list().AndReturn(projects)
+
+ def _mock_unscoped_list_domains(self, client, domains):
+ client.auth = self.mox.CreateMockAnything()
+ client.auth.domains().AndReturn(domains)
+
+ def _mock_unscoped_token_client(self, unscoped, auth_url=None,
+ client=True, plugin=None):
+ if not auth_url:
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ if unscoped and not plugin:
+ plugin = self._create_token_auth(
+ None,
+ token=unscoped.auth_token,
+ url=auth_url)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(unscoped)
+ plugin.auth_url = auth_url
+ if client:
+ return self.ks_client_module.Client(
+ session=mox.IsA(session.Session),
+ auth=plugin)
+
+ def _mock_plugin(self, unscoped, auth_url=None):
+ if not auth_url:
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ plugin = self._create_token_auth(
+ None,
+ token=unscoped.auth_token,
+ url=auth_url)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(unscoped)
+ plugin.auth_url = settings.OPENSTACK_KEYSTONE_URL
+ return plugin
+
+ def _mock_federated_client_list_projects(self, unscoped_auth, projects):
+ client = self._mock_unscoped_token_client(None, plugin=unscoped_auth)
+ self._mock_unscoped_federated_list_projects(client, projects)
+
+ def _mock_federated_client_list_domains(self, unscoped_auth, domains):
+ client = self._mock_unscoped_token_client(None, plugin=unscoped_auth)
+ self._mock_unscoped_list_domains(client, domains)
+
+
+class OpenStackAuthTestsV2(OpenStackAuthTestsMixin, test.TestCase):
+
+ def setUp(self):
+ super(OpenStackAuthTestsV2, self).setUp()
+
+ if getattr(self, 'interface', None):
+ override = self.settings(OPENSTACK_ENDPOINT_TYPE=self.interface)
+ override.enable()
+ self.addCleanup(override.disable)
+
+ self.mox = mox.Mox()
+ self.addCleanup(self.mox.VerifyAll)
+ self.addCleanup(self.mox.UnsetStubs)
+
+ self.data = data_v2.generate_test_data()
+ self.ks_client_module = client_v2
+
+ settings.OPENSTACK_API_VERSIONS['identity'] = 2.0
+ settings.OPENSTACK_KEYSTONE_URL = "http://localhost:5000/v2.0"
+
+ self.mox.StubOutClassWithMocks(token_endpoint, 'Token')
+ self.mox.StubOutClassWithMocks(v2_auth, 'Token')
+ self.mox.StubOutClassWithMocks(v2_auth, 'Password')
+ self.mox.StubOutClassWithMocks(client_v2, 'Client')
+
+ def _mock_unscoped_list_tenants(self, client, tenants):
+ client.tenants = self.mox.CreateMockAnything()
+ client.tenants.list().AndReturn(tenants)
+
+ def _mock_unscoped_client_list_tenants(self, user, tenants):
+ client = self._mock_unscoped_client(user)
+ self._mock_unscoped_list_tenants(client, tenants)
+
+ def _create_password_auth(self, username=None, password=None, url=None):
+ if not username:
+ username = self.data.user.name
+
+ if not password:
+ password = self.data.user.password
+
+ if not url:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ return v2_auth.Password(auth_url=url,
+ password=password,
+ username=username)
+
+ def _create_token_auth(self, project_id, token=None, url=None):
+ if not token:
+ token = self.data.unscoped_access_info.auth_token
+
+ if not url:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ return v2_auth.Token(auth_url=url,
+ token=token,
+ tenant_id=project_id,
+ reauthenticate=False)
+
+ def _login(self):
+ tenants = [self.data.tenant_one, self.data.tenant_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_client_list_tenants(user, tenants)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.tenant_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_login(self):
+ self._login()
+
+ def test_login_with_disabled_tenant(self):
+ # Test to validate that authentication will not try to get
+ # scoped token for disabled project.
+ tenants = [self.data.tenant_two, self.data.tenant_one]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_client_list_tenants(user, tenants)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.tenant_one.id)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_login_w_bad_region_cookie(self):
+ self.client.cookies['services_region'] = "bad_region"
+ self._login()
+ self.assertNotEqual("bad_region",
+ self.client.session['services_region'])
+ self.assertEqual("RegionOne",
+ self.client.session['services_region'])
+
+ def test_no_enabled_tenants(self):
+ tenants = [self.data.tenant_two]
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_client_list_tenants(user, tenants)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response,
+ 'You are not authorized for any projects.')
+
+ def test_no_tenants(self):
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_client_list_tenants(user, [])
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response,
+ 'You are not authorized for any projects.')
+
+ def test_invalid_credentials(self):
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+ form_data['password'] = "invalid"
+
+ exc = keystone_exceptions.Unauthorized(401)
+ self._mock_client_password_auth_failure(user.name, "invalid", exc)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response, "Invalid credentials.")
+
+ def test_exception(self):
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+ exc = keystone_exceptions.ClientException(500)
+ self._mock_client_password_auth_failure(user.name, user.password, exc)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response,
+ ("An error occurred authenticating. Please try "
+ "again later."))
+
+ def test_redirect_when_already_logged_in(self):
+ self._login()
+
+ response = self.client.get(reverse('login'))
+ self.assertEqual(response.status_code, 302)
+ self.assertNotIn(reverse('login'), response['location'])
+
+ def test_dont_redirect_when_already_logged_in_if_next_is_set(self):
+ self._login()
+
+ expected_url = "%s?%s=/%s/" % (reverse('login'),
+ auth.REDIRECT_FIELD_NAME,
+ 'special')
+
+ response = self.client.get(expected_url)
+ self.assertEqual(response.status_code, 200)
+ self.assertTemplateUsed(response, 'auth/login.html')
+
+ def test_switch(self, next=None):
+ tenant = self.data.tenant_two
+ tenants = [self.data.tenant_one, self.data.tenant_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+ scoped = self.data.scoped_access_info
+ sc = self.data.service_catalog
+ et = getattr(settings, 'OPENSTACK_ENDPOINT_TYPE', 'publicURL')
+ endpoint = sc.url_for(service_type='identity', interface=et)
+
+ form_data = self.get_form_data(user)
+
+ self._mock_unscoped_client_list_tenants(user, tenants)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.tenant_one.id)
+ self._mock_scoped_client_for_tenant(scoped, tenant.id, url=endpoint,
+ client=False)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ url = reverse('switch_tenants', args=[tenant.id])
+
+ scoped._token['tenant']['id'] = self.data.tenant_two.id
+
+ if next:
+ form_data.update({auth.REDIRECT_FIELD_NAME: next})
+
+ response = self.client.get(url, form_data)
+
+ if next:
+ if django.VERSION >= (1, 9):
+ expected_url = next
+ else:
+ expected_url = 'http://testserver%s' % next
+ self.assertEqual(response['location'], expected_url)
+ else:
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ self.assertEqual(self.client.session['token'].tenant['id'],
+ scoped.tenant_id)
+
+ def test_switch_with_next(self):
+ self.test_switch(next='/next_url')
+
+ def test_switch_region(self, next=None):
+ tenants = [self.data.tenant_one, self.data.tenant_two]
+ user = self.data.user
+ scoped = self.data.scoped_access_info
+ sc = self.data.service_catalog
+
+ form_data = self.get_form_data(user)
+
+ self._mock_unscoped_client_list_tenants(user, tenants)
+ self._mock_scoped_client_for_tenant(scoped, self.data.tenant_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ old_region = sc.get_endpoints()['compute'][0]['region']
+ self.assertEqual(self.client.session['services_region'], old_region)
+
+ region = sc.get_endpoints()['compute'][1]['region']
+ url = reverse('switch_services_region', args=[region])
+
+ form_data['region_name'] = region
+
+ if next:
+ form_data.update({auth.REDIRECT_FIELD_NAME: next})
+
+ response = self.client.get(url, form_data)
+
+ if next:
+ if django.VERSION >= (1, 9):
+ expected_url = next
+ else:
+ expected_url = 'http://testserver%s' % next
+ self.assertEqual(response['location'], expected_url)
+ else:
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ self.assertEqual(self.client.session['services_region'], region)
+ self.assertEqual(self.client.cookies['services_region'].value, region)
+
+ def test_switch_region_with_next(self, next=None):
+ self.test_switch_region(next='/next_url')
+
+ def test_tenant_sorting(self):
+ tenants = [self.data.tenant_two, self.data.tenant_one]
+ expected_tenants = [self.data.tenant_one, self.data.tenant_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ client = self._mock_unscoped_client_with_token(user, unscoped)
+ self._mock_unscoped_list_tenants(client, tenants)
+
+ self.mox.ReplayAll()
+
+ tenant_list = utils.get_project_list(
+ user_id=user.id,
+ auth_url=settings.OPENSTACK_KEYSTONE_URL,
+ token=unscoped.auth_token)
+ self.assertEqual(tenant_list, expected_tenants)
+
+
+class OpenStackAuthTestsV3(OpenStackAuthTestsMixin,
+ OpenStackAuthFederatedTestsMixin,
+ test.TestCase):
+
+ def _mock_unscoped_client_list_projects(self, user, projects):
+ client = self._mock_unscoped_client(user)
+ self._mock_unscoped_list_projects(client, user, projects)
+
+ def _mock_unscoped_list_projects(self, client, user, projects):
+ client.projects = self.mox.CreateMockAnything()
+ client.projects.list(user=user.id).AndReturn(projects)
+
+ def _mock_unscoped_client_list_projects_fail(self, user):
+ client = self._mock_unscoped_client(user)
+ self._mock_unscoped_list_projects_fail(client, user)
+
+ def _mock_unscoped_list_projects_fail(self, client, user):
+ plugin = self._create_token_auth(
+ project_id=None,
+ domain_name=DEFAULT_DOMAIN,
+ token=self.data.unscoped_access_info.auth_token,
+ url=settings.OPENSTACK_KEYSTONE_URL)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(
+ self.data.domain_scoped_access_info)
+ client.projects = self.mox.CreateMockAnything()
+ client.projects.list(user=user.id).AndRaise(
+ keystone_exceptions.AuthorizationFailure)
+
+ def _mock_unscoped_and_domain_list_projects(self, user, projects):
+ client = self._mock_unscoped_client(user)
+ self._mock_scoped_for_domain(projects)
+ self._mock_unscoped_list_projects(client, user, projects)
+
+ def _mock_scoped_for_domain(self, projects):
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ plugin = self._create_token_auth(
+ project_id=None,
+ domain_name=DEFAULT_DOMAIN,
+ token=self.data.unscoped_access_info.auth_token,
+ url=url)
+
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(
+ self.data.domain_scoped_access_info)
+
+ # if no projects or no enabled projects for user, but domain scoped
+ # token client auth gets set to domain scoped auth otherwise it's set
+ # to the project scoped auth and that happens in a different mock
+ enabled_projects = [project for project in projects if project.enabled]
+ if not projects or not enabled_projects:
+ return self.ks_client_module.Client(
+ session=mox.IsA(session.Session),
+ auth=plugin)
+
+ def _create_password_auth(self, username=None, password=None, url=None):
+ if not username:
+ username = self.data.user.name
+
+ if not password:
+ password = self.data.user.password
+
+ if not url:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ return v3_auth.Password(auth_url=url,
+ password=password,
+ username=username,
+ user_domain_name=DEFAULT_DOMAIN,
+ unscoped=True)
+
+ def _create_token_auth(self, project_id, token=None, url=None,
+ domain_name=None):
+ if not token:
+ token = self.data.unscoped_access_info.auth_token
+
+ if not url:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ if domain_name:
+ return v3_auth.Token(auth_url=url,
+ token=token,
+ domain_name=domain_name,
+ reauthenticate=False)
+ else:
+ return v3_auth.Token(auth_url=url,
+ token=token,
+ project_id=project_id,
+ reauthenticate=False)
+
+ def setUp(self):
+ super(OpenStackAuthTestsV3, self).setUp()
+
+ if getattr(self, 'interface', None):
+ override = self.settings(OPENSTACK_ENDPOINT_TYPE=self.interface)
+ override.enable()
+ self.addCleanup(override.disable)
+
+ self.mox = mox.Mox()
+ self.addCleanup(self.mox.VerifyAll)
+ self.addCleanup(self.mox.UnsetStubs)
+
+ self.data = data_v3.generate_test_data()
+ self.ks_client_module = client_v3
+ settings.OPENSTACK_API_VERSIONS['identity'] = 3
+ settings.OPENSTACK_KEYSTONE_URL = "http://localhost:5000/v3"
+
+ self.mox.StubOutClassWithMocks(token_endpoint, 'Token')
+ self.mox.StubOutClassWithMocks(v3_auth, 'Token')
+ self.mox.StubOutClassWithMocks(v3_auth, 'Password')
+ self.mox.StubOutClassWithMocks(client_v3, 'Client')
+ self.mox.StubOutClassWithMocks(v3_auth, 'Keystone2Keystone')
+
+ def test_login(self):
+ projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_login_with_disabled_project(self):
+ # Test to validate that authentication will not try to get
+ # scoped token for disabled project.
+ projects = [self.data.project_two, self.data.project_one]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_no_enabled_projects(self):
+ projects = [self.data.project_two]
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_no_projects(self):
+ user = self.data.user
+ form_data = self.get_form_data(user)
+
+ self._mock_unscoped_and_domain_list_projects(user, [])
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_fail_projects(self):
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_client_list_projects_fail(user)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response,
+ 'Unable to retrieve authorized projects.')
+
+ def test_invalid_credentials(self):
+ user = self.data.user
+
+ form_data = self.get_form_data(user)
+
+ form_data['password'] = "invalid"
+
+ exc = keystone_exceptions.Unauthorized(401)
+ self._mock_client_password_auth_failure(user.name, "invalid", exc)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response, "Invalid credentials.")
+
+ def test_exception(self):
+ user = self.data.user
+ form_data = self.get_form_data(user)
+ exc = keystone_exceptions.ClientException(500)
+ self._mock_client_password_auth_failure(user.name, user.password, exc)
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ # GET the page to set the test cookie.
+ response = self.client.get(url, form_data)
+ self.assertEqual(response.status_code, 200)
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+
+ self.assertTemplateUsed(response, 'auth/login.html')
+ self.assertContains(response,
+ ("An error occurred authenticating. Please try "
+ "again later."))
+
+ def test_switch(self, next=None):
+ project = self.data.project_two
+ projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ scoped = self.data.scoped_access_info
+ sc = self.data.service_catalog
+ et = getattr(settings, 'OPENSTACK_ENDPOINT_TYPE', 'publicURL')
+
+ form_data = self.get_form_data(user)
+
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(scoped, self.data.project_one.id)
+ self._mock_scoped_client_for_tenant(
+ scoped,
+ project.id,
+ url=sc.url_for(service_type='identity', interface=et),
+ client=False)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ url = reverse('switch_tenants', args=[project.id])
+
+ scoped._project['id'] = self.data.project_two.id
+
+ if next:
+ form_data.update({auth.REDIRECT_FIELD_NAME: next})
+
+ response = self.client.get(url, form_data)
+
+ if next:
+ if django.VERSION >= (1, 9):
+ expected_url = next
+ else:
+ expected_url = 'http://testserver%s' % next
+ self.assertEqual(response['location'], expected_url)
+ else:
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ self.assertEqual(self.client.session['token'].project['id'],
+ scoped.project_id)
+
+ def test_switch_with_next(self):
+ self.test_switch(next='/next_url')
+
+ def test_switch_region(self, next=None):
+ projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ scoped = self.data.unscoped_access_info
+ sc = self.data.service_catalog
+
+ form_data = self.get_form_data(user)
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(scoped, self.data.project_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('login')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ old_region = sc.get_endpoints()['compute'][0]['region']
+ self.assertEqual(self.client.session['services_region'], old_region)
+
+ region = sc.get_endpoints()['compute'][1]['region']
+ url = reverse('switch_services_region', args=[region])
+
+ form_data['region_name'] = region
+
+ if next:
+ form_data.update({auth.REDIRECT_FIELD_NAME: next})
+
+ response = self.client.get(url, form_data)
+
+ if next:
+ if django.VERSION >= (1, 9):
+ expected_url = next
+ else:
+ expected_url = 'http://testserver%s' % next
+ self.assertEqual(response['location'], expected_url)
+ else:
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ self.assertEqual(self.client.session['services_region'], region)
+
+ def test_switch_region_with_next(self, next=None):
+ self.test_switch_region(next='/next_url')
+
+ def test_switch_keystone_provider_remote_fail(self):
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ target_provider = 'k2kserviceprovider'
+ self.data = data_v3.generate_test_data(service_providers=True)
+ self.sp_data = data_v3.generate_test_data(endpoint='http://sp2')
+ projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+ form_data = self.get_form_data(user)
+
+ # mock authenticate
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ # mock switch
+ plugin = v3_auth.Token(auth_url=auth_url,
+ token=unscoped.auth_token,
+ project_id=None,
+ reauthenticate=False)
+ plugin.get_access(mox.IsA(session.Session)
+ ).AndReturn(self.data.unscoped_access_info)
+ plugin.auth_url = auth_url
+ client = self.ks_client_module.Client(session=mox.IsA(session.Session),
+ auth=plugin)
+
+ self._mock_unscoped_list_projects(client, user, projects)
+ plugin = self._create_token_auth(
+ self.data.project_one.id,
+ token=self.data.unscoped_access_info.auth_token,
+ url=settings.OPENSTACK_KEYSTONE_URL)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(
+ settings.OPENSTACK_KEYSTONE_URL)
+ plugin.get_sp_auth_url(
+ mox.IsA(session.Session), target_provider
+ ).AndReturn('https://k2kserviceprovider/sp_url')
+
+ # let the K2K plugin fail when logging in
+ plugin = v3_auth.Keystone2Keystone(
+ base_plugin=plugin, service_provider=target_provider)
+ plugin.get_access(mox.IsA(session.Session)).AndRaise(
+ keystone_exceptions.AuthorizationFailure)
+ self.mox.ReplayAll()
+
+ # Log in
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Switch
+ url = reverse('switch_keystone_provider', args=[target_provider])
+ form_data['keystone_provider'] = target_provider
+ response = self.client.get(url, form_data, follow=True)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Assert that provider has not changed because of failure
+ self.assertEqual(self.client.session['keystone_provider_id'],
+ 'localkeystone')
+ # These should never change
+ self.assertEqual(self.client.session['k2k_base_unscoped_token'],
+ unscoped.auth_token)
+ self.assertEqual(self.client.session['k2k_auth_url'], auth_url)
+
+ def test_switch_keystone_provider_remote(self):
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ target_provider = 'k2kserviceprovider'
+ self.data = data_v3.generate_test_data(service_providers=True)
+ self.sp_data = data_v3.generate_test_data(endpoint='http://sp2')
+ projects = [self.data.project_one, self.data.project_two]
+ domains = []
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+ form_data = self.get_form_data(user)
+
+ # mock authenticate
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ # mock switch
+ plugin = v3_auth.Token(auth_url=auth_url,
+ token=unscoped.auth_token,
+ project_id=None,
+ reauthenticate=False)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(
+ self.data.unscoped_access_info)
+
+ plugin.auth_url = auth_url
+ client = self.ks_client_module.Client(session=mox.IsA(session.Session),
+ auth=plugin)
+
+ self._mock_unscoped_list_projects(client, user, projects)
+ plugin = self._create_token_auth(
+ self.data.project_one.id,
+ token=self.data.unscoped_access_info.auth_token,
+ url=settings.OPENSTACK_KEYSTONE_URL)
+ plugin.get_access(mox.IsA(session.Session)).AndReturn(
+ settings.OPENSTACK_KEYSTONE_URL)
+
+ plugin.get_sp_auth_url(
+ mox.IsA(session.Session), target_provider
+ ).AndReturn('https://k2kserviceprovider/sp_url')
+ plugin = v3_auth.Keystone2Keystone(base_plugin=plugin,
+ service_provider=target_provider)
+ plugin.get_access(mox.IsA(session.Session)). \
+ AndReturn(self.sp_data.unscoped_access_info)
+ plugin.auth_url = 'http://service_provider_endp:5000/v3'
+
+ # mock authenticate for service provider
+ sp_projects = [self.sp_data.project_one, self.sp_data.project_two]
+ sp_unscoped = self.sp_data.federated_unscoped_access_info
+ sp_unscoped_auth = self._mock_plugin(sp_unscoped,
+ auth_url=plugin.auth_url)
+ client = self._mock_unscoped_token_client(None, plugin.auth_url,
+ plugin=sp_unscoped_auth)
+ self._mock_unscoped_list_domains(client, domains)
+ client = self._mock_unscoped_token_client(None, plugin.auth_url,
+ plugin=sp_unscoped_auth)
+ self._mock_unscoped_federated_list_projects(client, sp_projects)
+ self._mock_scoped_client_for_tenant(sp_unscoped,
+ self.sp_data.project_one.id,
+ url=plugin.auth_url,
+ token=sp_unscoped.auth_token)
+
+ self.mox.ReplayAll()
+
+ # Log in
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Switch
+ url = reverse('switch_keystone_provider', args=[target_provider])
+ form_data['keystone_provider'] = target_provider
+ response = self.client.get(url, form_data, follow=True)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Assert keystone provider has changed
+ self.assertEqual(self.client.session['keystone_provider_id'],
+ target_provider)
+ # These should not change
+ self.assertEqual(self.client.session['k2k_base_unscoped_token'],
+ unscoped.auth_token)
+ self.assertEqual(self.client.session['k2k_auth_url'], auth_url)
+
+ def test_switch_keystone_provider_local(self):
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ self.data = data_v3.generate_test_data(service_providers=True)
+ keystone_provider = 'localkeystone'
+ projects = [self.data.project_one, self.data.project_two]
+ domains = []
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+ form_data = self.get_form_data(user)
+
+ # mock authenticate
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+ self._mock_unscoped_token_client(unscoped,
+ auth_url=auth_url,
+ client=False)
+ unscoped_auth = self._mock_plugin(unscoped)
+ client = self._mock_unscoped_token_client(None, auth_url=auth_url,
+ plugin=unscoped_auth)
+ self._mock_unscoped_list_domains(client, domains)
+ client = self._mock_unscoped_token_client(None, auth_url=auth_url,
+ plugin=unscoped_auth)
+ self._mock_unscoped_list_projects(client, user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ self.mox.ReplayAll()
+
+ # Log in
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Switch
+ url = reverse('switch_keystone_provider', args=[keystone_provider])
+ form_data['keystone_provider'] = keystone_provider
+ response = self.client.get(url, form_data, follow=True)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Assert nothing has changed since we are going from local to local
+ self.assertEqual(self.client.session['keystone_provider_id'],
+ keystone_provider)
+ self.assertEqual(self.client.session['k2k_base_unscoped_token'],
+ unscoped.auth_token)
+ self.assertEqual(self.client.session['k2k_auth_url'], auth_url)
+
+ def test_switch_keystone_provider_local_fail(self):
+ auth_url = settings.OPENSTACK_KEYSTONE_URL
+ self.data = data_v3.generate_test_data(service_providers=True)
+ keystone_provider = 'localkeystone'
+ projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+ form_data = self.get_form_data(user)
+
+ # mock authenticate
+ self._mock_unscoped_and_domain_list_projects(user, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ # Let using the base token for logging in fail
+ plugin = v3_auth.Token(auth_url=auth_url,
+ token=unscoped.auth_token,
+ project_id=None,
+ reauthenticate=False)
+ plugin.get_access(mox.IsA(session.Session)). \
+ AndRaise(keystone_exceptions.AuthorizationFailure)
+ plugin.auth_url = auth_url
+ self.mox.ReplayAll()
+
+ # Log in
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Switch
+ url = reverse('switch_keystone_provider', args=[keystone_provider])
+ form_data['keystone_provider'] = keystone_provider
+ response = self.client.get(url, form_data, follow=True)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ # Assert
+ self.assertEqual(self.client.session['keystone_provider_id'],
+ keystone_provider)
+ self.assertEqual(self.client.session['k2k_base_unscoped_token'],
+ unscoped.auth_token)
+ self.assertEqual(self.client.session['k2k_auth_url'], auth_url)
+
+ def test_tenant_sorting(self):
+ projects = [self.data.project_two, self.data.project_one]
+ expected_projects = [self.data.project_one, self.data.project_two]
+ user = self.data.user
+ unscoped = self.data.unscoped_access_info
+
+ client = self._mock_unscoped_client_with_token(user, unscoped)
+ self._mock_unscoped_list_projects(client, user, projects)
+ self.mox.ReplayAll()
+
+ project_list = utils.get_project_list(
+ user_id=user.id,
+ auth_url=settings.OPENSTACK_KEYSTONE_URL,
+ token=unscoped.auth_token)
+ self.assertEqual(project_list, expected_projects)
+
+ def test_login_form_multidomain(self):
+ override = self.settings(OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT=True)
+ override.enable()
+ self.addCleanup(override.disable)
+
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, 'id="id_domain"')
+ self.assertContains(response, 'name="domain"')
+
+ def test_login_form_multidomain_dropdown(self):
+ override = self.settings(OPENSTACK_KEYSTONE_MULTIDOMAIN_SUPPORT=True,
+ OPENSTACK_KEYSTONE_DOMAIN_DROPDOWN=True,
+ OPENSTACK_KEYSTONE_DOMAIN_CHOICES=(
+ ('Default', 'Default'),)
+ )
+ override.enable()
+ self.addCleanup(override.disable)
+
+ url = reverse('login')
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, 'id="id_domain"')
+ self.assertContains(response, 'name="domain"')
+ self.assertContains(response, 'option value="Default"')
+ settings.OPENSTACK_KEYSTONE_DOMAIN_DROPDOWN = False
+
+
+class OpenStackAuthTestsWebSSO(OpenStackAuthTestsMixin,
+ OpenStackAuthFederatedTestsMixin,
+ test.TestCase):
+
+ def _create_token_auth(self, project_id=None, token=None, url=None):
+ if not token:
+ token = self.data.federated_unscoped_access_info.auth_token
+
+ if not url:
+ url = settings.OPENSTACK_KEYSTONE_URL
+
+ return v3_auth.Token(auth_url=url,
+ token=token,
+ project_id=project_id,
+ reauthenticate=False)
+
+ def setUp(self):
+ super(OpenStackAuthTestsWebSSO, self).setUp()
+
+ self.mox = mox.Mox()
+ self.addCleanup(self.mox.VerifyAll)
+ self.addCleanup(self.mox.UnsetStubs)
+
+ self.data = data_v3.generate_test_data()
+ self.ks_client_module = client_v3
+
+ self.idp_id = uuid.uuid4().hex
+ self.idp_oidc_id = uuid.uuid4().hex
+ self.idp_saml2_id = uuid.uuid4().hex
+
+ settings.OPENSTACK_API_VERSIONS['identity'] = 3
+ settings.OPENSTACK_KEYSTONE_URL = 'http://localhost:5000/v3'
+ settings.WEBSSO_ENABLED = True
+ settings.WEBSSO_CHOICES = (
+ ('credentials', 'Keystone Credentials'),
+ ('oidc', 'OpenID Connect'),
+ ('saml2', 'Security Assertion Markup Language'),
+ (self.idp_oidc_id, 'IDP OIDC'),
+ (self.idp_saml2_id, 'IDP SAML2')
+ )
+ settings.WEBSSO_IDP_MAPPING = {
+ self.idp_oidc_id: (self.idp_id, 'oidc'),
+ self.idp_saml2_id: (self.idp_id, 'saml2')
+ }
+
+ self.mox.StubOutClassWithMocks(token_endpoint, 'Token')
+ self.mox.StubOutClassWithMocks(v3_auth, 'Token')
+ self.mox.StubOutClassWithMocks(v3_auth, 'Password')
+ self.mox.StubOutClassWithMocks(client_v3, 'Client')
+
+ def test_login_form(self):
+ url = reverse('login')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertContains(response, 'credentials')
+ self.assertContains(response, 'oidc')
+ self.assertContains(response, 'saml2')
+ self.assertContains(response, self.idp_oidc_id)
+ self.assertContains(response, self.idp_saml2_id)
+
+ def test_websso_redirect_by_protocol(self):
+ origin = 'http://testserver/auth/websso/'
+ protocol = 'oidc'
+ redirect_url = ('%s/auth/OS-FEDERATION/websso/%s?origin=%s' %
+ (settings.OPENSTACK_KEYSTONE_URL, protocol, origin))
+
+ form_data = {'auth_type': protocol,
+ 'region': settings.OPENSTACK_KEYSTONE_URL}
+ url = reverse('login')
+
+ # POST to the page and redirect to keystone.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, redirect_url, status_code=302,
+ target_status_code=404)
+
+ def test_websso_redirect_by_idp(self):
+ origin = 'http://testserver/auth/websso/'
+ protocol = 'oidc'
+ redirect_url = ('%s/auth/OS-FEDERATION/identity_providers/%s'
+ '/protocols/%s/websso?origin=%s' %
+ (settings.OPENSTACK_KEYSTONE_URL, self.idp_id,
+ protocol, origin))
+
+ form_data = {'auth_type': self.idp_oidc_id,
+ 'region': settings.OPENSTACK_KEYSTONE_URL}
+ url = reverse('login')
+
+ # POST to the page and redirect to keystone.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, redirect_url, status_code=302,
+ target_status_code=404)
+
+ def test_websso_login(self):
+ projects = [self.data.project_one, self.data.project_two]
+ domains = []
+ unscoped = self.data.federated_unscoped_access_info
+ token = unscoped.auth_token
+ unscoped_auth = self._mock_plugin(unscoped)
+
+ form_data = {'token': token}
+ self._mock_federated_client_list_domains(unscoped_auth, domains)
+ self._mock_federated_client_list_projects(unscoped_auth, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('websso')
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+ def test_websso_login_with_auth_in_url(self):
+ settings.OPENSTACK_KEYSTONE_URL = 'http://auth.openstack.org:5000/v3'
+
+ projects = [self.data.project_one, self.data.project_two]
+ domains = []
+ unscoped = self.data.federated_unscoped_access_info
+ token = unscoped.auth_token
+ unscoped_auth = self._mock_plugin(unscoped)
+
+ form_data = {'token': token}
+ self._mock_federated_client_list_domains(unscoped_auth, domains)
+ self._mock_federated_client_list_projects(unscoped_auth, projects)
+ self._mock_scoped_client_for_tenant(unscoped, self.data.project_one.id)
+
+ self.mox.ReplayAll()
+
+ url = reverse('websso')
+
+ # POST to the page to log in.
+ response = self.client.post(url, form_data)
+ self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
+
+load_tests = load_tests_apply_scenarios
+
+
+class PolicyLoaderTestCase(test.TestCase):
+ def test_policy_file_load(self):
+ policy.reset()
+ enforcer = policy._get_enforcer()
+ self.assertEqual(2, len(enforcer))
+ self.assertIn('identity', enforcer)
+ self.assertIn('compute', enforcer)
+
+ def test_policy_reset(self):
+ policy._get_enforcer()
+ self.assertEqual(2, len(policy._ENFORCER))
+ policy.reset()
+ self.assertIsNone(policy._ENFORCER)
+
+
+class PermTestCase(test.TestCase):
+ def test_has_perms(self):
+ testuser = user.User(id=1, roles=[])
+
+ def has_perm(perm, obj=None):
+ return perm in ('perm1', 'perm3')
+
+ with mock.patch.object(testuser, 'has_perm', side_effect=has_perm):
+ self.assertFalse(testuser.has_perms(['perm2']))
+
+ # perm1 AND perm3
+ self.assertFalse(testuser.has_perms(['perm1', 'perm2']))
+
+ # perm1 AND perm3
+ self.assertTrue(testuser.has_perms(['perm1', 'perm3']))
+
+ # perm1 AND (perm2 OR perm3)
+ perm_list = ['perm1', ('perm2', 'perm3')]
+ self.assertTrue(testuser.has_perms(perm_list))
+
+
+class PolicyTestCase(test.TestCase):
+ _roles = []
+
+ def setUp(self):
+ mock_user = user.User(id=1, roles=self._roles)
+ patcher = mock.patch('openstack_auth.utils.get_user',
+ return_value=mock_user)
+ self.MockClass = patcher.start()
+ self.addCleanup(patcher.stop)
+ self.request = http.HttpRequest()
+
+
+class PolicyTestCaseNonAdmin(PolicyTestCase):
+ _roles = [{'id': '1', 'name': 'member'}]
+
+ def test_check_admin_required_false(self):
+ policy.reset()
+ value = policy.check((("identity", "admin_required"),),
+ request=self.request)
+ self.assertFalse(value)
+
+ def test_check_identity_rule_not_found_false(self):
+ policy.reset()
+ value = policy.check((("identity", "i_dont_exist"),),
+ request=self.request)
+ # this should fail because the default check for
+ # identity is admin_required
+ self.assertFalse(value)
+
+ def test_check_nova_context_is_admin_false(self):
+ policy.reset()
+ value = policy.check((("compute", "context_is_admin"),),
+ request=self.request)
+ self.assertFalse(value)
+
+ def test_compound_check_false(self):
+ policy.reset()
+ value = policy.check((("identity", "admin_required"),
+ ("identity", "identity:default"),),
+ request=self.request)
+ self.assertFalse(value)
+
+ def test_scope_not_found(self):
+ policy.reset()
+ value = policy.check((("dummy", "default"),),
+ request=self.request)
+ self.assertTrue(value)
+
+
+class PolicyTestCaseAdmin(PolicyTestCase):
+ _roles = [{'id': '1', 'name': 'admin'}]
+
+ def test_check_admin_required_true(self):
+ policy.reset()
+ value = policy.check((("identity", "admin_required"),),
+ request=self.request)
+ self.assertTrue(value)
+
+ def test_check_identity_rule_not_found_true(self):
+ policy.reset()
+ value = policy.check((("identity", "i_dont_exist"),),
+ request=self.request)
+ # this should succeed because the default check for
+ # identity is admin_required
+ self.assertTrue(value)
+
+ def test_compound_check_true(self):
+ policy.reset()
+ value = policy.check((("identity", "admin_required"),
+ ("identity", "identity:default"),),
+ request=self.request)
+ self.assertTrue(value)
+
+ def test_check_nova_context_is_admin_true(self):
+ policy.reset()
+ value = policy.check((("compute", "context_is_admin"),),
+ request=self.request)
+ self.assertTrue(value)
+
+
+class PolicyTestCaseV3Admin(PolicyTestCase):
+ _roles = [{'id': '1', 'name': 'admin'}]
+
+ def setUp(self):
+ policy_files = {
+ 'identity': 'policy.v3cloudsample.json',
+ 'compute': 'nova_policy.json'}
+
+ override = self.settings(POLICY_FILES=policy_files)
+ override.enable()
+ self.addCleanup(override.disable)
+
+ mock_user = user.User(id=1, roles=self._roles,
+ user_domain_id='admin_domain_id')
+ patcher = mock.patch('openstack_auth.utils.get_user',
+ return_value=mock_user)
+ self.MockClass = patcher.start()
+ self.addCleanup(patcher.stop)
+ self.request = http.HttpRequest()
+
+ def test_check_cloud_admin_required_true(self):
+ policy.reset()
+ value = policy.check((("identity", "cloud_admin"),),
+ request=self.request)
+ self.assertTrue(value)
+
+ def test_check_domain_admin_required_true(self):
+ policy.reset()
+ value = policy.check((
+ ("identity", "admin_and_matching_domain_id"),),
+ request=self.request)
+ self.assertTrue(value)
+
+ def test_check_any_admin_required_true(self):
+ policy.reset()
+ value = policy.check((("identity", "admin_or_cloud_admin"),),
+ request=self.request)
+ self.assertTrue(value)
+
+
+class RoleTestCaseAdmin(test.TestCase):
+
+ def test_get_admin_roles_with_default_value(self):
+ admin_roles = utils.get_admin_roles()
+ self.assertSetEqual({'admin'}, admin_roles)
+
+ @override_settings(OPENSTACK_KEYSTONE_ADMIN_ROLES=['foO', 'BAR', 'admin'])
+ def test_get_admin_roles(self):
+ admin_roles = utils.get_admin_roles()
+ self.assertSetEqual({'foo', 'bar', 'admin'}, admin_roles)
+
+ @override_settings(OPENSTACK_KEYSTONE_ADMIN_ROLES=['foO', 'BAR', 'admin'])
+ def test_get_admin_permissions(self):
+ admin_permissions = utils.get_admin_permissions()
+ self.assertSetEqual({'openstack.roles.foo',
+ 'openstack.roles.bar',
+ 'openstack.roles.admin'}, admin_permissions)
+
+
+class UtilsTestCase(test.TestCase):
+
+ def test_fix_auth_url_version_v20(self):
+ settings.OPENSTACK_API_VERSIONS['identity'] = 2.0
+ test_urls = [
+ ("http://a/", ("http://a/v2.0", False)),
+ ("http://a", ("http://a/v2.0", False)),
+ ("http://a:8080/", ("http://a:8080/v2.0", False)),
+ ("http://a/v2.0", ("http://a/v2.0", False)),
+ ("http://a/v2.0/", ("http://a/v2.0/", False)),
+ ("http://a/identity", ("http://a/identity/v2.0", False)),
+ ("http://a/identity/", ("http://a/identity/v2.0", False)),
+ ("http://a:5000/identity/v2.0",
+ ("http://a:5000/identity/v2.0", False)),
+ ("http://a/identity/v2.0/", ("http://a/identity/v2.0/", False))
+ ]
+ for src, expected in test_urls:
+ self.assertEqual(expected, utils.fix_auth_url_version_prefix(src))
+
+ def test_fix_auth_url_version_v3(self):
+ settings.OPENSTACK_API_VERSIONS['identity'] = 3
+ test_urls = [
+ ("http://a/", ("http://a/v3", False)),
+ ("http://a", ("http://a/v3", False)),
+ ("http://a:8080/", ("http://a:8080/v3", False)),
+ ("http://a/v3", ("http://a/v3", False)),
+ ("http://a/v3/", ("http://a/v3/", False)),
+ ("http://a/v2.0/", ("http://a/v3/", True)),
+ ("http://a/v2.0", ("http://a/v3", True)),
+ ("http://a/identity", ("http://a/identity/v3", False)),
+ ("http://a:5000/identity/", ("http://a:5000/identity/v3", False)),
+ ("http://a/identity/v3", ("http://a/identity/v3", False)),
+ ("http://a/identity/v3/", ("http://a/identity/v3/", False))
+ ]
+ for src, expected in test_urls:
+ self.assertEqual(expected, utils.fix_auth_url_version_prefix(src))
+
+
+class UserTestCase(test.TestCase):
+
+ def setUp(self):
+ self.data = data_v3.generate_test_data(pki=True)
+
+ def test_unscoped_token_is_none(self):
+ created_token = user.Token(self.data.domain_scoped_access_info,
+ unscoped_token=None)
+ self.assertTrue(created_token._is_pki_token(
+ self.data.domain_scoped_access_info.auth_token))
+ self.assertFalse(created_token._is_pki_token(None))
+
+
+class BehindProxyTestCase(test.TestCase):
+
+ def setUp(self):
+ self.request = http.HttpRequest()
+
+ def test_without_proxy(self):
+ self.request.META['REMOTE_ADDR'] = '10.111.111.2'
+ from openstack_auth.utils import get_client_ip
+ self.assertEqual('10.111.111.2', get_client_ip(self.request))
+
+ def test_with_proxy_no_settings(self):
+ from openstack_auth.utils import get_client_ip
+ self.request.META['REMOTE_ADDR'] = '10.111.111.2'
+ self.request.META['HTTP_X_REAL_IP'] = '192.168.15.33'
+ self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2'
+ self.assertEqual('10.111.111.2', get_client_ip(self.request))
+
+ def test_with_settings_without_proxy(self):
+ from openstack_auth.utils import get_client_ip
+ self.request.META['REMOTE_ADDR'] = '10.111.111.2'
+ self.assertEqual('10.111.111.2', get_client_ip(self.request))
+
+ @override_settings(SECURE_PROXY_ADDR_HEADER='HTTP_X_FORWARDED_FOR')
+ def test_with_settings_with_proxy_forwardfor(self):
+ from openstack_auth.utils import get_client_ip
+ self.request.META['REMOTE_ADDR'] = '10.111.111.2'
+ self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2'
+ self.assertEqual('172.18.0.2', get_client_ip(self.request))
+
+ @override_settings(SECURE_PROXY_ADDR_HEADER='HTTP_X_REAL_IP')
+ def test_with_settings_with_proxy_real_ip(self):
+ from openstack_auth.utils import get_client_ip
+ self.request.META['REMOTE_ADDR'] = '10.111.111.2'
+ self.request.META['HTTP_X_REAL_IP'] = '192.168.15.33'
+ self.request.META['HTTP_X_FORWARDED_FOR'] = '172.18.0.2'
+ self.assertEqual('192.168.15.33', get_client_ip(self.request))