diff options
| author | Michael Basnight <mbasnight@gmail.com> | 2013-10-22 17:48:09 +0000 |
|---|---|---|
| committer | Michael Basnight <mbasnight@gmail.com> | 2013-10-23 18:23:48 +0000 |
| commit | d56ac15ebf86c5ef0899bd23c75aa4e0770a7f93 (patch) | |
| tree | 90401bffaa587291f258731053d05ae2f8a08490 /troveclient/compat/tests | |
| parent | 6431cc825807ddac6a4aa0dd174efe0b42e13760 (diff) | |
| download | python-troveclient-d56ac15ebf86c5ef0899bd23c75aa4e0770a7f93.tar.gz | |
Removing unused files with new client
fixes bug 1243452
Change-Id: Ia51e6d48e486baa818ebe3108d37bb1de537702d
Diffstat (limited to 'troveclient/compat/tests')
| -rw-r--r-- | troveclient/compat/tests/test_auth.py | 431 |
1 files changed, 431 insertions, 0 deletions
diff --git a/troveclient/compat/tests/test_auth.py b/troveclient/compat/tests/test_auth.py new file mode 100644 index 0000000..cb3b6bc --- /dev/null +++ b/troveclient/compat/tests/test_auth.py @@ -0,0 +1,431 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation +# Copyright 2013 Rackspace Hosting +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from testtools import TestCase +from troveclient.compat import auth +from mock import Mock + +from troveclient.compat import exceptions + +""" +Unit tests for the classes and functions in auth.py. +""" + + +def check_url_none(test_case, auth_class): + # url is None, it must throw exception + authObj = auth_class(url=None, type=auth_class, client=None, + username=None, password=None, tenant=None) + try: + authObj.authenticate() + test_case.fail("AuthUrlNotGiven exception expected") + except exceptions.AuthUrlNotGiven: + pass + + +class AuthenticatorTest(TestCase): + + def setUp(self): + super(AuthenticatorTest, self).setUp() + self.orig_load = auth.ServiceCatalog._load + self.orig__init = auth.ServiceCatalog.__init__ + + def tearDown(self): + super(AuthenticatorTest, self).tearDown() + auth.ServiceCatalog._load = self.orig_load + auth.ServiceCatalog.__init__ = self.orig__init + + def test_get_authenticator_cls(self): + class_list = (auth.KeyStoneV2Authenticator, + auth.RaxAuthenticator, + auth.Auth1_1, + auth.FakeAuth) + + for c in class_list: + self.assertEqual(c, auth.get_authenticator_cls(c)) + + class_names = {"keystone": auth.KeyStoneV2Authenticator, + "rax": auth.RaxAuthenticator, + "auth1.1": auth.Auth1_1, + "fake": auth.FakeAuth} + + for cn in class_names.keys(): + self.assertEqual(class_names[cn], auth.get_authenticator_cls(cn)) + + cls_or_name = "_unknown_" + self.assertRaises(ValueError, auth.get_authenticator_cls, cls_or_name) + + def test__authenticate(self): + authObj = auth.Authenticator(Mock(), auth.KeyStoneV2Authenticator, + Mock(), Mock(), Mock(), Mock()) + # test response code 200 + resp = Mock() + resp.status = 200 + body = "test_body" + + auth.ServiceCatalog._load = Mock(return_value=1) + authObj.client._time_request = Mock(return_value=(resp, body)) + + sc = authObj._authenticate(Mock(), Mock()) + self.assertEqual(body, sc.catalog) + + # test AmbiguousEndpoints exception + auth.ServiceCatalog.__init__ = \ + Mock(side_effect=exceptions.AmbiguousEndpoints) + self.assertRaises(exceptions.AmbiguousEndpoints, + authObj._authenticate, Mock(), Mock()) + + # test handling KeyError and raising AuthorizationFailure exception + auth.ServiceCatalog.__init__ = Mock(side_effect=KeyError) + self.assertRaises(exceptions.AuthorizationFailure, + authObj._authenticate, Mock(), Mock()) + + # test EndpointNotFound exception + mock = Mock(side_effect=exceptions.EndpointNotFound) + auth.ServiceCatalog.__init__ = mock + self.assertRaises(exceptions.EndpointNotFound, + authObj._authenticate, Mock(), Mock()) + mock.side_effect = None + + # test response code 305 + resp.__getitem__ = Mock(return_value='loc') + resp.status = 305 + body = "test_body" + authObj.client._time_request = Mock(return_value=(resp, body)) + + l = authObj._authenticate(Mock(), Mock()) + self.assertEqual('loc', l) + + # test any response code other than 200 and 305 + resp.status = 404 + exceptions.from_response = Mock(side_effect=ValueError) + self.assertRaises(ValueError, authObj._authenticate, Mock(), Mock()) + + def test_authenticate(self): + authObj = auth.Authenticator(Mock(), auth.KeyStoneV2Authenticator, + Mock(), Mock(), Mock(), Mock()) + self.assertRaises(NotImplementedError, authObj.authenticate) + + +class KeyStoneV2AuthenticatorTest(TestCase): + + def test_authenticate(self): + # url is None + check_url_none(self, auth.KeyStoneV2Authenticator) + + # url is not None, so it must not throw exception + url = "test_url" + cls_type = auth.KeyStoneV2Authenticator + authObj = auth.KeyStoneV2Authenticator(url=url, type=cls_type, + client=None, username=None, + password=None, tenant=None) + + def side_effect_func(url): + return url + + mock = Mock() + mock.side_effect = side_effect_func + authObj._v2_auth = mock + r = authObj.authenticate() + self.assertEqual(url, r) + + def test__v2_auth(self): + username = "trove_user" + password = "trove_password" + tenant = "tenant" + cls_type = auth.KeyStoneV2Authenticator + authObj = auth.KeyStoneV2Authenticator(url=None, type=cls_type, + client=None, + username=username, + password=password, + tenant=tenant) + + def side_effect_func(url, body): + return body + mock = Mock() + mock.side_effect = side_effect_func + authObj._authenticate = mock + body = authObj._v2_auth(Mock()) + self.assertEqual(username, + body['auth']['passwordCredentials']['username']) + self.assertEqual(password, + body['auth']['passwordCredentials']['password']) + self.assertEqual(tenant, body['auth']['tenantName']) + + +class Auth1_1Test(TestCase): + + def test_authenticate(self): + # handle when url is None + check_url_none(self, auth.Auth1_1) + + # url is not none + username = "trove_user" + password = "trove_password" + url = "test_url" + authObj = auth.Auth1_1(url=url, + type=auth.Auth1_1, + client=None, username=username, + password=password, tenant=None) + + def side_effect_func(auth_url, body, root_key): + return auth_url, body, root_key + + mock = Mock() + mock.side_effect = side_effect_func + authObj._authenticate = mock + auth_url, body, root_key = authObj.authenticate() + + self.assertEqual(username, body['credentials']['username']) + self.assertEqual(password, body['credentials']['key']) + self.assertEqual(auth_url, url) + self.assertEqual('auth', root_key) + + +class RaxAuthenticatorTest(TestCase): + + def test_authenticate(self): + # url is None + check_url_none(self, auth.RaxAuthenticator) + + # url is not None, so it must not throw exception + url = "test_url" + authObj = auth.RaxAuthenticator(url=url, + type=auth.RaxAuthenticator, + client=None, username=None, + password=None, tenant=None) + + def side_effect_func(url): + return url + + mock = Mock() + mock.side_effect = side_effect_func + authObj._rax_auth = mock + r = authObj.authenticate() + self.assertEqual(url, r) + + def test__rax_auth(self): + username = "trove_user" + password = "trove_password" + tenant = "tenant" + authObj = auth.RaxAuthenticator(url=None, + type=auth.RaxAuthenticator, + client=None, username=username, + password=password, tenant=tenant) + + def side_effect_func(url, body): + return body + + mock = Mock() + mock.side_effect = side_effect_func + authObj._authenticate = mock + body = authObj._rax_auth(Mock()) + + v = body['auth']['RAX-KSKEY:apiKeyCredentials']['username'] + self.assertEqual(username, v) + + v = body['auth']['RAX-KSKEY:apiKeyCredentials']['apiKey'] + self.assertEqual(password, v) + + v = body['auth']['RAX-KSKEY:apiKeyCredentials']['tenantName'] + self.assertEqual(tenant, v) + + +class FakeAuthTest(TestCase): + + def test_authenticate(self): + tenant = "tenant" + authObj = auth.FakeAuth(url=None, + type=auth.FakeAuth, + client=None, username=None, + password=None, tenant=tenant) + + fc = authObj.authenticate() + public_url = "%s/%s" % ('http://localhost:8779/v1.0', tenant) + self.assertEqual(public_url, fc.get_public_url()) + self.assertEqual(tenant, fc.get_token()) + + +class ServiceCatalogTest(TestCase): + + def setUp(self): + super(ServiceCatalogTest, self).setUp() + self.orig_url_for = auth.ServiceCatalog._url_for + self.orig__init__ = auth.ServiceCatalog.__init__ + auth.ServiceCatalog.__init__ = Mock(return_value=None) + self.test_url = "http://localhost:1234/test" + + def tearDown(self): + super(ServiceCatalogTest, self).tearDown() + auth.ServiceCatalog._url_for = self.orig_url_for + auth.ServiceCatalog.__init__ = self.orig__init__ + + def test__load(self): + url = "random_url" + auth.ServiceCatalog._url_for = Mock(return_value=url) + + # when service_url is None + scObj = auth.ServiceCatalog() + scObj.region = None + scObj.service_url = None + scObj._load() + self.assertEqual(url, scObj.public_url) + self.assertEqual(url, scObj.management_url) + + # service url is not None + service_url = "service_url" + scObj = auth.ServiceCatalog() + scObj.region = None + scObj.service_url = service_url + scObj._load() + self.assertEqual(service_url, scObj.public_url) + self.assertEqual(service_url, scObj.management_url) + + def test_get_token(self): + test_id = "test_id" + scObj = auth.ServiceCatalog() + scObj.root_key = "root_key" + scObj.catalog = dict() + scObj.catalog[scObj.root_key] = dict() + scObj.catalog[scObj.root_key]['token'] = dict() + scObj.catalog[scObj.root_key]['token']['id'] = test_id + self.assertEqual(test_id, scObj.get_token()) + + def test_get_management_url(self): + test_mng_url = "test_management_url" + scObj = auth.ServiceCatalog() + scObj.management_url = test_mng_url + self.assertEqual(test_mng_url, scObj.get_management_url()) + + def test_get_public_url(self): + test_public_url = "test_public_url" + scObj = auth.ServiceCatalog() + scObj.public_url = test_public_url + self.assertEqual(test_public_url, scObj.get_public_url()) + + def test__url_for(self): + scObj = auth.ServiceCatalog() + + # case for no endpoint found + self.case_no_endpoint_match(scObj) + + # case for empty service catalog + self.case_endpoing_with_empty_catalog(scObj) + + # more than one matching endpoints + self.case_ambiguous_endpoint(scObj) + + # happy case + self.case_unique_endpoint(scObj) + + # testing if-statements in for-loop to iterate services in catalog + self.case_iterating_services_in_catalog(scObj) + + def case_no_endpoint_match(self, scObj): + # empty endpoint list + scObj.catalog = dict() + scObj.catalog['endpoints'] = list() + self.assertRaises(exceptions.EndpointNotFound, scObj._url_for) + + def side_effect_func_ep(attr): + return "test_attr_value" + + # simulating dict + endpoint = Mock() + mock = Mock() + mock.side_effect = side_effect_func_ep + endpoint.__getitem__ = mock + scObj.catalog['endpoints'].append(endpoint) + + # not-empty list but not matching endpoint + filter_value = "not_matching_value" + self.assertRaises(exceptions.EndpointNotFound, scObj._url_for, + attr="test_attr", filter_value=filter_value) + + filter_value = "test_attr_value" # so that we have an endpoint match + scObj.root_key = "access" + scObj.catalog[scObj.root_key] = dict() + self.assertRaises(exceptions.EndpointNotFound, scObj._url_for, + attr="test_attr", filter_value=filter_value) + + def case_endpoing_with_empty_catalog(self, scObj): + # first, test with empty catalog, this should pass since + # there is already enpoint added + scObj.catalog[scObj.root_key]['serviceCatalog'] = list() + + endpoint = scObj.catalog['endpoints'][0] + endpoint.get = Mock(return_value=self.test_url) + r_url = scObj._url_for(attr="test_attr", + filter_value="test_attr_value") + self.assertEqual(self.test_url, r_url) + + def case_ambiguous_endpoint(self, scObj): + scObj.service_type = "trove" + scObj.service_name = "test_service_name" + + def side_effect_func_service(key): + if key == "type": + return "trove" + elif key == "name": + return "test_service_name" + return None + + mock1 = Mock() + mock1.side_effect = side_effect_func_service + service1 = Mock() + service1.get = mock1 + + endpoint2 = {"test_attr": "test_attr_value"} + service1.__getitem__ = Mock(return_value=[endpoint2]) + scObj.catalog[scObj.root_key]['serviceCatalog'] = [service1] + self.assertRaises(exceptions.AmbiguousEndpoints, scObj._url_for, + attr="test_attr", filter_value="test_attr_value") + + def case_unique_endpoint(self, scObj): + # changing the endpoint2 attribute to pass the filter + service1 = scObj.catalog[scObj.root_key]['serviceCatalog'][0] + endpoint2 = service1[0][0] + endpoint2["test_attr"] = "new value not matching filter" + r_url = scObj._url_for(attr="test_attr", + filter_value="test_attr_value") + self.assertEqual(self.test_url, r_url) + + def case_iterating_services_in_catalog(self, scObj): + service1 = scObj.catalog[scObj.root_key]['serviceCatalog'][0] + + scObj.catalog = dict() + scObj.root_key = "access" + scObj.catalog[scObj.root_key] = dict() + scObj.service_type = "no_match" + + scObj.catalog[scObj.root_key]['serviceCatalog'] = [service1] + self.assertRaises(exceptions.EndpointNotFound, scObj._url_for) + + scObj.service_type = "database" + scObj.service_name = "no_match" + self.assertRaises(exceptions.EndpointNotFound, scObj._url_for) + + # no endpoints and no 'serviceCatalog' in catalog => raise exception + scObj = auth.ServiceCatalog() + scObj.catalog = dict() + scObj.root_key = "access" + scObj.catalog[scObj.root_key] = dict() + scObj.catalog[scObj.root_key]['serviceCatalog'] = [] + self.assertRaises(exceptions.EndpointNotFound, scObj._url_for, + attr="test_attr", filter_value="test_attr_value") |
