summaryrefslogtreecommitdiff
path: root/keystoneclient/tests/unit/auth
diff options
context:
space:
mode:
authorJamie Lennox <jamielennox@redhat.com>2015-02-11 19:03:25 +1100
committerJamie Lennox <jamielennox@redhat.com>2015-02-11 19:03:25 +1100
commit6bd93179a2966f2b5c67e297628510ac73689fb3 (patch)
treefaf3a93a16fb49b4a742f74b6fcdd20f8a0ebd0e /keystoneclient/tests/unit/auth
parent58ac2de5d4a6b58e8bd5d430a04199a4d40427a8 (diff)
downloadpython-keystoneclient-6bd93179a2966f2b5c67e297628510ac73689fb3.tar.gz
Move tests to the unit subdirectory
Move all the existing tests to the unit/ subdirectory. This gives us some room to add a functional/ directory later with other tests. Change-Id: I0fb8d5b628eb8ee1f35f05f42d0c0ac9f285e8c3 Implements: functional-testing
Diffstat (limited to 'keystoneclient/tests/unit/auth')
-rw-r--r--keystoneclient/tests/unit/auth/__init__.py0
-rw-r--r--keystoneclient/tests/unit/auth/test_access.py61
-rw-r--r--keystoneclient/tests/unit/auth/test_cli.py196
-rw-r--r--keystoneclient/tests/unit/auth/test_conf.py177
-rw-r--r--keystoneclient/tests/unit/auth/test_identity_common.py422
-rw-r--r--keystoneclient/tests/unit/auth/test_identity_v2.py295
-rw-r--r--keystoneclient/tests/unit/auth/test_identity_v3.py490
-rw-r--r--keystoneclient/tests/unit/auth/test_password.py63
-rw-r--r--keystoneclient/tests/unit/auth/test_token.py47
-rw-r--r--keystoneclient/tests/unit/auth/test_token_endpoint.py63
-rw-r--r--keystoneclient/tests/unit/auth/utils.py200
11 files changed, 2014 insertions, 0 deletions
diff --git a/keystoneclient/tests/unit/auth/__init__.py b/keystoneclient/tests/unit/auth/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/__init__.py
diff --git a/keystoneclient/tests/unit/auth/test_access.py b/keystoneclient/tests/unit/auth/test_access.py
new file mode 100644
index 0000000..405fb8b
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/test_access.py
@@ -0,0 +1,61 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from keystoneclient import access
+from keystoneclient import auth
+from keystoneclient.auth.identity import access as access_plugin
+from keystoneclient import fixture
+from keystoneclient import session
+from keystoneclient.tests.unit import utils
+
+
+class AccessInfoPluginTests(utils.TestCase):
+
+ def setUp(self):
+ super(AccessInfoPluginTests, self).setUp()
+ self.session = session.Session()
+ self.auth_token = uuid.uuid4().hex
+
+ def _plugin(self, **kwargs):
+ token = fixture.V3Token()
+ s = token.add_service('identity')
+ s.add_standard_endpoints(public=self.TEST_ROOT_URL)
+
+ auth_ref = access.AccessInfo.factory(body=token,
+ auth_token=self.auth_token)
+ return access_plugin.AccessInfoPlugin(auth_ref, **kwargs)
+
+ def test_auth_ref(self):
+ plugin = self._plugin()
+ self.assertEqual(self.TEST_ROOT_URL,
+ plugin.get_endpoint(self.session,
+ service_type='identity',
+ interface='public'))
+ self.assertEqual(self.auth_token, plugin.get_token(session))
+
+ def test_auth_url(self):
+ auth_url = 'http://keystone.test.url'
+ plugin = self._plugin(auth_url=auth_url)
+
+ self.assertEqual(auth_url,
+ plugin.get_endpoint(self.session,
+ interface=auth.AUTH_INTERFACE))
+
+ def test_invalidate(self):
+ plugin = self._plugin()
+ auth_ref = plugin.auth_ref
+
+ self.assertIsInstance(auth_ref, access.AccessInfo)
+ self.assertFalse(plugin.invalidate())
+ self.assertIs(auth_ref, plugin.auth_ref)
diff --git a/keystoneclient/tests/unit/auth/test_cli.py b/keystoneclient/tests/unit/auth/test_cli.py
new file mode 100644
index 0000000..d65de73
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/test_cli.py
@@ -0,0 +1,196 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import uuid
+
+import fixtures
+import mock
+from oslo_config import cfg
+
+from keystoneclient.auth import base
+from keystoneclient.auth import cli
+from keystoneclient.tests.unit.auth import utils
+
+
+class TesterPlugin(base.BaseAuthPlugin):
+
+ def get_token(self, *args, **kwargs):
+ return None
+
+ @classmethod
+ def get_options(cls):
+ # NOTE(jamielennox): this is kind of horrible. If you specify this as
+ # a deprecated_name= value it will convert - to _ which is not what we
+ # want for a CLI option.
+ deprecated = [cfg.DeprecatedOpt('test-other')]
+ return [
+ cfg.StrOpt('test-opt', help='tester', deprecated_opts=deprecated)
+ ]
+
+
+class CliTests(utils.TestCase):
+
+ def setUp(self):
+ super(CliTests, self).setUp()
+ self.p = argparse.ArgumentParser()
+
+ def env(self, name, value=None):
+ if value is not None:
+ # environment variables are always strings
+ value = str(value)
+
+ return self.useFixture(fixtures.EnvironmentVariable(name, value))
+
+ def test_creating_with_no_args(self):
+ ret = cli.register_argparse_arguments(self.p, [])
+ self.assertIsNone(ret)
+ self.assertIn('--os-auth-plugin', self.p.format_usage())
+
+ def test_load_with_nothing(self):
+ cli.register_argparse_arguments(self.p, [])
+ opts = self.p.parse_args([])
+ self.assertIsNone(cli.load_from_argparse_arguments(opts))
+
+ @utils.mock_plugin
+ def test_basic_params_added(self, m):
+ name = uuid.uuid4().hex
+ argv = ['--os-auth-plugin', name]
+ ret = cli.register_argparse_arguments(self.p, argv)
+ self.assertIs(utils.MockPlugin, ret)
+
+ for n in ('--os-a-int', '--os-a-bool', '--os-a-float'):
+ self.assertIn(n, self.p.format_usage())
+
+ m.assert_called_once_with(name)
+
+ @utils.mock_plugin
+ def test_param_loading(self, m):
+ name = uuid.uuid4().hex
+ argv = ['--os-auth-plugin', name,
+ '--os-a-int', str(self.a_int),
+ '--os-a-float', str(self.a_float),
+ '--os-a-bool', str(self.a_bool)]
+
+ klass = cli.register_argparse_arguments(self.p, argv)
+ self.assertIs(utils.MockPlugin, klass)
+
+ opts = self.p.parse_args(argv)
+ self.assertEqual(name, opts.os_auth_plugin)
+
+ a = cli.load_from_argparse_arguments(opts)
+ self.assertTestVals(a)
+
+ self.assertEqual(name, opts.os_auth_plugin)
+ self.assertEqual(str(self.a_int), opts.os_a_int)
+ self.assertEqual(str(self.a_float), opts.os_a_float)
+ self.assertEqual(str(self.a_bool), opts.os_a_bool)
+
+ @utils.mock_plugin
+ def test_default_options(self, m):
+ name = uuid.uuid4().hex
+ argv = ['--os-auth-plugin', name,
+ '--os-a-float', str(self.a_float)]
+
+ klass = cli.register_argparse_arguments(self.p, argv)
+ self.assertIs(utils.MockPlugin, klass)
+
+ opts = self.p.parse_args(argv)
+ self.assertEqual(name, opts.os_auth_plugin)
+
+ a = cli.load_from_argparse_arguments(opts)
+
+ self.assertEqual(self.a_float, a['a_float'])
+ self.assertEqual(3, a['a_int'])
+
+ @utils.mock_plugin
+ def test_with_default_string_value(self, m):
+ name = uuid.uuid4().hex
+ klass = cli.register_argparse_arguments(self.p, [], default=name)
+ self.assertIs(utils.MockPlugin, klass)
+ m.assert_called_once_with(name)
+
+ @utils.mock_plugin
+ def test_overrides_default_string_value(self, m):
+ name = uuid.uuid4().hex
+ default = uuid.uuid4().hex
+ argv = ['--os-auth-plugin', name]
+ klass = cli.register_argparse_arguments(self.p, argv, default=default)
+ self.assertIs(utils.MockPlugin, klass)
+ m.assert_called_once_with(name)
+
+ @utils.mock_plugin
+ def test_with_default_type_value(self, m):
+ klass = cli.register_argparse_arguments(self.p, [],
+ default=utils.MockPlugin)
+ self.assertIs(utils.MockPlugin, klass)
+ self.assertEqual(0, m.call_count)
+
+ @utils.mock_plugin
+ def test_overrides_default_type_value(self, m):
+ # using this test plugin would fail if called because there
+ # is no get_options() function
+ class TestPlugin(object):
+ pass
+ name = uuid.uuid4().hex
+ argv = ['--os-auth-plugin', name]
+ klass = cli.register_argparse_arguments(self.p, argv,
+ default=TestPlugin)
+ self.assertIs(utils.MockPlugin, klass)
+ m.assert_called_once_with(name)
+
+ @utils.mock_plugin
+ def test_env_overrides_default_opt(self, m):
+ name = uuid.uuid4().hex
+ val = uuid.uuid4().hex
+ self.env('OS_A_STR', val)
+
+ klass = cli.register_argparse_arguments(self.p, [], default=name)
+ opts = self.p.parse_args([])
+ a = klass.load_from_argparse_arguments(opts)
+
+ self.assertEqual(val, a['a_str'])
+
+ def test_deprecated_cli_options(self):
+ TesterPlugin.register_argparse_arguments(self.p)
+ val = uuid.uuid4().hex
+ opts = self.p.parse_args(['--os-test-other', val])
+ self.assertEqual(val, opts.os_test_opt)
+
+ def test_deprecated_multi_cli_options(self):
+ TesterPlugin.register_argparse_arguments(self.p)
+ val1 = uuid.uuid4().hex
+ val2 = uuid.uuid4().hex
+ # argarse rules say that the last specified wins.
+ opts = self.p.parse_args(['--os-test-other', val2,
+ '--os-test-opt', val1])
+ self.assertEqual(val1, opts.os_test_opt)
+
+ def test_deprecated_env_options(self):
+ val = uuid.uuid4().hex
+
+ with mock.patch.dict('os.environ', {'OS_TEST_OTHER': val}):
+ TesterPlugin.register_argparse_arguments(self.p)
+
+ opts = self.p.parse_args([])
+ self.assertEqual(val, opts.os_test_opt)
+
+ def test_deprecated_env_multi_options(self):
+ val1 = uuid.uuid4().hex
+ val2 = uuid.uuid4().hex
+
+ with mock.patch.dict('os.environ', {'OS_TEST_OPT': val1,
+ 'OS_TEST_OTHER': val2}):
+ TesterPlugin.register_argparse_arguments(self.p)
+
+ opts = self.p.parse_args([])
+ self.assertEqual(val1, opts.os_test_opt)
diff --git a/keystoneclient/tests/unit/auth/test_conf.py b/keystoneclient/tests/unit/auth/test_conf.py
new file mode 100644
index 0000000..c3ce8eb
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/test_conf.py
@@ -0,0 +1,177 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+import mock
+from oslo_config import cfg
+from oslo_config import fixture as config
+import stevedore
+
+from keystoneclient.auth import base
+from keystoneclient.auth import conf
+from keystoneclient.auth.identity import v2 as v2_auth
+from keystoneclient.auth.identity import v3 as v3_auth
+from keystoneclient import exceptions
+from keystoneclient.tests.unit.auth import utils
+
+
+class ConfTests(utils.TestCase):
+
+ def setUp(self):
+ super(ConfTests, self).setUp()
+ self.conf_fixture = self.useFixture(config.Config())
+
+ # NOTE(jamielennox): we register the basic config options first because
+ # we need them in place before we can stub them. We will need to run
+ # the register again after we stub the auth section and auth plugin so
+ # it can load the plugin specific options.
+ conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP)
+
+ def test_loading_v2(self):
+ section = uuid.uuid4().hex
+ username = uuid.uuid4().hex
+ password = uuid.uuid4().hex
+ trust_id = uuid.uuid4().hex
+ tenant_id = uuid.uuid4().hex
+
+ self.conf_fixture.config(auth_section=section, group=self.GROUP)
+ conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP)
+
+ self.conf_fixture.register_opts(v2_auth.Password.get_options(),
+ group=section)
+
+ self.conf_fixture.config(auth_plugin=self.V2PASS,
+ username=username,
+ password=password,
+ trust_id=trust_id,
+ tenant_id=tenant_id,
+ group=section)
+
+ a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP)
+
+ self.assertEqual(username, a.username)
+ self.assertEqual(password, a.password)
+ self.assertEqual(trust_id, a.trust_id)
+ self.assertEqual(tenant_id, a.tenant_id)
+
+ def test_loading_v3(self):
+ section = uuid.uuid4().hex
+ token = uuid.uuid4().hex
+ trust_id = uuid.uuid4().hex
+ project_id = uuid.uuid4().hex
+ project_domain_name = uuid.uuid4().hex
+
+ self.conf_fixture.config(auth_section=section, group=self.GROUP)
+ conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP)
+
+ self.conf_fixture.register_opts(v3_auth.Token.get_options(),
+ group=section)
+
+ self.conf_fixture.config(auth_plugin=self.V3TOKEN,
+ token=token,
+ trust_id=trust_id,
+ project_id=project_id,
+ project_domain_name=project_domain_name,
+ group=section)
+
+ a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP)
+
+ self.assertEqual(token, a.auth_methods[0].token)
+ self.assertEqual(trust_id, a.trust_id)
+ self.assertEqual(project_id, a.project_id)
+ self.assertEqual(project_domain_name, a.project_domain_name)
+
+ def test_loading_invalid_plugin(self):
+ auth_plugin = uuid.uuid4().hex
+ self.conf_fixture.config(auth_plugin=auth_plugin,
+ group=self.GROUP)
+
+ e = self.assertRaises(exceptions.NoMatchingPlugin,
+ conf.load_from_conf_options,
+ self.conf_fixture.conf,
+ self.GROUP)
+
+ self.assertEqual(auth_plugin, e.name)
+
+ def test_loading_with_no_data(self):
+ self.assertIsNone(conf.load_from_conf_options(self.conf_fixture.conf,
+ self.GROUP))
+
+ @mock.patch('stevedore.DriverManager')
+ def test_other_params(self, m):
+ m.return_value = utils.MockManager(utils.MockPlugin)
+ driver_name = uuid.uuid4().hex
+
+ self.conf_fixture.register_opts(utils.MockPlugin.get_options(),
+ group=self.GROUP)
+ self.conf_fixture.config(auth_plugin=driver_name,
+ group=self.GROUP,
+ **self.TEST_VALS)
+
+ a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP)
+ self.assertTestVals(a)
+
+ m.assert_called_once_with(namespace=base.PLUGIN_NAMESPACE,
+ name=driver_name,
+ invoke_on_load=False)
+
+ @utils.mock_plugin
+ def test_same_section(self, m):
+ self.conf_fixture.register_opts(utils.MockPlugin.get_options(),
+ group=self.GROUP)
+ conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP)
+ self.conf_fixture.config(auth_plugin=uuid.uuid4().hex,
+ group=self.GROUP,
+ **self.TEST_VALS)
+
+ a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP)
+ self.assertTestVals(a)
+
+ @utils.mock_plugin
+ def test_diff_section(self, m):
+ section = uuid.uuid4().hex
+
+ self.conf_fixture.config(auth_section=section, group=self.GROUP)
+ conf.register_conf_options(self.conf_fixture.conf, group=self.GROUP)
+
+ self.conf_fixture.register_opts(utils.MockPlugin.get_options(),
+ group=section)
+ self.conf_fixture.config(group=section,
+ auth_plugin=uuid.uuid4().hex,
+ **self.TEST_VALS)
+
+ a = conf.load_from_conf_options(self.conf_fixture.conf, self.GROUP)
+ self.assertTestVals(a)
+
+ def test_plugins_are_all_opts(self):
+ manager = stevedore.ExtensionManager(base.PLUGIN_NAMESPACE,
+ invoke_on_load=False,
+ propagate_map_exceptions=True)
+
+ def inner(driver):
+ for p in driver.plugin.get_options():
+ self.assertIsInstance(p, cfg.Opt)
+
+ manager.map(inner)
+
+ def test_get_common(self):
+ opts = conf.get_common_conf_options()
+ for opt in opts:
+ self.assertIsInstance(opt, cfg.Opt)
+ self.assertEqual(2, len(opts))
+
+ def test_get_named(self):
+ loaded_opts = conf.get_plugin_options('v2password')
+ plugin_opts = v2_auth.Password.get_options()
+
+ self.assertEqual(plugin_opts, loaded_opts)
diff --git a/keystoneclient/tests/unit/auth/test_identity_common.py b/keystoneclient/tests/unit/auth/test_identity_common.py
new file mode 100644
index 0000000..db30bea
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/test_identity_common.py
@@ -0,0 +1,422 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import datetime
+import uuid
+
+from oslo_utils import timeutils
+import six
+
+from keystoneclient import access
+from keystoneclient.auth import base
+from keystoneclient.auth import identity
+from keystoneclient import fixture
+from keystoneclient import session
+from keystoneclient.tests.unit import utils
+
+
+@six.add_metaclass(abc.ABCMeta)
+class CommonIdentityTests(object):
+
+ TEST_ROOT_URL = 'http://127.0.0.1:5000/'
+ TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
+
+ TEST_COMPUTE_PUBLIC = 'http://nova/novapi/public'
+ TEST_COMPUTE_INTERNAL = 'http://nova/novapi/internal'
+ TEST_COMPUTE_ADMIN = 'http://nova/novapi/admin'
+
+ TEST_PASS = uuid.uuid4().hex
+
+ def setUp(self):
+ super(CommonIdentityTests, self).setUp()
+
+ self.TEST_URL = '%s%s' % (self.TEST_ROOT_URL, self.version)
+ self.TEST_ADMIN_URL = '%s%s' % (self.TEST_ROOT_ADMIN_URL, self.version)
+ self.TEST_DISCOVERY = fixture.DiscoveryList(href=self.TEST_ROOT_URL)
+
+ self.stub_auth_data()
+
+ @abc.abstractmethod
+ def create_auth_plugin(self, **kwargs):
+ """Create an auth plugin that makes sense for the auth data.
+
+ It doesn't really matter what auth mechanism is used but it should be
+ appropriate to the API version.
+ """
+
+ @abc.abstractmethod
+ def get_auth_data(self, **kwargs):
+ """Return fake authentication data.
+
+ This should register a valid token response and ensure that the compute
+ endpoints are set to TEST_COMPUTE_PUBLIC, _INTERNAL and _ADMIN.
+ """
+
+ def stub_auth_data(self, **kwargs):
+ token = self.get_auth_data(**kwargs)
+ self.user_id = token.user_id
+
+ try:
+ self.project_id = token.project_id
+ except AttributeError:
+ self.project_id = token.tenant_id
+
+ self.stub_auth(json=token)
+
+ @abc.abstractproperty
+ def version(self):
+ """The API version being tested."""
+
+ def test_discovering(self):
+ self.stub_url('GET', [],
+ base_url=self.TEST_COMPUTE_ADMIN,
+ json=self.TEST_DISCOVERY)
+
+ body = 'SUCCESS'
+
+ # which gives our sample values
+ self.stub_url('GET', ['path'], text=body)
+
+ a = self.create_auth_plugin()
+ s = session.Session(auth=a)
+
+ resp = s.get('/path', endpoint_filter={'service_type': 'compute',
+ 'interface': 'admin',
+ 'version': self.version})
+
+ self.assertEqual(200, resp.status_code)
+ self.assertEqual(body, resp.text)
+
+ new_body = 'SC SUCCESS'
+ # if we don't specify a version, we use the URL from the SC
+ self.stub_url('GET', ['path'],
+ base_url=self.TEST_COMPUTE_ADMIN,
+ text=new_body)
+
+ resp = s.get('/path', endpoint_filter={'service_type': 'compute',
+ 'interface': 'admin'})
+
+ self.assertEqual(200, resp.status_code)
+ self.assertEqual(new_body, resp.text)
+
+ def test_discovery_uses_session_cache(self):
+ # register responses such that if the discovery URL is hit more than
+ # once then the response will be invalid and not point to COMPUTE_ADMIN
+ resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
+ self.requests.get(self.TEST_COMPUTE_ADMIN, resps)
+
+ body = 'SUCCESS'
+ self.stub_url('GET', ['path'], text=body)
+
+ # now either of the two plugins I use, it should not cause a second
+ # request to the discovery url.
+ s = session.Session()
+ a = self.create_auth_plugin()
+ b = self.create_auth_plugin()
+
+ for auth in (a, b):
+ resp = s.get('/path',
+ auth=auth,
+ endpoint_filter={'service_type': 'compute',
+ 'interface': 'admin',
+ 'version': self.version})
+
+ self.assertEqual(200, resp.status_code)
+ self.assertEqual(body, resp.text)
+
+ def test_discovery_uses_plugin_cache(self):
+ # register responses such that if the discovery URL is hit more than
+ # once then the response will be invalid and not point to COMPUTE_ADMIN
+ resps = [{'json': self.TEST_DISCOVERY}, {'status_code': 500}]
+ self.requests.get(self.TEST_COMPUTE_ADMIN, resps)
+
+ body = 'SUCCESS'
+ self.stub_url('GET', ['path'], text=body)
+
+ # now either of the two sessions I use, it should not cause a second
+ # request to the discovery url.
+ sa = session.Session()
+ sb = session.Session()
+ auth = self.create_auth_plugin()
+
+ for sess in (sa, sb):
+ resp = sess.get('/path',
+ auth=auth,
+ endpoint_filter={'service_type': 'compute',
+ 'interface': 'admin',
+ 'version': self.version})
+
+ self.assertEqual(200, resp.status_code)
+ self.assertEqual(body, resp.text)
+
+ def test_discovering_with_no_data(self):
+ # which returns discovery information pointing to TEST_URL but there is
+ # no data there.
+ self.stub_url('GET', [],
+ base_url=self.TEST_COMPUTE_ADMIN,
+ status_code=400)
+
+ # so the url that will be used is the same TEST_COMPUTE_ADMIN
+ body = 'SUCCESS'
+ self.stub_url('GET', ['path'], base_url=self.TEST_COMPUTE_ADMIN,
+ text=body, status_code=200)
+
+ a = self.create_auth_plugin()
+ s = session.Session(auth=a)
+
+ resp = s.get('/path', endpoint_filter={'service_type': 'compute',
+ 'interface': 'admin',
+ 'version': self.version})
+
+ self.assertEqual(200, resp.status_code)
+ self.assertEqual(body, resp.text)
+
+ def test_asking_for_auth_endpoint_ignores_checks(self):
+ a = self.create_auth_plugin()
+ s = session.Session(auth=a)
+
+ auth_url = s.get_endpoint(service_type='compute',
+ interface=base.AUTH_INTERFACE)
+
+ self.assertEqual(self.TEST_URL, auth_url)
+
+ def _create_expired_auth_plugin(self, **kwargs):
+ expires = timeutils.utcnow() - datetime.timedelta(minutes=20)
+ expired_token = self.get_auth_data(expires=expires)
+ expired_auth_ref = access.AccessInfo.factory(body=expired_token)
+
+ body = 'SUCCESS'
+ self.stub_url('GET', ['path'],
+ base_url=self.TEST_COMPUTE_ADMIN, text=body)
+
+ a = self.create_auth_plugin(**kwargs)
+ a.auth_ref = expired_auth_ref
+ return a
+
+ def test_reauthenticate(self):
+ a = self._create_expired_auth_plugin()
+ expired_auth_ref = a.auth_ref
+ s = session.Session(auth=a)
+ self.assertIsNot(expired_auth_ref, a.get_access(s))
+
+ def test_no_reauthenticate(self):
+ a = self._create_expired_auth_plugin(reauthenticate=False)
+ expired_auth_ref = a.auth_ref
+ s = session.Session(auth=a)
+ self.assertIs(expired_auth_ref, a.get_access(s))
+
+ def test_invalidate(self):
+ a = self.create_auth_plugin()
+ s = session.Session(auth=a)
+
+ # trigger token fetching
+ s.get_auth_headers()
+
+ self.assertTrue(a.auth_ref)
+ self.assertTrue(a.invalidate())
+ self.assertIsNone(a.auth_ref)
+ self.assertFalse(a.invalidate())
+
+ def test_get_auth_properties(self):
+ a = self.create_auth_plugin()
+ s = session.Session()
+
+ self.assertEqual(self.user_id, a.get_user_id(s))
+ self.assertEqual(self.project_id, a.get_project_id(s))
+
+
+class V3(CommonIdentityTests, utils.TestCase):
+
+ @property
+ def version(self):
+ return 'v3'
+
+ def get_auth_data(self, **kwargs):
+ token = fixture.V3Token(**kwargs)
+ region = 'RegionOne'
+
+ svc = token.add_service('identity')
+ svc.add_standard_endpoints(admin=self.TEST_ADMIN_URL, region=region)
+
+ svc = token.add_service('compute')
+ svc.add_standard_endpoints(admin=self.TEST_COMPUTE_ADMIN,
+ public=self.TEST_COMPUTE_PUBLIC,
+ internal=self.TEST_COMPUTE_INTERNAL,
+ region=region)
+
+ return token
+
+ def stub_auth(self, subject_token=None, **kwargs):
+ if not subject_token:
+ subject_token = self.TEST_TOKEN
+
+ kwargs.setdefault('headers', {})['X-Subject-Token'] = subject_token
+ self.stub_url('POST', ['auth', 'tokens'], **kwargs)
+
+ def create_auth_plugin(self, **kwargs):
+ kwargs.setdefault('auth_url', self.TEST_URL)
+ kwargs.setdefault('username', self.TEST_USER)
+ kwargs.setdefault('password', self.TEST_PASS)
+ return identity.V3Password(**kwargs)
+
+
+class V2(CommonIdentityTests, utils.TestCase):
+
+ @property
+ def version(self):
+ return 'v2.0'
+
+ def create_auth_plugin(self, **kwargs):
+ kwargs.setdefault('auth_url', self.TEST_URL)
+ kwargs.setdefault('username', self.TEST_USER)
+ kwargs.setdefault('password', self.TEST_PASS)
+ return identity.V2Password(**kwargs)
+
+ def get_auth_data(self, **kwargs):
+ token = fixture.V2Token(**kwargs)
+ region = 'RegionOne'
+
+ svc = token.add_service('identity')
+ svc.add_endpoint(self.TEST_ADMIN_URL, region=region)
+
+ svc = token.add_service('compute')
+ svc.add_endpoint(public=self.TEST_COMPUTE_PUBLIC,
+ internal=self.TEST_COMPUTE_INTERNAL,
+ admin=self.TEST_COMPUTE_ADMIN,
+ region=region)
+
+ return token
+
+ def stub_auth(self, **kwargs):
+ self.stub_url('POST', ['tokens'], **kwargs)
+
+
+class CatalogHackTests(utils.TestCase):
+
+ TEST_URL = 'http://keystone.server:5000/v2.0'
+ OTHER_URL = 'http://other.server:5000/path'
+
+ IDENTITY = 'identity'
+
+ BASE_URL = 'http://keystone.server:5000/'
+ V2_URL = BASE_URL + 'v2.0'
+ V3_URL = BASE_URL + 'v3'
+
+ def test_getting_endpoints(self):
+ disc = fixture.DiscoveryList(href=self.BASE_URL)
+ self.stub_url('GET',
+ ['/'],
+ base_url=self.BASE_URL,
+ json=disc)
+
+ token = fixture.V2Token()
+ service = token.add_service(self.IDENTITY)
+ service.add_endpoint(public=self.V2_URL,
+ admin=self.V2_URL,
+ internal=self.V2_URL)
+
+ self.stub_url('POST',
+ ['tokens'],
+ base_url=self.V2_URL,
+ json=token)
+
+ v2_auth = identity.V2Password(self.V2_URL,
+ username=uuid.uuid4().hex,
+ password=uuid.uuid4().hex)
+
+ sess = session.Session(auth=v2_auth)
+
+ endpoint = sess.get_endpoint(service_type=self.IDENTITY,
+ interface='public',
+ version=(3, 0))
+
+ self.assertEqual(self.V3_URL, endpoint)
+
+ def test_returns_original_when_discover_fails(self):
+ token = fixture.V2Token()
+ service = token.add_service(self.IDENTITY)
+ service.add_endpoint(public=self.V2_URL,
+ admin=self.V2_URL,
+ internal=self.V2_URL)
+
+ self.stub_url('POST',
+ ['tokens'],
+ base_url=self.V2_URL,
+ json=token)
+
+ self.stub_url('GET', [], base_url=self.BASE_URL, status_code=404)
+
+ v2_auth = identity.V2Password(self.V2_URL,
+ username=uuid.uuid4().hex,
+ password=uuid.uuid4().hex)
+
+ sess = session.Session(auth=v2_auth)
+
+ endpoint = sess.get_endpoint(service_type=self.IDENTITY,
+ interface='public',
+ version=(3, 0))
+
+ self.assertEqual(self.V2_URL, endpoint)
+
+
+class GenericPlugin(base.BaseAuthPlugin):
+
+ BAD_TOKEN = uuid.uuid4().hex
+
+ def __init__(self):
+ super(GenericPlugin, self).__init__()
+
+ self.endpoint = 'http://keystone.host:5000'
+
+ self.headers = {'headerA': 'valueA',
+ 'headerB': 'valueB'}
+
+ def url(self, prefix):
+ return '%s/%s' % (self.endpoint, prefix)
+
+ def get_token(self, session, **kwargs):
+ # NOTE(jamielennox): by specifying get_headers this should not be used
+ return self.BAD_TOKEN
+
+ def get_headers(self, session, **kwargs):
+ return self.headers
+
+ def get_endpoint(self, session, **kwargs):
+ return self.endpoint
+
+
+class GenericAuthPluginTests(utils.TestCase):
+
+ # filter doesn't matter to GenericPlugin, but we have to specify one
+ ENDPOINT_FILTER = {uuid.uuid4().hex: uuid.uuid4().hex}
+
+ def setUp(self):
+ super(GenericAuthPluginTests, self).setUp()
+ self.auth = GenericPlugin()
+ self.session = session.Session(auth=self.auth)
+
+ def test_setting_headers(self):
+ text = uuid.uuid4().hex
+ self.stub_url('GET', base_url=self.auth.url('prefix'), text=text)
+
+ resp = self.session.get('prefix', endpoint_filter=self.ENDPOINT_FILTER)
+
+ self.assertEqual(text, resp.text)
+
+ for k, v in six.iteritems(self.auth.headers):
+ self.assertRequestHeaderEqual(k, v)
+
+ self.assertIsNone(self.session.get_token())
+ self.assertEqual(self.auth.headers,
+ self.session.get_auth_headers())
+ self.assertNotIn('X-Auth-Token', self.requests.last_request.headers)
diff --git a/keystoneclient/tests/unit/auth/test_identity_v2.py b/keystoneclient/tests/unit/auth/test_identity_v2.py
new file mode 100644
index 0000000..6d432a7
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/test_identity_v2.py
@@ -0,0 +1,295 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import uuid
+
+from keystoneclient.auth.identity import v2
+from keystoneclient import exceptions
+from keystoneclient import session
+from keystoneclient.tests.unit import utils
+
+
+class V2IdentityPlugin(utils.TestCase):
+
+ TEST_ROOT_URL = 'http://127.0.0.1:5000/'
+ TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0')
+ TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
+ TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0')
+
+ TEST_PASS = 'password'
+
+ TEST_SERVICE_CATALOG = [{
+ "endpoints": [{
+ "adminURL": "http://cdn.admin-nets.local:8774/v1.0",
+ "region": "RegionOne",
+ "internalURL": "http://127.0.0.1:8774/v1.0",
+ "publicURL": "http://cdn.admin-nets.local:8774/v1.0/"
+ }],
+ "type": "nova_compat",
+ "name": "nova_compat"
+ }, {
+ "endpoints": [{
+ "adminURL": "http://nova/novapi/admin",
+ "region": "RegionOne",
+ "internalURL": "http://nova/novapi/internal",
+ "publicURL": "http://nova/novapi/public"
+ }],
+ "type": "compute",
+ "name": "nova"
+ }, {
+ "endpoints": [{
+ "adminURL": "http://glance/glanceapi/admin",
+ "region": "RegionOne",
+ "internalURL": "http://glance/glanceapi/internal",
+ "publicURL": "http://glance/glanceapi/public"
+ }],
+ "type": "image",
+ "name": "glance"
+ }, {
+ "endpoints": [{
+ "adminURL": TEST_ADMIN_URL,
+ "region": "RegionOne",
+ "internalURL": "http://127.0.0.1:5000/v2.0",
+ "publicURL": "http://127.0.0.1:5000/v2.0"
+ }],
+ "type": "identity",
+ "name": "keystone"
+ }, {
+ "endpoints": [{
+ "adminURL": "http://swift/swiftapi/admin",
+ "region": "RegionOne",
+ "internalURL": "http://swift/swiftapi/internal",
+ "publicURL": "http://swift/swiftapi/public"
+ }],
+ "type": "object-store",
+ "name": "swift"
+ }]
+
+ def setUp(self):
+ super(V2IdentityPlugin, self).setUp()
+ self.TEST_RESPONSE_DICT = {
+ "access": {
+ "token": {
+ "expires": "2020-01-01T00:00:10.000123Z",
+ "id": self.TEST_TOKEN,
+ "tenant": {
+ "id": self.TEST_TENANT_ID
+ },
+ },
+ "user": {
+ "id": self.TEST_USER
+ },
+ "serviceCatalog": self.TEST_SERVICE_CATALOG,
+ },
+ }
+
+ def stub_auth(self, **kwargs):
+ self.stub_url('POST', ['tokens'], **kwargs)
+
+ def test_authenticate_with_username_password(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v2.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ self.assertIsNone(a.user_id)
+ s = session.Session(a)
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
+ 'password': self.TEST_PASS}}}
+ self.assertRequestBodyIs(json=req)
+ self.assertRequestHeaderEqual('Content-Type', 'application/json')
+ self.assertRequestHeaderEqual('Accept', 'application/json')
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_authenticate_with_user_id_password(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v2.Password(self.TEST_URL, user_id=self.TEST_USER,
+ password=self.TEST_PASS)
+ self.assertIsNone(a.username)
+ s = session.Session(a)
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER,
+ 'password': self.TEST_PASS}}}
+ self.assertRequestBodyIs(json=req)
+ self.assertRequestHeaderEqual('Content-Type', 'application/json')
+ self.assertRequestHeaderEqual('Accept', 'application/json')
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_authenticate_with_username_password_scoped(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v2.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID)
+ self.assertIsNone(a.user_id)
+ s = session.Session(a)
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
+ 'password': self.TEST_PASS},
+ 'tenantId': self.TEST_TENANT_ID}}
+ self.assertRequestBodyIs(json=req)
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_authenticate_with_user_id_password_scoped(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v2.Password(self.TEST_URL, user_id=self.TEST_USER,
+ password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID)
+ self.assertIsNone(a.username)
+ s = session.Session(a)
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER,
+ 'password': self.TEST_PASS},
+ 'tenantId': self.TEST_TENANT_ID}}
+ self.assertRequestBodyIs(json=req)
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_authenticate_with_token(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v2.Token(self.TEST_URL, 'foo')
+ s = session.Session(a)
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'token': {'id': 'foo'}}}
+ self.assertRequestBodyIs(json=req)
+ self.assertRequestHeaderEqual('x-Auth-Token', 'foo')
+ self.assertRequestHeaderEqual('Content-Type', 'application/json')
+ self.assertRequestHeaderEqual('Accept', 'application/json')
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_with_trust_id(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v2.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS, trust_id='trust')
+ s = session.Session(a)
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
+ 'password': self.TEST_PASS},
+ 'trust_id': 'trust'}}
+
+ self.assertRequestBodyIs(json=req)
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def _do_service_url_test(self, base_url, endpoint_filter):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ self.stub_url('GET', ['path'],
+ base_url=base_url,
+ text='SUCCESS', status_code=200)
+
+ a = v2.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ resp = s.get('/path', endpoint_filter=endpoint_filter)
+
+ self.assertEqual(resp.status_code, 200)
+ self.assertEqual(self.requests.last_request.url, base_url + '/path')
+
+ def test_service_url(self):
+ endpoint_filter = {'service_type': 'compute',
+ 'interface': 'admin',
+ 'service_name': 'nova'}
+ self._do_service_url_test('http://nova/novapi/admin', endpoint_filter)
+
+ def test_service_url_defaults_to_public(self):
+ endpoint_filter = {'service_type': 'compute'}
+ self._do_service_url_test('http://nova/novapi/public', endpoint_filter)
+
+ def test_endpoint_filter_without_service_type_fails(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+
+ a = v2.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ self.assertRaises(exceptions.EndpointNotFound, s.get, '/path',
+ endpoint_filter={'interface': 'admin'})
+
+ def test_full_url_overrides_endpoint_filter(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ self.stub_url('GET', [],
+ base_url='http://testurl/',
+ text='SUCCESS', status_code=200)
+
+ a = v2.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ resp = s.get('http://testurl/',
+ endpoint_filter={'service_type': 'compute'})
+ self.assertEqual(resp.status_code, 200)
+ self.assertEqual(resp.text, 'SUCCESS')
+
+ def test_invalid_auth_response_dict(self):
+ self.stub_auth(json={'hello': 'world'})
+
+ a = v2.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any',
+ authenticated=True)
+
+ def test_invalid_auth_response_type(self):
+ self.stub_url('POST', ['tokens'], text='testdata')
+
+ a = v2.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any',
+ authenticated=True)
+
+ def test_invalidate_response(self):
+ resp_data1 = copy.deepcopy(self.TEST_RESPONSE_DICT)
+ resp_data2 = copy.deepcopy(self.TEST_RESPONSE_DICT)
+
+ resp_data1['access']['token']['id'] = 'token1'
+ resp_data2['access']['token']['id'] = 'token2'
+
+ auth_responses = [{'json': resp_data1}, {'json': resp_data2}]
+ self.stub_auth(response_list=auth_responses)
+
+ a = v2.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ self.assertEqual('token1', s.get_token())
+ self.assertEqual({'X-Auth-Token': 'token1'}, s.get_auth_headers())
+
+ a.invalidate()
+ self.assertEqual('token2', s.get_token())
+ self.assertEqual({'X-Auth-Token': 'token2'}, s.get_auth_headers())
+
+ def test_doesnt_log_password(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ password = uuid.uuid4().hex
+
+ a = v2.Password(self.TEST_URL, username=self.TEST_USER,
+ password=password)
+ s = session.Session(auth=a)
+ self.assertEqual(self.TEST_TOKEN, s.get_token())
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+ self.assertNotIn(password, self.logger.output)
+
+ def test_password_with_no_user_id_or_name(self):
+ self.assertRaises(TypeError,
+ v2.Password, self.TEST_URL, password=self.TEST_PASS)
diff --git a/keystoneclient/tests/unit/auth/test_identity_v3.py b/keystoneclient/tests/unit/auth/test_identity_v3.py
new file mode 100644
index 0000000..29cbb0e
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/test_identity_v3.py
@@ -0,0 +1,490 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import uuid
+
+from keystoneclient import access
+from keystoneclient.auth.identity import v3
+from keystoneclient import client
+from keystoneclient import exceptions
+from keystoneclient import fixture
+from keystoneclient import session
+from keystoneclient.tests.unit import utils
+
+
+class V3IdentityPlugin(utils.TestCase):
+
+ TEST_ROOT_URL = 'http://127.0.0.1:5000/'
+ TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3')
+ TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
+ TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3')
+
+ TEST_PASS = 'password'
+
+ TEST_SERVICE_CATALOG = [{
+ "endpoints": [{
+ "url": "http://cdn.admin-nets.local:8774/v1.0/",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://127.0.0.1:8774/v1.0",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://cdn.admin-nets.local:8774/v1.0",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "nova_compat"
+ }, {
+ "endpoints": [{
+ "url": "http://nova/novapi/public",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://nova/novapi/internal",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://nova/novapi/admin",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "compute",
+ "name": "nova",
+ }, {
+ "endpoints": [{
+ "url": "http://glance/glanceapi/public",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://glance/glanceapi/internal",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://glance/glanceapi/admin",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "image",
+ "name": "glance"
+ }, {
+ "endpoints": [{
+ "url": "http://127.0.0.1:5000/v3",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://127.0.0.1:5000/v3",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": TEST_ADMIN_URL,
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "identity"
+ }, {
+ "endpoints": [{
+ "url": "http://swift/swiftapi/public",
+ "region": "RegionOne",
+ "interface": "public"
+ }, {
+ "url": "http://swift/swiftapi/internal",
+ "region": "RegionOne",
+ "interface": "internal"
+ }, {
+ "url": "http://swift/swiftapi/admin",
+ "region": "RegionOne",
+ "interface": "admin"
+ }],
+ "type": "object-store"
+ }]
+
+ def setUp(self):
+ super(V3IdentityPlugin, self).setUp()
+
+ V3_URL = "%sv3" % self.TEST_URL
+ self.TEST_DISCOVERY_RESPONSE = {
+ 'versions': {'values': [fixture.V3Discovery(V3_URL)]}}
+
+ self.TEST_RESPONSE_DICT = {
+ "token": {
+ "methods": [
+ "token",
+ "password"
+ ],
+
+ "expires_at": "2020-01-01T00:00:10.000123Z",
+ "project": {
+ "domain": {
+ "id": self.TEST_DOMAIN_ID,
+ "name": self.TEST_DOMAIN_NAME
+ },
+ "id": self.TEST_TENANT_ID,
+ "name": self.TEST_TENANT_NAME
+ },
+ "user": {
+ "domain": {
+ "id": self.TEST_DOMAIN_ID,
+ "name": self.TEST_DOMAIN_NAME
+ },
+ "id": self.TEST_USER,
+ "name": self.TEST_USER
+ },
+ "issued_at": "2013-05-29T16:55:21.468960Z",
+ "catalog": self.TEST_SERVICE_CATALOG
+ },
+ }
+ self.TEST_PROJECTS_RESPONSE = {
+ "projects": [
+ {
+ "domain_id": "1789d1",
+ "enabled": "True",
+ "id": "263fd9",
+ "links": {
+ "self": "https://identity:5000/v3/projects/263fd9"
+ },
+ "name": "Dev Group A"
+ },
+ {
+ "domain_id": "1789d1",
+ "enabled": "True",
+ "id": "e56ad3",
+ "links": {
+ "self": "https://identity:5000/v3/projects/e56ad3"
+ },
+ "name": "Dev Group B"
+ }
+ ],
+ "links": {
+ "self": "https://identity:5000/v3/projects",
+ }
+ }
+
+ def stub_auth(self, subject_token=None, **kwargs):
+ if not subject_token:
+ subject_token = self.TEST_TOKEN
+
+ self.stub_url('POST', ['auth', 'tokens'],
+ headers={'X-Subject-Token': subject_token}, **kwargs)
+
+ def test_authenticate_with_username_password(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v3.Password(self.TEST_URL,
+ username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'identity':
+ {'methods': ['password'],
+ 'password': {'user': {'name': self.TEST_USER,
+ 'password': self.TEST_PASS}}}}}
+
+ self.assertRequestBodyIs(json=req)
+ self.assertRequestHeaderEqual('Content-Type', 'application/json')
+ self.assertRequestHeaderEqual('Accept', 'application/json')
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_authenticate_with_username_password_unscoped(self):
+ del self.TEST_RESPONSE_DICT['token']['catalog']
+ del self.TEST_RESPONSE_DICT['token']['project']
+
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ self.stub_url(method="GET", json=self.TEST_DISCOVERY_RESPONSE)
+ test_user_id = self.TEST_RESPONSE_DICT['token']['user']['id']
+ self.stub_url(method="GET",
+ json=self.TEST_PROJECTS_RESPONSE,
+ parts=['users', test_user_id, 'projects'])
+
+ a = v3.Password(self.TEST_URL,
+ username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+ cs = client.Client(session=s, auth_url=self.TEST_URL)
+
+ # As a sanity check on the auth_ref, make sure client has the
+ # proper user id, that it fetches the right project response
+ self.assertEqual(test_user_id, a.auth_ref.user_id)
+ t = cs.projects.list(user=a.auth_ref.user_id)
+ self.assertEqual(2, len(t))
+
+ def test_authenticate_with_username_password_domain_scoped(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v3.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS, domain_id=self.TEST_DOMAIN_ID)
+ s = session.Session(a)
+
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'identity':
+ {'methods': ['password'],
+ 'password': {'user': {'name': self.TEST_USER,
+ 'password': self.TEST_PASS}}},
+ 'scope': {'domain': {'id': self.TEST_DOMAIN_ID}}}}
+ self.assertRequestBodyIs(json=req)
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_authenticate_with_username_password_project_scoped(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v3.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS,
+ project_id=self.TEST_DOMAIN_ID)
+ s = session.Session(a)
+
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'identity':
+ {'methods': ['password'],
+ 'password': {'user': {'name': self.TEST_USER,
+ 'password': self.TEST_PASS}}},
+ 'scope': {'project': {'id': self.TEST_DOMAIN_ID}}}}
+ self.assertRequestBodyIs(json=req)
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+ self.assertEqual(s.auth.auth_ref.project_id, self.TEST_DOMAIN_ID)
+
+ def test_authenticate_with_token(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v3.Token(self.TEST_URL, self.TEST_TOKEN)
+ s = session.Session(auth=a)
+
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'identity':
+ {'methods': ['token'],
+ 'token': {'id': self.TEST_TOKEN}}}}
+
+ self.assertRequestBodyIs(json=req)
+
+ self.assertRequestHeaderEqual('Content-Type', 'application/json')
+ self.assertRequestHeaderEqual('Accept', 'application/json')
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_with_expired(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+
+ d = copy.deepcopy(self.TEST_RESPONSE_DICT)
+ d['token']['expires_at'] = '2000-01-01T00:00:10.000123Z'
+
+ a = v3.Password(self.TEST_URL, username='username',
+ password='password')
+ a.auth_ref = access.AccessInfo.factory(body=d)
+ s = session.Session(auth=a)
+
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ self.assertEqual(a.auth_ref['expires_at'],
+ self.TEST_RESPONSE_DICT['token']['expires_at'])
+
+ def test_with_domain_and_project_scoping(self):
+ a = v3.Password(self.TEST_URL, username='username',
+ password='password', project_id='project',
+ domain_id='domain')
+
+ self.assertRaises(exceptions.AuthorizationFailure,
+ a.get_token, None)
+ self.assertRaises(exceptions.AuthorizationFailure,
+ a.get_headers, None)
+
+ def test_with_trust_id(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ a = v3.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS, trust_id='trust')
+ s = session.Session(a)
+
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'identity':
+ {'methods': ['password'],
+ 'password': {'user': {'name': self.TEST_USER,
+ 'password': self.TEST_PASS}}},
+ 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}}
+ self.assertRequestBodyIs(json=req)
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_with_multiple_mechanisms_factory(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ p = v3.PasswordMethod(username=self.TEST_USER, password=self.TEST_PASS)
+ t = v3.TokenMethod(token='foo')
+ a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust')
+ s = session.Session(a)
+
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'identity':
+ {'methods': ['password', 'token'],
+ 'password': {'user': {'name': self.TEST_USER,
+ 'password': self.TEST_PASS}},
+ 'token': {'id': 'foo'}},
+ 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}}
+ self.assertRequestBodyIs(json=req)
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_with_multiple_mechanisms(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ p = v3.PasswordMethod(username=self.TEST_USER,
+ password=self.TEST_PASS)
+ t = v3.TokenMethod(token='foo')
+ a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust')
+ s = session.Session(auth=a)
+
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ req = {'auth': {'identity':
+ {'methods': ['password', 'token'],
+ 'password': {'user': {'name': self.TEST_USER,
+ 'password': self.TEST_PASS}},
+ 'token': {'id': 'foo'}},
+ 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}}
+ self.assertRequestBodyIs(json=req)
+ self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
+
+ def test_with_multiple_scopes(self):
+ s = session.Session()
+
+ a = v3.Password(self.TEST_URL,
+ username=self.TEST_USER, password=self.TEST_PASS,
+ domain_id='x', project_id='x')
+ self.assertRaises(exceptions.AuthorizationFailure, a.get_auth_ref, s)
+
+ a = v3.Password(self.TEST_URL,
+ username=self.TEST_USER, password=self.TEST_PASS,
+ domain_id='x', trust_id='x')
+ self.assertRaises(exceptions.AuthorizationFailure, a.get_auth_ref, s)
+
+ def _do_service_url_test(self, base_url, endpoint_filter):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ self.stub_url('GET', ['path'],
+ base_url=base_url,
+ text='SUCCESS', status_code=200)
+
+ a = v3.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ resp = s.get('/path', endpoint_filter=endpoint_filter)
+
+ self.assertEqual(resp.status_code, 200)
+ self.assertEqual(self.requests.last_request.url, base_url + '/path')
+
+ def test_service_url(self):
+ endpoint_filter = {'service_type': 'compute',
+ 'interface': 'admin',
+ 'service_name': 'nova'}
+ self._do_service_url_test('http://nova/novapi/admin', endpoint_filter)
+
+ def test_service_url_defaults_to_public(self):
+ endpoint_filter = {'service_type': 'compute'}
+ self._do_service_url_test('http://nova/novapi/public', endpoint_filter)
+
+ def test_endpoint_filter_without_service_type_fails(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+
+ a = v3.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ self.assertRaises(exceptions.EndpointNotFound, s.get, '/path',
+ endpoint_filter={'interface': 'admin'})
+
+ def test_full_url_overrides_endpoint_filter(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+ self.stub_url('GET', [],
+ base_url='http://testurl/',
+ text='SUCCESS', status_code=200)
+
+ a = v3.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ resp = s.get('http://testurl/',
+ endpoint_filter={'service_type': 'compute'})
+ self.assertEqual(resp.status_code, 200)
+ self.assertEqual(resp.text, 'SUCCESS')
+
+ def test_invalid_auth_response_dict(self):
+ self.stub_auth(json={'hello': 'world'})
+
+ a = v3.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any',
+ authenticated=True)
+
+ def test_invalid_auth_response_type(self):
+ self.stub_url('POST', ['auth', 'tokens'], text='testdata')
+
+ a = v3.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any',
+ authenticated=True)
+
+ def test_invalidate_response(self):
+ auth_responses = [{'status_code': 200, 'json': self.TEST_RESPONSE_DICT,
+ 'headers': {'X-Subject-Token': 'token1'}},
+ {'status_code': 200, 'json': self.TEST_RESPONSE_DICT,
+ 'headers': {'X-Subject-Token': 'token2'}}]
+
+ self.requests.post('%s/auth/tokens' % self.TEST_URL, auth_responses)
+
+ a = v3.Password(self.TEST_URL, username=self.TEST_USER,
+ password=self.TEST_PASS)
+ s = session.Session(auth=a)
+
+ self.assertEqual('token1', s.get_token())
+ self.assertEqual({'X-Auth-Token': 'token1'}, s.get_auth_headers())
+ a.invalidate()
+ self.assertEqual('token2', s.get_token())
+ self.assertEqual({'X-Auth-Token': 'token2'}, s.get_auth_headers())
+
+ def test_doesnt_log_password(self):
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+
+ password = uuid.uuid4().hex
+ a = v3.Password(self.TEST_URL, username=self.TEST_USER,
+ password=password)
+ s = session.Session(a)
+ self.assertEqual(self.TEST_TOKEN, s.get_token())
+ self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
+ s.get_auth_headers())
+
+ self.assertNotIn(password, self.logger.output)
+
+ def test_sends_nocatalog(self):
+ del self.TEST_RESPONSE_DICT['token']['catalog']
+ self.stub_auth(json=self.TEST_RESPONSE_DICT)
+
+ a = v3.Password(self.TEST_URL,
+ username=self.TEST_USER,
+ password=self.TEST_PASS,
+ include_catalog=False)
+ s = session.Session(auth=a)
+
+ s.get_token()
+
+ auth_url = self.TEST_URL + '/auth/tokens'
+ self.assertEqual(auth_url, a.token_url)
+ self.assertEqual(auth_url + '?nocatalog',
+ self.requests.last_request.url)
diff --git a/keystoneclient/tests/unit/auth/test_password.py b/keystoneclient/tests/unit/auth/test_password.py
new file mode 100644
index 0000000..c5067c0
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/test_password.py
@@ -0,0 +1,63 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from keystoneclient.auth.identity.generic import password
+from keystoneclient.auth.identity import v2
+from keystoneclient.auth.identity import v3
+from keystoneclient.tests.unit.auth import utils
+
+
+class PasswordTests(utils.GenericPluginTestCase):
+
+ PLUGIN_CLASS = password.Password
+ V2_PLUGIN_CLASS = v2.Password
+ V3_PLUGIN_CLASS = v3.Password
+
+ def new_plugin(self, **kwargs):
+ kwargs.setdefault('username', uuid.uuid4().hex)
+ kwargs.setdefault('password', uuid.uuid4().hex)
+ return super(PasswordTests, self).new_plugin(**kwargs)
+
+ def test_with_user_domain_params(self):
+ self.stub_discovery()
+
+ self.assertCreateV3(domain_id=uuid.uuid4().hex,
+ user_domain_id=uuid.uuid4().hex)
+
+ def test_v3_user_params_v2_url(self):
+ self.stub_discovery(v3=False)
+ self.assertDiscoveryFailure(user_domain_id=uuid.uuid4().hex)
+
+ def test_options(self):
+ opts = [o.name for o in self.PLUGIN_CLASS.get_options()]
+
+ allowed_opts = ['user-name',
+ 'user-domain-id',
+ 'user-domain-name',
+ 'user-id',
+ 'password',
+
+ 'domain-id',
+ 'domain-name',
+ 'tenant-id',
+ 'tenant-name',
+ 'project-id',
+ 'project-name',
+ 'project-domain-id',
+ 'project-domain-name',
+ 'trust-id',
+ 'auth-url']
+
+ self.assertEqual(set(allowed_opts), set(opts))
+ self.assertEqual(len(allowed_opts), len(opts))
diff --git a/keystoneclient/tests/unit/auth/test_token.py b/keystoneclient/tests/unit/auth/test_token.py
new file mode 100644
index 0000000..928e2b2
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/test_token.py
@@ -0,0 +1,47 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from keystoneclient.auth.identity.generic import token
+from keystoneclient.auth.identity import v2
+from keystoneclient.auth.identity import v3
+from keystoneclient.tests.unit.auth import utils
+
+
+class TokenTests(utils.GenericPluginTestCase):
+
+ PLUGIN_CLASS = token.Token
+ V2_PLUGIN_CLASS = v2.Token
+ V3_PLUGIN_CLASS = v3.Token
+
+ def new_plugin(self, **kwargs):
+ kwargs.setdefault('token', uuid.uuid4().hex)
+ return super(TokenTests, self).new_plugin(**kwargs)
+
+ def test_options(self):
+ opts = [o.name for o in self.PLUGIN_CLASS.get_options()]
+
+ allowed_opts = ['token',
+ 'domain-id',
+ 'domain-name',
+ 'tenant-id',
+ 'tenant-name',
+ 'project-id',
+ 'project-name',
+ 'project-domain-id',
+ 'project-domain-name',
+ 'trust-id',
+ 'auth-url']
+
+ self.assertEqual(set(allowed_opts), set(opts))
+ self.assertEqual(len(allowed_opts), len(opts))
diff --git a/keystoneclient/tests/unit/auth/test_token_endpoint.py b/keystoneclient/tests/unit/auth/test_token_endpoint.py
new file mode 100644
index 0000000..4b5f82c
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/test_token_endpoint.py
@@ -0,0 +1,63 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from testtools import matchers
+
+from keystoneclient.auth import token_endpoint
+from keystoneclient import session
+from keystoneclient.tests.unit import utils
+
+
+class TokenEndpointTest(utils.TestCase):
+
+ TEST_TOKEN = 'aToken'
+ TEST_URL = 'http://server/prefix'
+
+ def test_basic_case(self):
+ self.requests.get(self.TEST_URL, text='body')
+
+ a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN)
+ s = session.Session(auth=a)
+
+ data = s.get(self.TEST_URL, authenticated=True)
+
+ self.assertEqual(data.text, 'body')
+ self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN)
+
+ def test_basic_endpoint_case(self):
+ self.stub_url('GET', ['p'], text='body')
+ a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN)
+ s = session.Session(auth=a)
+
+ data = s.get('/p',
+ authenticated=True,
+ endpoint_filter={'service': 'identity'})
+
+ self.assertEqual(self.TEST_URL, a.get_endpoint(s))
+ self.assertEqual('body', data.text)
+ self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN)
+
+ def test_token_endpoint_options(self):
+ opt_names = [opt.name for opt in token_endpoint.Token.get_options()]
+
+ self.assertThat(opt_names, matchers.HasLength(2))
+
+ self.assertIn('token', opt_names)
+ self.assertIn('endpoint', opt_names)
+
+ def test_token_endpoint_user_id(self):
+ a = token_endpoint.Token(self.TEST_URL, self.TEST_TOKEN)
+ s = session.Session()
+
+ # we can't know this information about this sort of plugin
+ self.assertIsNone(a.get_user_id(s))
+ self.assertIsNone(a.get_project_id(s))
diff --git a/keystoneclient/tests/unit/auth/utils.py b/keystoneclient/tests/unit/auth/utils.py
new file mode 100644
index 0000000..6580c73
--- /dev/null
+++ b/keystoneclient/tests/unit/auth/utils.py
@@ -0,0 +1,200 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import functools
+import uuid
+
+import mock
+from oslo_config import cfg
+import six
+
+from keystoneclient import access
+from keystoneclient.auth import base
+from keystoneclient import exceptions
+from keystoneclient import fixture
+from keystoneclient import session
+from keystoneclient.tests.unit import utils
+
+
+class MockPlugin(base.BaseAuthPlugin):
+
+ INT_DESC = 'test int'
+ FLOAT_DESC = 'test float'
+ BOOL_DESC = 'test bool'
+ STR_DESC = 'test str'
+ STR_DEFAULT = uuid.uuid4().hex
+
+ def __init__(self, **kwargs):
+ self._data = kwargs
+
+ def __getitem__(self, key):
+ return self._data[key]
+
+ def get_token(self, *args, **kwargs):
+ return 'aToken'
+
+ def get_endpoint(self, *args, **kwargs):
+ return 'http://test'
+
+ @classmethod
+ def get_options(cls):
+ return [
+ cfg.IntOpt('a-int', default='3', help=cls.INT_DESC),
+ cfg.BoolOpt('a-bool', help=cls.BOOL_DESC),
+ cfg.FloatOpt('a-float', help=cls.FLOAT_DESC),
+ cfg.StrOpt('a-str', help=cls.STR_DESC, default=cls.STR_DEFAULT),
+ ]
+
+
+class MockManager(object):
+
+ def __init__(self, driver):
+ self.driver = driver
+
+
+def mock_plugin(f):
+ @functools.wraps(f)
+ def inner(*args, **kwargs):
+ with mock.patch.object(base, 'get_plugin_class') as m:
+ m.return_value = MockPlugin
+ args = list(args) + [m]
+ return f(*args, **kwargs)
+
+ return inner
+
+
+class TestCase(utils.TestCase):
+
+ GROUP = 'auth'
+ V2PASS = 'v2password'
+ V3TOKEN = 'v3token'
+
+ a_int = 88
+ a_float = 88.8
+ a_bool = False
+
+ TEST_VALS = {'a_int': a_int,
+ 'a_float': a_float,
+ 'a_bool': a_bool}
+
+ def assertTestVals(self, plugin, vals=TEST_VALS):
+ for k, v in six.iteritems(vals):
+ self.assertEqual(v, plugin[k])
+
+
+class GenericPluginTestCase(utils.TestCase):
+
+ TEST_URL = 'http://keystone.host:5000/'
+
+ # OVERRIDE THESE IN SUB CLASSES
+ PLUGIN_CLASS = None
+ V2_PLUGIN_CLASS = None
+ V3_PLUGIN_CLASS = None
+
+ def setUp(self):
+ super(GenericPluginTestCase, self).setUp()
+
+ self.token_v2 = fixture.V2Token()
+ self.token_v3 = fixture.V3Token()
+ self.token_v3_id = uuid.uuid4().hex
+ self.session = session.Session()
+
+ self.stub_url('POST', ['v2.0', 'tokens'], json=self.token_v2)
+ self.stub_url('POST', ['v3', 'auth', 'tokens'],
+ headers={'X-Subject-Token': self.token_v3_id},
+ json=self.token_v3)
+
+ def new_plugin(self, **kwargs):
+ kwargs.setdefault('auth_url', self.TEST_URL)
+ return self.PLUGIN_CLASS(**kwargs)
+
+ def stub_discovery(self, base_url=None, **kwargs):
+ kwargs.setdefault('href', self.TEST_URL)
+ disc = fixture.DiscoveryList(**kwargs)
+ self.stub_url('GET', json=disc, base_url=base_url, status_code=300)
+ return disc
+
+ def assertCreateV3(self, **kwargs):
+ auth = self.new_plugin(**kwargs)
+ auth_ref = auth.get_auth_ref(self.session)
+ self.assertIsInstance(auth_ref, access.AccessInfoV3)
+ self.assertEqual(self.TEST_URL + 'v3/auth/tokens',
+ self.requests.last_request.url)
+ self.assertIsInstance(auth._plugin, self.V3_PLUGIN_CLASS)
+ return auth
+
+ def assertCreateV2(self, **kwargs):
+ auth = self.new_plugin(**kwargs)
+ auth_ref = auth.get_auth_ref(self.session)
+ self.assertIsInstance(auth_ref, access.AccessInfoV2)
+ self.assertEqual(self.TEST_URL + 'v2.0/tokens',
+ self.requests.last_request.url)
+ self.assertIsInstance(auth._plugin, self.V2_PLUGIN_CLASS)
+ return auth
+
+ def assertDiscoveryFailure(self, **kwargs):
+ plugin = self.new_plugin(**kwargs)
+ self.assertRaises(exceptions.DiscoveryFailure,
+ plugin.get_auth_ref,
+ self.session)
+
+ def test_create_v3_if_domain_params(self):
+ self.stub_discovery()
+
+ self.assertCreateV3(domain_id=uuid.uuid4().hex)
+ self.assertCreateV3(domain_name=uuid.uuid4().hex)
+ self.assertCreateV3(project_name=uuid.uuid4().hex,
+ project_domain_name=uuid.uuid4().hex)
+ self.assertCreateV3(project_name=uuid.uuid4().hex,
+ project_domain_id=uuid.uuid4().hex)
+
+ def test_create_v2_if_no_domain_params(self):
+ self.stub_discovery()
+ self.assertCreateV2()
+ self.assertCreateV2(project_id=uuid.uuid4().hex)
+ self.assertCreateV2(project_name=uuid.uuid4().hex)
+ self.assertCreateV2(tenant_id=uuid.uuid4().hex)
+ self.assertCreateV2(tenant_name=uuid.uuid4().hex)
+
+ def test_v3_params_v2_url(self):
+ self.stub_discovery(v3=False)
+ self.assertDiscoveryFailure(domain_name=uuid.uuid4().hex)
+
+ def test_v2_params_v3_url(self):
+ self.stub_discovery(v2=False)
+ self.assertCreateV3()
+
+ def test_no_urls(self):
+ self.stub_discovery(v2=False, v3=False)
+ self.assertDiscoveryFailure()
+
+ def test_path_based_url_v2(self):
+ self.stub_url('GET', ['v2.0'], status_code=403)
+ self.assertCreateV2(auth_url=self.TEST_URL + 'v2.0')
+
+ def test_path_based_url_v3(self):
+ self.stub_url('GET', ['v3'], status_code=403)
+ self.assertCreateV3(auth_url=self.TEST_URL + 'v3')
+
+ def test_disc_error_for_failure(self):
+ self.stub_url('GET', [], status_code=403)
+ self.assertDiscoveryFailure()
+
+ def test_v3_plugin_from_failure(self):
+ url = self.TEST_URL + 'v3'
+ self.stub_url('GET', [], base_url=url, status_code=403)
+ self.assertCreateV3(auth_url=url)
+
+ def test_unknown_discovery_version(self):
+ # make a v4 entry that's mostly the same as a v3
+ self.stub_discovery(v2=False, v3_id='v4.0')
+ self.assertDiscoveryFailure()