diff options
| author | Jamie Lennox <jamielennox@redhat.com> | 2015-02-11 19:03:25 +1100 |
|---|---|---|
| committer | Jamie Lennox <jamielennox@redhat.com> | 2015-02-11 19:03:25 +1100 |
| commit | 6bd93179a2966f2b5c67e297628510ac73689fb3 (patch) | |
| tree | faf3a93a16fb49b4a742f74b6fcdd20f8a0ebd0e /keystoneclient/tests/unit/auth | |
| parent | 58ac2de5d4a6b58e8bd5d430a04199a4d40427a8 (diff) | |
| download | python-keystoneclient-6bd93179a2966f2b5c67e297628510ac73689fb3.tar.gz | |
Move tests to the unit subdirectory
Move all the existing tests to the unit/ subdirectory. This gives us
some room to add a functional/ directory later with other tests.
Change-Id: I0fb8d5b628eb8ee1f35f05f42d0c0ac9f285e8c3
Implements: functional-testing
Diffstat (limited to 'keystoneclient/tests/unit/auth')
| -rw-r--r-- | keystoneclient/tests/unit/auth/__init__.py | 0 | ||||
| -rw-r--r-- | keystoneclient/tests/unit/auth/test_access.py | 61 | ||||
| -rw-r--r-- | keystoneclient/tests/unit/auth/test_cli.py | 196 | ||||
| -rw-r--r-- | keystoneclient/tests/unit/auth/test_conf.py | 177 | ||||
| -rw-r--r-- | keystoneclient/tests/unit/auth/test_identity_common.py | 422 | ||||
| -rw-r--r-- | keystoneclient/tests/unit/auth/test_identity_v2.py | 295 | ||||
| -rw-r--r-- | keystoneclient/tests/unit/auth/test_identity_v3.py | 490 | ||||
| -rw-r--r-- | keystoneclient/tests/unit/auth/test_password.py | 63 | ||||
| -rw-r--r-- | keystoneclient/tests/unit/auth/test_token.py | 47 | ||||
| -rw-r--r-- | keystoneclient/tests/unit/auth/test_token_endpoint.py | 63 | ||||
| -rw-r--r-- | keystoneclient/tests/unit/auth/utils.py | 200 |
11 files changed, 2014 insertions, 0 deletions
diff --git a/keystoneclient/tests/unit/auth/__init__.py b/keystoneclient/tests/unit/auth/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/keystoneclient/tests/unit/auth/__init__.py diff --git a/keystoneclient/tests/unit/auth/test_access.py b/keystoneclient/tests/unit/auth/test_access.py new file mode 100644 index 0000000..405fb8b --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_access.py @@ -0,0 +1,61 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystoneclient import access +from keystoneclient import auth +from keystoneclient.auth.identity import access as access_plugin +from keystoneclient import fixture +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +class AccessInfoPluginTests(utils.TestCase): + + def setUp(self): + super(AccessInfoPluginTests, self).setUp() + self.session = session.Session() + self.auth_token = uuid.uuid4().hex + + def _plugin(self, **kwargs): + token = fixture.V3Token() + s = token.add_service('identity') + s.add_standard_endpoints(public=self.TEST_ROOT_URL) + + auth_ref = access.AccessInfo.factory(body=token, + auth_token=self.auth_token) + return access_plugin.AccessInfoPlugin(auth_ref, **kwargs) + + def test_auth_ref(self): + plugin = self._plugin() + self.assertEqual(self.TEST_ROOT_URL, + plugin.get_endpoint(self.session, + service_type='identity', + interface='public')) + self.assertEqual(self.auth_token, plugin.get_token(session)) + + def test_auth_url(self): + auth_url = 'http://keystone.test.url' + plugin = self._plugin(auth_url=auth_url) + + self.assertEqual(auth_url, + plugin.get_endpoint(self.session, + interface=auth.AUTH_INTERFACE)) + + def test_invalidate(self): + plugin = self._plugin() + auth_ref = plugin.auth_ref + + self.assertIsInstance(auth_ref, access.AccessInfo) + self.assertFalse(plugin.invalidate()) + self.assertIs(auth_ref, plugin.auth_ref) diff --git a/keystoneclient/tests/unit/auth/test_cli.py b/keystoneclient/tests/unit/auth/test_cli.py new file mode 100644 index 0000000..d65de73 --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_cli.py @@ -0,0 +1,196 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import uuid + +import fixtures +import mock +from oslo_config import cfg + +from keystoneclient.auth import base +from keystoneclient.auth import cli +from keystoneclient.tests.unit.auth import utils + + +class TesterPlugin(base.BaseAuthPlugin): + + def get_token(self, *args, **kwargs): + return None + + @classmethod + def get_options(cls): + # NOTE(jamielennox): this is kind of horrible. If you specify this as + # a deprecated_name= value it will convert - to _ which is not what we + # want for a CLI option. + deprecated = [cfg.DeprecatedOpt('test-other')] + return [ + cfg.StrOpt('test-opt', help='tester', deprecated_opts=deprecated) + ] + + +class CliTests(utils.TestCase): + + def setUp(self): + super(CliTests, self).setUp() + self.p = argparse.ArgumentParser() + + def env(self, name, value=None): + if value is not None: + # environment variables are always strings + value = str(value) + + return self.useFixture(fixtures.EnvironmentVariable(name, value)) + + def test_creating_with_no_args(self): + ret = cli.register_argparse_arguments(self.p, []) + self.assertIsNone(ret) + self.assertIn('--os-auth-plugin', self.p.format_usage()) + + def test_load_with_nothing(self): + cli.register_argparse_arguments(self.p, []) + opts = self.p.parse_args([]) + self.assertIsNone(cli.load_from_argparse_arguments(opts)) + + @utils.mock_plugin + def test_basic_params_added(self, m): + name = uuid.uuid4().hex + argv = ['--os-auth-plugin', name] + ret = cli.register_argparse_arguments(self.p, argv) + self.assertIs(utils.MockPlugin, ret) + + for n in ('--os-a-int', '--os-a-bool', '--os-a-float'): + self.assertIn(n, self.p.format_usage()) + + m.assert_called_once_with(name) + + @utils.mock_plugin + def test_param_loading(self, m): + name = uuid.uuid4().hex + argv = ['--os-auth-plugin', name, + '--os-a-int', str(self.a_int), + '--os-a-float', str(self.a_float), + '--os-a-bool', str(self.a_bool)] + + klass = cli.register_argparse_arguments(self.p, argv) + self.assertIs(utils.MockPlugin, klass) + + opts = self.p.parse_args(argv) + self.assertEqual(name, opts.os_auth_plugin) + + a = cli.load_from_argparse_arguments(opts) + self.assertTestVals(a) + + self.assertEqual(name, opts.os_auth_plugin) + self.assertEqual(str(self.a_int), opts.os_a_int) + self.assertEqual(str(self.a_float), opts.os_a_float) + self.assertEqual(str(self.a_bool), opts.os_a_bool) + + @utils.mock_plugin + def test_default_options(self, m): + name = uuid.uuid4().hex + argv = ['--os-auth-plugin', name, + '--os-a-float', str(self.a_float)] + + klass = cli.register_argparse_arguments(self.p, argv) + self.assertIs(utils.MockPlugin, klass) + + opts = self.p.parse_args(argv) + self.assertEqual(name, opts.os_auth_plugin) + + a = cli.load_from_argparse_arguments(opts) + + self.assertEqual(self.a_float, a['a_float']) + self.assertEqual(3, a['a_int']) + + @utils.mock_plugin + def test_with_default_string_value(self, m): + name = uuid.uuid4().hex + klass = cli.register_argparse_arguments(self.p, [], default=name) + self.assertIs(utils.MockPlugin, klass) + m.assert_called_once_with(name) + + @utils.mock_plugin + def test_overrides_default_string_value(self, m): + name = uuid.uuid4().hex + default = uuid.uuid4().hex + argv = ['--os-auth-plugin', name] + klass = cli.register_argparse_arguments(self.p, argv, default=default) + self.assertIs(utils.MockPlugin, klass) + m.assert_called_once_with(name) + + @utils.mock_plugin + def test_with_default_type_value(self, m): + klass = cli.register_argparse_arguments(self.p, [], + default=utils.MockPlugin) + self.assertIs(utils.MockPlugin, klass) + self.assertEqual(0, m.call_count) + + @utils.mock_plugin + def test_overrides_default_type_value(self, m): + # using this test plugin would fail if called because there + # is no get_options() function + class TestPlugin(object): + pass + name = uuid.uuid4().hex + argv = ['--os-auth-plugin', name] + klass = cli.register_argparse_arguments(self.p, argv, + default=TestPlugin) + self.assertIs(utils.MockPlugin, klass) + m.assert_called_once_with(name) + + @utils.mock_plugin + def test_env_overrides_default_opt(self, m): + name = uuid.uuid4().hex + val = uuid.uuid4().hex + self.env('OS_A_STR', val) + + klass = cli.register_argparse_arguments(self.p, [], default=name) + opts = self.p.parse_args([]) + a = klass.load_from_argparse_arguments(opts) + + self.assertEqual(val, a['a_str']) + + def test_deprecated_cli_options(self): + TesterPlugin.register_argparse_arguments(self.p) + val = uuid.uuid4().hex + opts = self.p.parse_args(['--os-test-other', val]) + self.assertEqual(val, opts.os_test_opt) + + def test_deprecated_multi_cli_options(self): + TesterPlugin.register_argparse_arguments(self.p) + val1 = uuid.uuid4().hex + val2 = uuid.uuid4().hex + # argarse rules say that the last specified wins. + opts = self.p.parse_args(['--os-test-other', val2, + '--os-test-opt', val1]) + self.assertEqual(val1, opts.os_test_opt) + + def test_deprecated_env_options(self): + val = uuid.uuid4().hex + + with mock.patch.dict('os.environ', {'OS_TEST_OTHER': val}): + TesterPlugin.register_argparse_arguments(self.p) + + opts = self.p.parse_args([]) + self.assertEqual(val, opts.os_test_opt) + + def test_deprecated_env_multi_options(self): + val1 = uuid.uuid4().hex + val2 = uuid.uuid4().hex + + with mock.patch.dict('os.environ', {'OS_TEST_OPT': val1, + 'OS_TEST_OTHER': val2}): + TesterPlugin.register_argparse_arguments(self.p) + + opts = self.p.parse_args([]) + self.assertEqual(val1, opts.os_test_opt) diff --git a/keystoneclient/tests/unit/auth/test_conf.py b/keystoneclient/tests/unit/auth/test_conf.py new file mode 100644 index 0000000..c3ce8eb --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_conf.py @@ -0,0 +1,177 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +import mock +from oslo_config import cfg +from oslo_config import fixture as config +import stevedore + +from keystoneclient.auth import base +from keystoneclient.auth import conf +from keystoneclient.auth.identity import v2 as v2_auth +from keystoneclient.auth.identity import v3 as v3_auth +from keystoneclient import exceptions +from keystoneclient.tests.unit.auth import utils + + +class ConfTests(utils.TestCase): + + def setUp(self): + super(ConfTests, self).setUp() + self.conf_fixture = self.useFixture(config.Config()) + + # NOTE(jamielennox): we register the basic config options first because + # we need them in place before we can stub them. We will need to run + # the register again after we stub the auth section and auth plugin so + # it can load the plugin specific options. + conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP) + + def test_loading_v2(self): + section = uuid.uuid4().hex + username = uuid.uuid4().hex + password = uuid.uuid4().hex + trust_id = uuid.uuid4().hex + tenant_id = uuid.uuid4().hex + + self.conf_fixture.config(auth_section=section, group=self.GROUP) + conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP) + + self.conf_fixture.register_opts(v2_auth.Password.get_options(), + group=section) + + self.conf_fixture.config(auth_plugin=self.V2PASS, + username=username, + password=password, + trust_id=trust_id, + tenant_id=tenant_id, + group=section) + + a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP) + + self.assertEqual(username, a.username) + self.assertEqual(password, a.password) + self.assertEqual(trust_id, a.trust_id) + self.assertEqual(tenant_id, a.tenant_id) + + def test_loading_v3(self): + section = uuid.uuid4().hex + token = uuid.uuid4().hex + trust_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + project_domain_name = uuid.uuid4().hex + + self.conf_fixture.config(auth_section=section, group=self.GROUP) + conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP) + + self.conf_fixture.register_opts(v3_auth.Token.get_options(), + group=section) + + self.conf_fixture.config(auth_plugin=self.V3TOKEN, + token=token, + trust_id=trust_id, + project_id=project_id, + project_domain_name=project_domain_name, + group=section) + + a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP) + + self.assertEqual(token, a.auth_methods[0].token) + self.assertEqual(trust_id, a.trust_id) + self.assertEqual(project_id, a.project_id) + self.assertEqual(project_domain_name, a.project_domain_name) + + def test_loading_invalid_plugin(self): + auth_plugin = uuid.uuid4().hex + self.conf_fixture.config(auth_plugin=auth_plugin, + group=self.GROUP) + + e = self.assertRaises(exceptions.NoMatchingPlugin, + conf.load_from_conf_options, + self.conf_fixture.conf, + self.GROUP) + + self.assertEqual(auth_plugin, e.name) + + def test_loading_with_no_data(self): + self.assertIsNone(conf.load_from_conf_options(self.conf_fixture.conf, + self.GROUP)) + + @mock.patch('stevedore.DriverManager') + def test_other_params(self, m): + m.return_value = utils.MockManager(utils.MockPlugin) + driver_name = uuid.uuid4().hex + + self.conf_fixture.register_opts(utils.MockPlugin.get_options(), + group=self.GROUP) + self.conf_fixture.config(auth_plugin=driver_name, + group=self.GROUP, + **self.TEST_VALS) + + a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP) + self.assertTestVals(a) + + m.assert_called_once_with(namespace=base.PLUGIN_NAMESPACE, + name=driver_name, + invoke_on_load=False) + + @utils.mock_plugin + def test_same_section(self, m): + self.conf_fixture.register_opts(utils.MockPlugin.get_options(), + group=self.GROUP) + conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP) + self.conf_fixture.config(auth_plugin=uuid.uuid4().hex, + group=self.GROUP, + **self.TEST_VALS) + + a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP) + self.assertTestVals(a) + + @utils.mock_plugin + def test_diff_section(self, m): + section = uuid.uuid4().hex + + self.conf_fixture.config(auth_section=section, group=self.GROUP) + conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP) + + self.conf_fixture.register_opts(utils.MockPlugin.get_options(), + group=section) + self.conf_fixture.config(group=section, + auth_plugin=uuid.uuid4().hex, + **self.TEST_VALS) + + a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP) + self.assertTestVals(a) + + def test_plugins_are_all_opts(self): + manager = stevedore.ExtensionManager(base.PLUGIN_NAMESPACE, + invoke_on_load=False, + propagate_map_exceptions=True) + + def inner(driver): + for p in driver.plugin.get_options(): + self.assertIsInstance(p, cfg.Opt) + + manager.map(inner) + + def test_get_common(self): + opts = conf.get_common_conf_options() + for opt in opts: + self.assertIsInstance(opt, cfg.Opt) + self.assertEqual(2, len(opts)) + + def test_get_named(self): + loaded_opts = conf.get_plugin_options('v2password') + plugin_opts = v2_auth.Password.get_options() + + self.assertEqual(plugin_opts, loaded_opts) diff --git a/keystoneclient/tests/unit/auth/test_identity_common.py b/keystoneclient/tests/unit/auth/test_identity_common.py new file mode 100644 index 0000000..db30bea --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_identity_common.py @@ -0,0 +1,422 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import datetime +import uuid + +from oslo_utils import timeutils +import six + +from keystoneclient import access +from keystoneclient.auth import base +from keystoneclient.auth import identity +from keystoneclient import fixture +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +@six.add_metaclass(abc.ABCMeta) +class CommonIdentityTests(object): + + TEST_ROOT_URL = 'http://127.0.0.1:5000/' + TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' + + TEST_COMPUTE_PUBLIC = 'http://nova/novapi/public' + TEST_COMPUTE_INTERNAL = 'http://nova/novapi/internal' + TEST_COMPUTE_ADMIN = 'http://nova/novapi/admin' + + TEST_PASS = uuid.uuid4().hex + + def setUp(self): + super(CommonIdentityTests, self).setUp() + + self.TEST_URL = '%s%s' % (self.TEST_ROOT_URL, self.version) + self.TEST_ADMIN_URL = '%s%s' % (self.TEST_ROOT_ADMIN_URL, self.version) + self.TEST_DISCOVERY = fixture.DiscoveryList(href=self.TEST_ROOT_URL) + + self.stub_auth_data() + + @abc.abstractmethod + def create_auth_plugin(self, **kwargs): + """Create an auth plugin that makes sense for the auth data. + + It doesn't really matter what auth mechanism is used but it should be + appropriate to the API version. + """ + + @abc.abstractmethod + def get_auth_data(self, **kwargs): + """Return fake authentication data. + + This should register a valid token response and ensure that the compute + endpoints are set to TEST_COMPUTE_PUBLIC, _INTERNAL and _ADMIN. + """ + + def stub_auth_data(self, **kwargs): + token = self.get_auth_data(**kwargs) + self.user_id = token.user_id + + try: + self.project_id = token.project_id + except AttributeError: + self.project_id = token.tenant_id + + self.stub_auth(json=token) + + @abc.abstractproperty + def version(self): + """The API version being tested.""" + + def test_discovering(self): + self.stub_url('GET', [], + base_url=self.TEST_COMPUTE_ADMIN, + json=self.TEST_DISCOVERY) + + body = 'SUCCESS' + + # which gives our sample values + self.stub_url('GET', ['path'], text=body) + + a = self.create_auth_plugin() + s = session.Session(auth=a) + + resp = s.get('/path', endpoint_filter={'service_type': 'compute', + 'interface': 'admin', + 'version': self.version}) + + self.assertEqual(200, resp.status_code) + self.assertEqual(body, resp.text) + + new_body = 'SC SUCCESS' + # if we don't specify a version, we use the URL from the SC + self.stub_url('GET', ['path'], + base_url=self.TEST_COMPUTE_ADMIN, + text=new_body) + + resp = s.get('/path', endpoint_filter={'service_type': 'compute', + 'interface': 'admin'}) + + self.assertEqual(200, resp.status_code) + self.assertEqual(new_body, resp.text) + + def test_discovery_uses_session_cache(self): + # register responses such that if the discovery URL is hit more than + # once then the response will be invalid and not point to COMPUTE_ADMIN + resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}] + self.requests.get(self.TEST_COMPUTE_ADMIN, resps) + + body = 'SUCCESS' + self.stub_url('GET', ['path'], text=body) + + # now either of the two plugins I use, it should not cause a second + # request to the discovery url. + s = session.Session() + a = self.create_auth_plugin() + b = self.create_auth_plugin() + + for auth in (a, b): + resp = s.get('/path', + auth=auth, + endpoint_filter={'service_type': 'compute', + 'interface': 'admin', + 'version': self.version}) + + self.assertEqual(200, resp.status_code) + self.assertEqual(body, resp.text) + + def test_discovery_uses_plugin_cache(self): + # register responses such that if the discovery URL is hit more than + # once then the response will be invalid and not point to COMPUTE_ADMIN + resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}] + self.requests.get(self.TEST_COMPUTE_ADMIN, resps) + + body = 'SUCCESS' + self.stub_url('GET', ['path'], text=body) + + # now either of the two sessions I use, it should not cause a second + # request to the discovery url. + sa = session.Session() + sb = session.Session() + auth = self.create_auth_plugin() + + for sess in (sa, sb): + resp = sess.get('/path', + auth=auth, + endpoint_filter={'service_type': 'compute', + 'interface': 'admin', + 'version': self.version}) + + self.assertEqual(200, resp.status_code) + self.assertEqual(body, resp.text) + + def test_discovering_with_no_data(self): + # which returns discovery information pointing to TEST_URL but there is + # no data there. + self.stub_url('GET', [], + base_url=self.TEST_COMPUTE_ADMIN, + status_code=400) + + # so the url that will be used is the same TEST_COMPUTE_ADMIN + body = 'SUCCESS' + self.stub_url('GET', ['path'], base_url=self.TEST_COMPUTE_ADMIN, + text=body, status_code=200) + + a = self.create_auth_plugin() + s = session.Session(auth=a) + + resp = s.get('/path', endpoint_filter={'service_type': 'compute', + 'interface': 'admin', + 'version': self.version}) + + self.assertEqual(200, resp.status_code) + self.assertEqual(body, resp.text) + + def test_asking_for_auth_endpoint_ignores_checks(self): + a = self.create_auth_plugin() + s = session.Session(auth=a) + + auth_url = s.get_endpoint(service_type='compute', + interface=base.AUTH_INTERFACE) + + self.assertEqual(self.TEST_URL, auth_url) + + def _create_expired_auth_plugin(self, **kwargs): + expires = timeutils.utcnow() - datetime.timedelta(minutes=20) + expired_token = self.get_auth_data(expires=expires) + expired_auth_ref = access.AccessInfo.factory(body=expired_token) + + body = 'SUCCESS' + self.stub_url('GET', ['path'], + base_url=self.TEST_COMPUTE_ADMIN, text=body) + + a = self.create_auth_plugin(**kwargs) + a.auth_ref = expired_auth_ref + return a + + def test_reauthenticate(self): + a = self._create_expired_auth_plugin() + expired_auth_ref = a.auth_ref + s = session.Session(auth=a) + self.assertIsNot(expired_auth_ref, a.get_access(s)) + + def test_no_reauthenticate(self): + a = self._create_expired_auth_plugin(reauthenticate=False) + expired_auth_ref = a.auth_ref + s = session.Session(auth=a) + self.assertIs(expired_auth_ref, a.get_access(s)) + + def test_invalidate(self): + a = self.create_auth_plugin() + s = session.Session(auth=a) + + # trigger token fetching + s.get_auth_headers() + + self.assertTrue(a.auth_ref) + self.assertTrue(a.invalidate()) + self.assertIsNone(a.auth_ref) + self.assertFalse(a.invalidate()) + + def test_get_auth_properties(self): + a = self.create_auth_plugin() + s = session.Session() + + self.assertEqual(self.user_id, a.get_user_id(s)) + self.assertEqual(self.project_id, a.get_project_id(s)) + + +class V3(CommonIdentityTests, utils.TestCase): + + @property + def version(self): + return 'v3' + + def get_auth_data(self, **kwargs): + token = fixture.V3Token(**kwargs) + region = 'RegionOne' + + svc = token.add_service('identity') + svc.add_standard_endpoints(admin=self.TEST_ADMIN_URL, region=region) + + svc = token.add_service('compute') + svc.add_standard_endpoints(admin=self.TEST_COMPUTE_ADMIN, + public=self.TEST_COMPUTE_PUBLIC, + internal=self.TEST_COMPUTE_INTERNAL, + region=region) + + return token + + def stub_auth(self, subject_token=None, **kwargs): + if not subject_token: + subject_token = self.TEST_TOKEN + + kwargs.setdefault('headers', {})['X-Subject-Token'] = subject_token + self.stub_url('POST', ['auth', 'tokens'], **kwargs) + + def create_auth_plugin(self, **kwargs): + kwargs.setdefault('auth_url', self.TEST_URL) + kwargs.setdefault('username', self.TEST_USER) + kwargs.setdefault('password', self.TEST_PASS) + return identity.V3Password(**kwargs) + + +class V2(CommonIdentityTests, utils.TestCase): + + @property + def version(self): + return 'v2.0' + + def create_auth_plugin(self, **kwargs): + kwargs.setdefault('auth_url', self.TEST_URL) + kwargs.setdefault('username', self.TEST_USER) + kwargs.setdefault('password', self.TEST_PASS) + return identity.V2Password(**kwargs) + + def get_auth_data(self, **kwargs): + token = fixture.V2Token(**kwargs) + region = 'RegionOne' + + svc = token.add_service('identity') + svc.add_endpoint(self.TEST_ADMIN_URL, region=region) + + svc = token.add_service('compute') + svc.add_endpoint(public=self.TEST_COMPUTE_PUBLIC, + internal=self.TEST_COMPUTE_INTERNAL, + admin=self.TEST_COMPUTE_ADMIN, + region=region) + + return token + + def stub_auth(self, **kwargs): + self.stub_url('POST', ['tokens'], **kwargs) + + +class CatalogHackTests(utils.TestCase): + + TEST_URL = 'http://keystone.server:5000/v2.0' + OTHER_URL = 'http://other.server:5000/path' + + IDENTITY = 'identity' + + BASE_URL = 'http://keystone.server:5000/' + V2_URL = BASE_URL + 'v2.0' + V3_URL = BASE_URL + 'v3' + + def test_getting_endpoints(self): + disc = fixture.DiscoveryList(href=self.BASE_URL) + self.stub_url('GET', + ['/'], + base_url=self.BASE_URL, + json=disc) + + token = fixture.V2Token() + service = token.add_service(self.IDENTITY) + service.add_endpoint(public=self.V2_URL, + admin=self.V2_URL, + internal=self.V2_URL) + + self.stub_url('POST', + ['tokens'], + base_url=self.V2_URL, + json=token) + + v2_auth = identity.V2Password(self.V2_URL, + username=uuid.uuid4().hex, + password=uuid.uuid4().hex) + + sess = session.Session(auth=v2_auth) + + endpoint = sess.get_endpoint(service_type=self.IDENTITY, + interface='public', + version=(3, 0)) + + self.assertEqual(self.V3_URL, endpoint) + + def test_returns_original_when_discover_fails(self): + token = fixture.V2Token() + service = token.add_service(self.IDENTITY) + service.add_endpoint(public=self.V2_URL, + admin=self.V2_URL, + internal=self.V2_URL) + + self.stub_url('POST', + ['tokens'], + base_url=self.V2_URL, + json=token) + + self.stub_url('GET', [], base_url=self.BASE_URL, status_code=404) + + v2_auth = identity.V2Password(self.V2_URL, + username=uuid.uuid4().hex, + password=uuid.uuid4().hex) + + sess = session.Session(auth=v2_auth) + + endpoint = sess.get_endpoint(service_type=self.IDENTITY, + interface='public', + version=(3, 0)) + + self.assertEqual(self.V2_URL, endpoint) + + +class GenericPlugin(base.BaseAuthPlugin): + + BAD_TOKEN = uuid.uuid4().hex + + def __init__(self): + super(GenericPlugin, self).__init__() + + self.endpoint = 'http://keystone.host:5000' + + self.headers = {'headerA': 'valueA', + 'headerB': 'valueB'} + + def url(self, prefix): + return '%s/%s' % (self.endpoint, prefix) + + def get_token(self, session, **kwargs): + # NOTE(jamielennox): by specifying get_headers this should not be used + return self.BAD_TOKEN + + def get_headers(self, session, **kwargs): + return self.headers + + def get_endpoint(self, session, **kwargs): + return self.endpoint + + +class GenericAuthPluginTests(utils.TestCase): + + # filter doesn't matter to GenericPlugin, but we have to specify one + ENDPOINT_FILTER = {uuid.uuid4().hex: uuid.uuid4().hex} + + def setUp(self): + super(GenericAuthPluginTests, self).setUp() + self.auth = GenericPlugin() + self.session = session.Session(auth=self.auth) + + def test_setting_headers(self): + text = uuid.uuid4().hex + self.stub_url('GET', base_url=self.auth.url('prefix'), text=text) + + resp = self.session.get('prefix', endpoint_filter=self.ENDPOINT_FILTER) + + self.assertEqual(text, resp.text) + + for k, v in six.iteritems(self.auth.headers): + self.assertRequestHeaderEqual(k, v) + + self.assertIsNone(self.session.get_token()) + self.assertEqual(self.auth.headers, + self.session.get_auth_headers()) + self.assertNotIn('X-Auth-Token', self.requests.last_request.headers) diff --git a/keystoneclient/tests/unit/auth/test_identity_v2.py b/keystoneclient/tests/unit/auth/test_identity_v2.py new file mode 100644 index 0000000..6d432a7 --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_identity_v2.py @@ -0,0 +1,295 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +from keystoneclient.auth.identity import v2 +from keystoneclient import exceptions +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +class V2IdentityPlugin(utils.TestCase): + + TEST_ROOT_URL = 'http://127.0.0.1:5000/' + TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0') + TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' + TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0') + + TEST_PASS = 'password' + + TEST_SERVICE_CATALOG = [{ + "endpoints": [{ + "adminURL": "http://cdn.admin-nets.local:8774/v1.0", + "region": "RegionOne", + "internalURL": "http://127.0.0.1:8774/v1.0", + "publicURL": "http://cdn.admin-nets.local:8774/v1.0/" + }], + "type": "nova_compat", + "name": "nova_compat" + }, { + "endpoints": [{ + "adminURL": "http://nova/novapi/admin", + "region": "RegionOne", + "internalURL": "http://nova/novapi/internal", + "publicURL": "http://nova/novapi/public" + }], + "type": "compute", + "name": "nova" + }, { + "endpoints": [{ + "adminURL": "http://glance/glanceapi/admin", + "region": "RegionOne", + "internalURL": "http://glance/glanceapi/internal", + "publicURL": "http://glance/glanceapi/public" + }], + "type": "image", + "name": "glance" + }, { + "endpoints": [{ + "adminURL": TEST_ADMIN_URL, + "region": "RegionOne", + "internalURL": "http://127.0.0.1:5000/v2.0", + "publicURL": "http://127.0.0.1:5000/v2.0" + }], + "type": "identity", + "name": "keystone" + }, { + "endpoints": [{ + "adminURL": "http://swift/swiftapi/admin", + "region": "RegionOne", + "internalURL": "http://swift/swiftapi/internal", + "publicURL": "http://swift/swiftapi/public" + }], + "type": "object-store", + "name": "swift" + }] + + def setUp(self): + super(V2IdentityPlugin, self).setUp() + self.TEST_RESPONSE_DICT = { + "access": { + "token": { + "expires": "2020-01-01T00:00:10.000123Z", + "id": self.TEST_TOKEN, + "tenant": { + "id": self.TEST_TENANT_ID + }, + }, + "user": { + "id": self.TEST_USER + }, + "serviceCatalog": self.TEST_SERVICE_CATALOG, + }, + } + + def stub_auth(self, **kwargs): + self.stub_url('POST', ['tokens'], **kwargs) + + def test_authenticate_with_username_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + self.assertIsNone(a.user_id) + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'passwordCredentials': {'username': self.TEST_USER, + 'password': self.TEST_PASS}}} + self.assertRequestBodyIs(json=req) + self.assertRequestHeaderEqual('Content-Type', 'application/json') + self.assertRequestHeaderEqual('Accept', 'application/json') + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_user_id_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Password(self.TEST_URL, user_id=self.TEST_USER, + password=self.TEST_PASS) + self.assertIsNone(a.username) + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER, + 'password': self.TEST_PASS}}} + self.assertRequestBodyIs(json=req) + self.assertRequestHeaderEqual('Content-Type', 'application/json') + self.assertRequestHeaderEqual('Accept', 'application/json') + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_username_password_scoped(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID) + self.assertIsNone(a.user_id) + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'passwordCredentials': {'username': self.TEST_USER, + 'password': self.TEST_PASS}, + 'tenantId': self.TEST_TENANT_ID}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_user_id_password_scoped(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Password(self.TEST_URL, user_id=self.TEST_USER, + password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID) + self.assertIsNone(a.username) + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER, + 'password': self.TEST_PASS}, + 'tenantId': self.TEST_TENANT_ID}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_token(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Token(self.TEST_URL, 'foo') + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'token': {'id': 'foo'}}} + self.assertRequestBodyIs(json=req) + self.assertRequestHeaderEqual('x-Auth-Token', 'foo') + self.assertRequestHeaderEqual('Content-Type', 'application/json') + self.assertRequestHeaderEqual('Accept', 'application/json') + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_with_trust_id(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS, trust_id='trust') + s = session.Session(a) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'passwordCredentials': {'username': self.TEST_USER, + 'password': self.TEST_PASS}, + 'trust_id': 'trust'}} + + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def _do_service_url_test(self, base_url, endpoint_filter): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + self.stub_url('GET', ['path'], + base_url=base_url, + text='SUCCESS', status_code=200) + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + resp = s.get('/path', endpoint_filter=endpoint_filter) + + self.assertEqual(resp.status_code, 200) + self.assertEqual(self.requests.last_request.url, base_url + '/path') + + def test_service_url(self): + endpoint_filter = {'service_type': 'compute', + 'interface': 'admin', + 'service_name': 'nova'} + self._do_service_url_test('http://nova/novapi/admin', endpoint_filter) + + def test_service_url_defaults_to_public(self): + endpoint_filter = {'service_type': 'compute'} + self._do_service_url_test('http://nova/novapi/public', endpoint_filter) + + def test_endpoint_filter_without_service_type_fails(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.EndpointNotFound, s.get, '/path', + endpoint_filter={'interface': 'admin'}) + + def test_full_url_overrides_endpoint_filter(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + self.stub_url('GET', [], + base_url='http://testurl/', + text='SUCCESS', status_code=200) + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + resp = s.get('http://testurl/', + endpoint_filter={'service_type': 'compute'}) + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.text, 'SUCCESS') + + def test_invalid_auth_response_dict(self): + self.stub_auth(json={'hello': 'world'}) + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any', + authenticated=True) + + def test_invalid_auth_response_type(self): + self.stub_url('POST', ['tokens'], text='testdata') + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any', + authenticated=True) + + def test_invalidate_response(self): + resp_data1 = copy.deepcopy(self.TEST_RESPONSE_DICT) + resp_data2 = copy.deepcopy(self.TEST_RESPONSE_DICT) + + resp_data1['access']['token']['id'] = 'token1' + resp_data2['access']['token']['id'] = 'token2' + + auth_responses = [{'json': resp_data1}, {'json': resp_data2}] + self.stub_auth(response_list=auth_responses) + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertEqual('token1', s.get_token()) + self.assertEqual({'X-Auth-Token': 'token1'}, s.get_auth_headers()) + + a.invalidate() + self.assertEqual('token2', s.get_token()) + self.assertEqual({'X-Auth-Token': 'token2'}, s.get_auth_headers()) + + def test_doesnt_log_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + password = uuid.uuid4().hex + + a = v2.Password(self.TEST_URL, username=self.TEST_USER, + password=password) + s = session.Session(auth=a) + self.assertEqual(self.TEST_TOKEN, s.get_token()) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + self.assertNotIn(password, self.logger.output) + + def test_password_with_no_user_id_or_name(self): + self.assertRaises(TypeError, + v2.Password, self.TEST_URL, password=self.TEST_PASS) diff --git a/keystoneclient/tests/unit/auth/test_identity_v3.py b/keystoneclient/tests/unit/auth/test_identity_v3.py new file mode 100644 index 0000000..29cbb0e --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_identity_v3.py @@ -0,0 +1,490 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +from keystoneclient import access +from keystoneclient.auth.identity import v3 +from keystoneclient import client +from keystoneclient import exceptions +from keystoneclient import fixture +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +class V3IdentityPlugin(utils.TestCase): + + TEST_ROOT_URL = 'http://127.0.0.1:5000/' + TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3') + TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' + TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3') + + TEST_PASS = 'password' + + TEST_SERVICE_CATALOG = [{ + "endpoints": [{ + "url": "http://cdn.admin-nets.local:8774/v1.0/", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://127.0.0.1:8774/v1.0", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://cdn.admin-nets.local:8774/v1.0", + "region": "RegionOne", + "interface": "admin" + }], + "type": "nova_compat" + }, { + "endpoints": [{ + "url": "http://nova/novapi/public", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://nova/novapi/internal", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://nova/novapi/admin", + "region": "RegionOne", + "interface": "admin" + }], + "type": "compute", + "name": "nova", + }, { + "endpoints": [{ + "url": "http://glance/glanceapi/public", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://glance/glanceapi/internal", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://glance/glanceapi/admin", + "region": "RegionOne", + "interface": "admin" + }], + "type": "image", + "name": "glance" + }, { + "endpoints": [{ + "url": "http://127.0.0.1:5000/v3", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://127.0.0.1:5000/v3", + "region": "RegionOne", + "interface": "internal" + }, { + "url": TEST_ADMIN_URL, + "region": "RegionOne", + "interface": "admin" + }], + "type": "identity" + }, { + "endpoints": [{ + "url": "http://swift/swiftapi/public", + "region": "RegionOne", + "interface": "public" + }, { + "url": "http://swift/swiftapi/internal", + "region": "RegionOne", + "interface": "internal" + }, { + "url": "http://swift/swiftapi/admin", + "region": "RegionOne", + "interface": "admin" + }], + "type": "object-store" + }] + + def setUp(self): + super(V3IdentityPlugin, self).setUp() + + V3_URL = "%sv3" % self.TEST_URL + self.TEST_DISCOVERY_RESPONSE = { + 'versions': {'values': [fixture.V3Discovery(V3_URL)]}} + + self.TEST_RESPONSE_DICT = { + "token": { + "methods": [ + "token", + "password" + ], + + "expires_at": "2020-01-01T00:00:10.000123Z", + "project": { + "domain": { + "id": self.TEST_DOMAIN_ID, + "name": self.TEST_DOMAIN_NAME + }, + "id": self.TEST_TENANT_ID, + "name": self.TEST_TENANT_NAME + }, + "user": { + "domain": { + "id": self.TEST_DOMAIN_ID, + "name": self.TEST_DOMAIN_NAME + }, + "id": self.TEST_USER, + "name": self.TEST_USER + }, + "issued_at": "2013-05-29T16:55:21.468960Z", + "catalog": self.TEST_SERVICE_CATALOG + }, + } + self.TEST_PROJECTS_RESPONSE = { + "projects": [ + { + "domain_id": "1789d1", + "enabled": "True", + "id": "263fd9", + "links": { + "self": "https://identity:5000/v3/projects/263fd9" + }, + "name": "Dev Group A" + }, + { + "domain_id": "1789d1", + "enabled": "True", + "id": "e56ad3", + "links": { + "self": "https://identity:5000/v3/projects/e56ad3" + }, + "name": "Dev Group B" + } + ], + "links": { + "self": "https://identity:5000/v3/projects", + } + } + + def stub_auth(self, subject_token=None, **kwargs): + if not subject_token: + subject_token = self.TEST_TOKEN + + self.stub_url('POST', ['auth', 'tokens'], + headers={'X-Subject-Token': subject_token}, **kwargs) + + def test_authenticate_with_username_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}}} + + self.assertRequestBodyIs(json=req) + self.assertRequestHeaderEqual('Content-Type', 'application/json') + self.assertRequestHeaderEqual('Accept', 'application/json') + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_username_password_unscoped(self): + del self.TEST_RESPONSE_DICT['token']['catalog'] + del self.TEST_RESPONSE_DICT['token']['project'] + + self.stub_auth(json=self.TEST_RESPONSE_DICT) + self.stub_url(method="GET", json=self.TEST_DISCOVERY_RESPONSE) + test_user_id = self.TEST_RESPONSE_DICT['token']['user']['id'] + self.stub_url(method="GET", + json=self.TEST_PROJECTS_RESPONSE, + parts=['users', test_user_id, 'projects']) + + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + cs = client.Client(session=s, auth_url=self.TEST_URL) + + # As a sanity check on the auth_ref, make sure client has the + # proper user id, that it fetches the right project response + self.assertEqual(test_user_id, a.auth_ref.user_id) + t = cs.projects.list(user=a.auth_ref.user_id) + self.assertEqual(2, len(t)) + + def test_authenticate_with_username_password_domain_scoped(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS, domain_id=self.TEST_DOMAIN_ID) + s = session.Session(a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}, + 'scope': {'domain': {'id': self.TEST_DOMAIN_ID}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_authenticate_with_username_password_project_scoped(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS, + project_id=self.TEST_DOMAIN_ID) + s = session.Session(a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}, + 'scope': {'project': {'id': self.TEST_DOMAIN_ID}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + self.assertEqual(s.auth.auth_ref.project_id, self.TEST_DOMAIN_ID) + + def test_authenticate_with_token(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Token(self.TEST_URL, self.TEST_TOKEN) + s = session.Session(auth=a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['token'], + 'token': {'id': self.TEST_TOKEN}}}} + + self.assertRequestBodyIs(json=req) + + self.assertRequestHeaderEqual('Content-Type', 'application/json') + self.assertRequestHeaderEqual('Accept', 'application/json') + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_with_expired(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + d = copy.deepcopy(self.TEST_RESPONSE_DICT) + d['token']['expires_at'] = '2000-01-01T00:00:10.000123Z' + + a = v3.Password(self.TEST_URL, username='username', + password='password') + a.auth_ref = access.AccessInfo.factory(body=d) + s = session.Session(auth=a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + self.assertEqual(a.auth_ref['expires_at'], + self.TEST_RESPONSE_DICT['token']['expires_at']) + + def test_with_domain_and_project_scoping(self): + a = v3.Password(self.TEST_URL, username='username', + password='password', project_id='project', + domain_id='domain') + + self.assertRaises(exceptions.AuthorizationFailure, + a.get_token, None) + self.assertRaises(exceptions.AuthorizationFailure, + a.get_headers, None) + + def test_with_trust_id(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS, trust_id='trust') + s = session.Session(a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}, + 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_with_multiple_mechanisms_factory(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + p = v3.PasswordMethod(username=self.TEST_USER, password=self.TEST_PASS) + t = v3.TokenMethod(token='foo') + a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust') + s = session.Session(a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password', 'token'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}, + 'token': {'id': 'foo'}}, + 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_with_multiple_mechanisms(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + p = v3.PasswordMethod(username=self.TEST_USER, + password=self.TEST_PASS) + t = v3.TokenMethod(token='foo') + a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust') + s = session.Session(auth=a) + + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + req = {'auth': {'identity': + {'methods': ['password', 'token'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}, + 'token': {'id': 'foo'}}, + 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_with_multiple_scopes(self): + s = session.Session() + + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, password=self.TEST_PASS, + domain_id='x', project_id='x') + self.assertRaises(exceptions.AuthorizationFailure, a.get_auth_ref, s) + + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, password=self.TEST_PASS, + domain_id='x', trust_id='x') + self.assertRaises(exceptions.AuthorizationFailure, a.get_auth_ref, s) + + def _do_service_url_test(self, base_url, endpoint_filter): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + self.stub_url('GET', ['path'], + base_url=base_url, + text='SUCCESS', status_code=200) + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + resp = s.get('/path', endpoint_filter=endpoint_filter) + + self.assertEqual(resp.status_code, 200) + self.assertEqual(self.requests.last_request.url, base_url + '/path') + + def test_service_url(self): + endpoint_filter = {'service_type': 'compute', + 'interface': 'admin', + 'service_name': 'nova'} + self._do_service_url_test('http://nova/novapi/admin', endpoint_filter) + + def test_service_url_defaults_to_public(self): + endpoint_filter = {'service_type': 'compute'} + self._do_service_url_test('http://nova/novapi/public', endpoint_filter) + + def test_endpoint_filter_without_service_type_fails(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.EndpointNotFound, s.get, '/path', + endpoint_filter={'interface': 'admin'}) + + def test_full_url_overrides_endpoint_filter(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + self.stub_url('GET', [], + base_url='http://testurl/', + text='SUCCESS', status_code=200) + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + resp = s.get('http://testurl/', + endpoint_filter={'service_type': 'compute'}) + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.text, 'SUCCESS') + + def test_invalid_auth_response_dict(self): + self.stub_auth(json={'hello': 'world'}) + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any', + authenticated=True) + + def test_invalid_auth_response_type(self): + self.stub_url('POST', ['auth', 'tokens'], text='testdata') + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any', + authenticated=True) + + def test_invalidate_response(self): + auth_responses = [{'status_code': 200, 'json': self.TEST_RESPONSE_DICT, + 'headers': {'X-Subject-Token': 'token1'}}, + {'status_code': 200, 'json': self.TEST_RESPONSE_DICT, + 'headers': {'X-Subject-Token': 'token2'}}] + + self.requests.post('%s/auth/tokens' % self.TEST_URL, auth_responses) + + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + self.assertEqual('token1', s.get_token()) + self.assertEqual({'X-Auth-Token': 'token1'}, s.get_auth_headers()) + a.invalidate() + self.assertEqual('token2', s.get_token()) + self.assertEqual({'X-Auth-Token': 'token2'}, s.get_auth_headers()) + + def test_doesnt_log_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + password = uuid.uuid4().hex + a = v3.Password(self.TEST_URL, username=self.TEST_USER, + password=password) + s = session.Session(a) + self.assertEqual(self.TEST_TOKEN, s.get_token()) + self.assertEqual({'X-Auth-Token': self.TEST_TOKEN}, + s.get_auth_headers()) + + self.assertNotIn(password, self.logger.output) + + def test_sends_nocatalog(self): + del self.TEST_RESPONSE_DICT['token']['catalog'] + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, + password=self.TEST_PASS, + include_catalog=False) + s = session.Session(auth=a) + + s.get_token() + + auth_url = self.TEST_URL + '/auth/tokens' + self.assertEqual(auth_url, a.token_url) + self.assertEqual(auth_url + '?nocatalog', + self.requests.last_request.url) diff --git a/keystoneclient/tests/unit/auth/test_password.py b/keystoneclient/tests/unit/auth/test_password.py new file mode 100644 index 0000000..c5067c0 --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_password.py @@ -0,0 +1,63 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystoneclient.auth.identity.generic import password +from keystoneclient.auth.identity import v2 +from keystoneclient.auth.identity import v3 +from keystoneclient.tests.unit.auth import utils + + +class PasswordTests(utils.GenericPluginTestCase): + + PLUGIN_CLASS = password.Password + V2_PLUGIN_CLASS = v2.Password + V3_PLUGIN_CLASS = v3.Password + + def new_plugin(self, **kwargs): + kwargs.setdefault('username', uuid.uuid4().hex) + kwargs.setdefault('password', uuid.uuid4().hex) + return super(PasswordTests, self).new_plugin(**kwargs) + + def test_with_user_domain_params(self): + self.stub_discovery() + + self.assertCreateV3(domain_id=uuid.uuid4().hex, + user_domain_id=uuid.uuid4().hex) + + def test_v3_user_params_v2_url(self): + self.stub_discovery(v3=False) + self.assertDiscoveryFailure(user_domain_id=uuid.uuid4().hex) + + def test_options(self): + opts = [o.name for o in self.PLUGIN_CLASS.get_options()] + + allowed_opts = ['user-name', + 'user-domain-id', + 'user-domain-name', + 'user-id', + 'password', + + 'domain-id', + 'domain-name', + 'tenant-id', + 'tenant-name', + 'project-id', + 'project-name', + 'project-domain-id', + 'project-domain-name', + 'trust-id', + 'auth-url'] + + self.assertEqual(set(allowed_opts), set(opts)) + self.assertEqual(len(allowed_opts), len(opts)) diff --git a/keystoneclient/tests/unit/auth/test_token.py b/keystoneclient/tests/unit/auth/test_token.py new file mode 100644 index 0000000..928e2b2 --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_token.py @@ -0,0 +1,47 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystoneclient.auth.identity.generic import token +from keystoneclient.auth.identity import v2 +from keystoneclient.auth.identity import v3 +from keystoneclient.tests.unit.auth import utils + + +class TokenTests(utils.GenericPluginTestCase): + + PLUGIN_CLASS = token.Token + V2_PLUGIN_CLASS = v2.Token + V3_PLUGIN_CLASS = v3.Token + + def new_plugin(self, **kwargs): + kwargs.setdefault('token', uuid.uuid4().hex) + return super(TokenTests, self).new_plugin(**kwargs) + + def test_options(self): + opts = [o.name for o in self.PLUGIN_CLASS.get_options()] + + allowed_opts = ['token', + 'domain-id', + 'domain-name', + 'tenant-id', + 'tenant-name', + 'project-id', + 'project-name', + 'project-domain-id', + 'project-domain-name', + 'trust-id', + 'auth-url'] + + self.assertEqual(set(allowed_opts), set(opts)) + self.assertEqual(len(allowed_opts), len(opts)) diff --git a/keystoneclient/tests/unit/auth/test_token_endpoint.py b/keystoneclient/tests/unit/auth/test_token_endpoint.py new file mode 100644 index 0000000..4b5f82c --- /dev/null +++ b/keystoneclient/tests/unit/auth/test_token_endpoint.py @@ -0,0 +1,63 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from testtools import matchers + +from keystoneclient.auth import token_endpoint +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +class TokenEndpointTest(utils.TestCase): + + TEST_TOKEN = 'aToken' + TEST_URL = 'http://server/prefix' + + def test_basic_case(self): + self.requests.get(self.TEST_URL, text='body') + + a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN) + s = session.Session(auth=a) + + data = s.get(self.TEST_URL, authenticated=True) + + self.assertEqual(data.text, 'body') + self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN) + + def test_basic_endpoint_case(self): + self.stub_url('GET', ['p'], text='body') + a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN) + s = session.Session(auth=a) + + data = s.get('/p', + authenticated=True, + endpoint_filter={'service': 'identity'}) + + self.assertEqual(self.TEST_URL, a.get_endpoint(s)) + self.assertEqual('body', data.text) + self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN) + + def test_token_endpoint_options(self): + opt_names = [opt.name for opt in token_endpoint.Token.get_options()] + + self.assertThat(opt_names, matchers.HasLength(2)) + + self.assertIn('token', opt_names) + self.assertIn('endpoint', opt_names) + + def test_token_endpoint_user_id(self): + a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN) + s = session.Session() + + # we can't know this information about this sort of plugin + self.assertIsNone(a.get_user_id(s)) + self.assertIsNone(a.get_project_id(s)) diff --git a/keystoneclient/tests/unit/auth/utils.py b/keystoneclient/tests/unit/auth/utils.py new file mode 100644 index 0000000..6580c73 --- /dev/null +++ b/keystoneclient/tests/unit/auth/utils.py @@ -0,0 +1,200 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import functools +import uuid + +import mock +from oslo_config import cfg +import six + +from keystoneclient import access +from keystoneclient.auth import base +from keystoneclient import exceptions +from keystoneclient import fixture +from keystoneclient import session +from keystoneclient.tests.unit import utils + + +class MockPlugin(base.BaseAuthPlugin): + + INT_DESC = 'test int' + FLOAT_DESC = 'test float' + BOOL_DESC = 'test bool' + STR_DESC = 'test str' + STR_DEFAULT = uuid.uuid4().hex + + def __init__(self, **kwargs): + self._data = kwargs + + def __getitem__(self, key): + return self._data[key] + + def get_token(self, *args, **kwargs): + return 'aToken' + + def get_endpoint(self, *args, **kwargs): + return 'http://test' + + @classmethod + def get_options(cls): + return [ + cfg.IntOpt('a-int', default='3', help=cls.INT_DESC), + cfg.BoolOpt('a-bool', help=cls.BOOL_DESC), + cfg.FloatOpt('a-float', help=cls.FLOAT_DESC), + cfg.StrOpt('a-str', help=cls.STR_DESC, default=cls.STR_DEFAULT), + ] + + +class MockManager(object): + + def __init__(self, driver): + self.driver = driver + + +def mock_plugin(f): + @functools.wraps(f) + def inner(*args, **kwargs): + with mock.patch.object(base, 'get_plugin_class') as m: + m.return_value = MockPlugin + args = list(args) + [m] + return f(*args, **kwargs) + + return inner + + +class TestCase(utils.TestCase): + + GROUP = 'auth' + V2PASS = 'v2password' + V3TOKEN = 'v3token' + + a_int = 88 + a_float = 88.8 + a_bool = False + + TEST_VALS = {'a_int': a_int, + 'a_float': a_float, + 'a_bool': a_bool} + + def assertTestVals(self, plugin, vals=TEST_VALS): + for k, v in six.iteritems(vals): + self.assertEqual(v, plugin[k]) + + +class GenericPluginTestCase(utils.TestCase): + + TEST_URL = 'http://keystone.host:5000/' + + # OVERRIDE THESE IN SUB CLASSES + PLUGIN_CLASS = None + V2_PLUGIN_CLASS = None + V3_PLUGIN_CLASS = None + + def setUp(self): + super(GenericPluginTestCase, self).setUp() + + self.token_v2 = fixture.V2Token() + self.token_v3 = fixture.V3Token() + self.token_v3_id = uuid.uuid4().hex + self.session = session.Session() + + self.stub_url('POST', ['v2.0', 'tokens'], json=self.token_v2) + self.stub_url('POST', ['v3', 'auth', 'tokens'], + headers={'X-Subject-Token': self.token_v3_id}, + json=self.token_v3) + + def new_plugin(self, **kwargs): + kwargs.setdefault('auth_url', self.TEST_URL) + return self.PLUGIN_CLASS(**kwargs) + + def stub_discovery(self, base_url=None, **kwargs): + kwargs.setdefault('href', self.TEST_URL) + disc = fixture.DiscoveryList(**kwargs) + self.stub_url('GET', json=disc, base_url=base_url, status_code=300) + return disc + + def assertCreateV3(self, **kwargs): + auth = self.new_plugin(**kwargs) + auth_ref = auth.get_auth_ref(self.session) + self.assertIsInstance(auth_ref, access.AccessInfoV3) + self.assertEqual(self.TEST_URL + 'v3/auth/tokens', + self.requests.last_request.url) + self.assertIsInstance(auth._plugin, self.V3_PLUGIN_CLASS) + return auth + + def assertCreateV2(self, **kwargs): + auth = self.new_plugin(**kwargs) + auth_ref = auth.get_auth_ref(self.session) + self.assertIsInstance(auth_ref, access.AccessInfoV2) + self.assertEqual(self.TEST_URL + 'v2.0/tokens', + self.requests.last_request.url) + self.assertIsInstance(auth._plugin, self.V2_PLUGIN_CLASS) + return auth + + def assertDiscoveryFailure(self, **kwargs): + plugin = self.new_plugin(**kwargs) + self.assertRaises(exceptions.DiscoveryFailure, + plugin.get_auth_ref, + self.session) + + def test_create_v3_if_domain_params(self): + self.stub_discovery() + + self.assertCreateV3(domain_id=uuid.uuid4().hex) + self.assertCreateV3(domain_name=uuid.uuid4().hex) + self.assertCreateV3(project_name=uuid.uuid4().hex, + project_domain_name=uuid.uuid4().hex) + self.assertCreateV3(project_name=uuid.uuid4().hex, + project_domain_id=uuid.uuid4().hex) + + def test_create_v2_if_no_domain_params(self): + self.stub_discovery() + self.assertCreateV2() + self.assertCreateV2(project_id=uuid.uuid4().hex) + self.assertCreateV2(project_name=uuid.uuid4().hex) + self.assertCreateV2(tenant_id=uuid.uuid4().hex) + self.assertCreateV2(tenant_name=uuid.uuid4().hex) + + def test_v3_params_v2_url(self): + self.stub_discovery(v3=False) + self.assertDiscoveryFailure(domain_name=uuid.uuid4().hex) + + def test_v2_params_v3_url(self): + self.stub_discovery(v2=False) + self.assertCreateV3() + + def test_no_urls(self): + self.stub_discovery(v2=False, v3=False) + self.assertDiscoveryFailure() + + def test_path_based_url_v2(self): + self.stub_url('GET', ['v2.0'], status_code=403) + self.assertCreateV2(auth_url=self.TEST_URL + 'v2.0') + + def test_path_based_url_v3(self): + self.stub_url('GET', ['v3'], status_code=403) + self.assertCreateV3(auth_url=self.TEST_URL + 'v3') + + def test_disc_error_for_failure(self): + self.stub_url('GET', [], status_code=403) + self.assertDiscoveryFailure() + + def test_v3_plugin_from_failure(self): + url = self.TEST_URL + 'v3' + self.stub_url('GET', [], base_url=url, status_code=403) + self.assertCreateV3(auth_url=url) + + def test_unknown_discovery_version(self): + # make a v4 entry that's mostly the same as a v3 + self.stub_discovery(v2=False, v3_id='v4.0') + self.assertDiscoveryFailure() |
