summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJamie Lennox <jamielennox@redhat.com>2014-08-29 17:21:00 +1000
committerJamie Lennox <jamielennox@redhat.com>2014-09-01 10:50:16 +1000
commit4be8e8db3f0760aa0debb093cd61fd6495158007 (patch)
treedfd68fd8daceaf035e119b749ae941434f4cb52e
parent0e6171904385550972b3a961e9cb1614403312dc (diff)
downloadpython-keystoneclient-4be8e8db3f0760aa0debb093cd61fd6495158007.tar.gz
Expose auth methods on the adapter
Provide access to get_token, get_endpoint and invalidate to the adapter. The adapter is essentially created per individual client and it can be useful to know things like the endpoint that requests will be sent to based on the parameters that are included in the endpoint_filter. This essentially allows us to emulate the management_url and auth_token properties of the existing clients. Change-Id: Ic01bc52bb38e8fb72e7a6d93bfd2944b11d0b070
-rw-r--r--keystoneclient/adapter.py49
-rw-r--r--keystoneclient/tests/test_session.py55
2 files changed, 82 insertions, 22 deletions
diff --git a/keystoneclient/adapter.py b/keystoneclient/adapter.py
index b5687b9..24403f9 100644
--- a/keystoneclient/adapter.py
+++ b/keystoneclient/adapter.py
@@ -52,19 +52,21 @@ class Adapter(object):
self.user_agent = user_agent
self.auth = auth
- def request(self, url, method, **kwargs):
- endpoint_filter = kwargs.setdefault('endpoint_filter', {})
-
+ def _set_endpoint_filter_kwargs(self, kwargs):
if self.service_type:
- endpoint_filter.setdefault('service_type', self.service_type)
+ kwargs.setdefault('service_type', self.service_type)
if self.service_name:
- endpoint_filter.setdefault('service_name', self.service_name)
+ kwargs.setdefault('service_name', self.service_name)
if self.interface:
- endpoint_filter.setdefault('interface', self.interface)
+ kwargs.setdefault('interface', self.interface)
if self.region_name:
- endpoint_filter.setdefault('region_name', self.region_name)
+ kwargs.setdefault('region_name', self.region_name)
if self.version:
- endpoint_filter.setdefault('version', self.version)
+ kwargs.setdefault('version', self.version)
+
+ def request(self, url, method, **kwargs):
+ endpoint_filter = kwargs.setdefault('endpoint_filter', {})
+ self._set_endpoint_filter_kwargs(endpoint_filter)
if self.endpoint_override:
kwargs.setdefault('endpoint_override', self.endpoint_override)
@@ -76,6 +78,37 @@ class Adapter(object):
return self.session.request(url, method, **kwargs)
+ def get_token(self, auth=None):
+ """Return a token as provided by the auth plugin.
+
+ :param auth: The auth plugin to use for token. Overrides the plugin
+ on the session. (optional)
+ :type auth: :class:`keystoneclient.auth.base.BaseAuthPlugin`
+
+ :raises AuthorizationFailure: if a new token fetch fails.
+
+ :returns string: A valid token.
+ """
+ return self.session.get_token(auth or self.auth)
+
+ def get_endpoint(self, auth=None, **kwargs):
+ """Get an endpoint as provided by the auth plugin.
+
+ :param auth: The auth plugin to use for token. Overrides the plugin on
+ the session. (optional)
+ :type auth: :class:`keystoneclient.auth.base.BaseAuthPlugin`
+
+ :raises MissingAuthPlugin: if a plugin is not available.
+
+ :returns string: An endpoint if available or None.
+ """
+ self._set_endpoint_filter_kwargs(kwargs)
+ return self.session.get_endpoint(auth or self.auth, **kwargs)
+
+ def invalidate(self, auth=None):
+ """Invalidate an authentication plugin."""
+ return self.session.invalidate(auth or self.auth)
+
def get(self, url, **kwargs):
return self.request(url, 'GET', **kwargs)
diff --git a/keystoneclient/tests/test_session.py b/keystoneclient/tests/test_session.py
index 2b90b73..b5480e8 100644
--- a/keystoneclient/tests/test_session.py
+++ b/keystoneclient/tests/test_session.py
@@ -552,13 +552,10 @@ class AdapterTest(utils.TestCase):
TEST_URL = CalledAuthPlugin.ENDPOINT
- def test_setting_variables(self):
- response = uuid.uuid4().hex
- self.stub_url('GET', text=response)
-
+ def _create_loaded_adapter(self):
auth = CalledAuthPlugin()
sess = client_session.Session()
- adpt = adapter.Adapter(sess,
+ return adapter.Adapter(sess,
auth=auth,
service_type=self.SERVICE_TYPE,
service_name=self.SERVICE_NAME,
@@ -567,23 +564,36 @@ class AdapterTest(utils.TestCase):
user_agent=self.USER_AGENT,
version=self.VERSION)
- resp = adpt.get('/')
- self.assertEqual(resp.text, response)
-
+ def _verify_endpoint_called(self, adpt):
self.assertEqual(self.SERVICE_TYPE,
- auth.endpoint_arguments['service_type'])
+ adpt.auth.endpoint_arguments['service_type'])
self.assertEqual(self.SERVICE_NAME,
- auth.endpoint_arguments['service_name'])
+ adpt.auth.endpoint_arguments['service_name'])
self.assertEqual(self.INTERFACE,
- auth.endpoint_arguments['interface'])
+ adpt.auth.endpoint_arguments['interface'])
self.assertEqual(self.REGION_NAME,
- auth.endpoint_arguments['region_name'])
+ adpt.auth.endpoint_arguments['region_name'])
self.assertEqual(self.VERSION,
- auth.endpoint_arguments['version'])
+ adpt.auth.endpoint_arguments['version'])
- self.assertTrue(auth.get_token_called)
+ def test_setting_variables_on_request(self):
+ response = uuid.uuid4().hex
+ self.stub_url('GET', text=response)
+ adpt = self._create_loaded_adapter()
+ resp = adpt.get('/')
+ self.assertEqual(resp.text, response)
+
+ self._verify_endpoint_called(adpt)
+ self.assertTrue(adpt.auth.get_token_called)
self.assertRequestHeaderEqual('User-Agent', self.USER_AGENT)
+ def test_setting_variables_on_get_endpoint(self):
+ adpt = self._create_loaded_adapter()
+ url = adpt.get_endpoint()
+
+ self.assertEqual(self.TEST_URL, url)
+ self._verify_endpoint_called(adpt)
+
def test_legacy_binding(self):
key = uuid.uuid4().hex
val = uuid.uuid4().hex
@@ -647,6 +657,23 @@ class AdapterTest(utils.TestCase):
self.assertEqual(response, resp.text)
self.assertEqual(endpoint_url, self.requests.last_request.url)
+ def test_adapter_invalidate(self):
+ auth = CalledAuthPlugin()
+ sess = client_session.Session()
+ adpt = adapter.Adapter(sess, auth=auth)
+
+ adpt.invalidate()
+
+ self.assertTrue(auth.invalidate_called)
+
+ def test_adapter_get_token(self):
+ auth = CalledAuthPlugin()
+ sess = client_session.Session()
+ adpt = adapter.Adapter(sess, auth=auth)
+
+ self.assertEqual(self.TEST_TOKEN, adpt.get_token())
+ self.assertTrue(auth.get_token_called)
+
class ConfLoadingTests(utils.TestCase):