diff options
| author | Jamie Lennox <jamielennox@redhat.com> | 2015-02-11 19:03:25 +1100 |
|---|---|---|
| committer | Jamie Lennox <jamielennox@redhat.com> | 2015-02-11 19:03:25 +1100 |
| commit | 6bd93179a2966f2b5c67e297628510ac73689fb3 (patch) | |
| tree | faf3a93a16fb49b4a742f74b6fcdd20f8a0ebd0e /keystoneclient/tests/unit/test_session.py | |
| parent | 58ac2de5d4a6b58e8bd5d430a04199a4d40427a8 (diff) | |
| download | python-keystoneclient-6bd93179a2966f2b5c67e297628510ac73689fb3.tar.gz | |
Move tests to the unit subdirectory
Move all the existing tests to the unit/ subdirectory. This gives us
some room to add a functional/ directory later with other tests.
Change-Id: I0fb8d5b628eb8ee1f35f05f42d0c0ac9f285e8c3
Implements: functional-testing
Diffstat (limited to 'keystoneclient/tests/unit/test_session.py')
| -rw-r--r-- | keystoneclient/tests/unit/test_session.py | 866 |
1 files changed, 866 insertions, 0 deletions
diff --git a/keystoneclient/tests/unit/test_session.py b/keystoneclient/tests/unit/test_session.py new file mode 100644 index 0000000..1d01c3a --- /dev/null +++ b/keystoneclient/tests/unit/test_session.py @@ -0,0 +1,866 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import itertools +import uuid + +import mock +from oslo_config import cfg +from oslo_config import fixture as config +from oslo_serialization import jsonutils +import requests +import six +from testtools import matchers + +from keystoneclient import adapter +from keystoneclient.auth import base +from keystoneclient import exceptions +from keystoneclient import session as client_session +from keystoneclient.tests.unit import utils + + +class SessionTests(utils.TestCase): + + TEST_URL = 'http://127.0.0.1:5000/' + + def test_get(self): + session = client_session.Session() + self.stub_url('GET', text='response') + resp = session.get(self.TEST_URL) + + self.assertEqual('GET', self.requests.last_request.method) + self.assertEqual(resp.text, 'response') + self.assertTrue(resp.ok) + + def test_post(self): + session = client_session.Session() + self.stub_url('POST', text='response') + resp = session.post(self.TEST_URL, json={'hello': 'world'}) + + self.assertEqual('POST', self.requests.last_request.method) + self.assertEqual(resp.text, 'response') + self.assertTrue(resp.ok) + self.assertRequestBodyIs(json={'hello': 'world'}) + + def test_head(self): + session = client_session.Session() + self.stub_url('HEAD') + resp = session.head(self.TEST_URL) + + self.assertEqual('HEAD', self.requests.last_request.method) + self.assertTrue(resp.ok) + self.assertRequestBodyIs('') + + def test_put(self): + session = client_session.Session() + self.stub_url('PUT', text='response') + resp = session.put(self.TEST_URL, json={'hello': 'world'}) + + self.assertEqual('PUT', self.requests.last_request.method) + self.assertEqual(resp.text, 'response') + self.assertTrue(resp.ok) + self.assertRequestBodyIs(json={'hello': 'world'}) + + def test_delete(self): + session = client_session.Session() + self.stub_url('DELETE', text='response') + resp = session.delete(self.TEST_URL) + + self.assertEqual('DELETE', self.requests.last_request.method) + self.assertTrue(resp.ok) + self.assertEqual(resp.text, 'response') + + def test_patch(self): + session = client_session.Session() + self.stub_url('PATCH', text='response') + resp = session.patch(self.TEST_URL, json={'hello': 'world'}) + + self.assertEqual('PATCH', self.requests.last_request.method) + self.assertTrue(resp.ok) + self.assertEqual(resp.text, 'response') + self.assertRequestBodyIs(json={'hello': 'world'}) + + def test_user_agent(self): + session = client_session.Session(user_agent='test-agent') + self.stub_url('GET', text='response') + resp = session.get(self.TEST_URL) + + self.assertTrue(resp.ok) + self.assertRequestHeaderEqual('User-Agent', 'test-agent') + + resp = session.get(self.TEST_URL, headers={'User-Agent': 'new-agent'}) + self.assertTrue(resp.ok) + self.assertRequestHeaderEqual('User-Agent', 'new-agent') + + resp = session.get(self.TEST_URL, headers={'User-Agent': 'new-agent'}, + user_agent='overrides-agent') + self.assertTrue(resp.ok) + self.assertRequestHeaderEqual('User-Agent', 'overrides-agent') + + def test_http_session_opts(self): + session = client_session.Session(cert='cert.pem', timeout=5, + verify='certs') + + FAKE_RESP = utils.TestResponse({'status_code': 200, 'text': 'resp'}) + RESP = mock.Mock(return_value=FAKE_RESP) + + with mock.patch.object(session.session, 'request', RESP) as mocked: + session.post(self.TEST_URL, data='value') + + mock_args, mock_kwargs = mocked.call_args + + self.assertEqual(mock_args[0], 'POST') + self.assertEqual(mock_args[1], self.TEST_URL) + self.assertEqual(mock_kwargs['data'], 'value') + self.assertEqual(mock_kwargs['cert'], 'cert.pem') + self.assertEqual(mock_kwargs['verify'], 'certs') + self.assertEqual(mock_kwargs['timeout'], 5) + + def test_not_found(self): + session = client_session.Session() + self.stub_url('GET', status_code=404) + self.assertRaises(exceptions.NotFound, session.get, self.TEST_URL) + + def test_server_error(self): + session = client_session.Session() + self.stub_url('GET', status_code=500) + self.assertRaises(exceptions.InternalServerError, + session.get, self.TEST_URL) + + def test_session_debug_output(self): + """Test request and response headers in debug logs + + in order to redact secure headers while debug is true. + """ + session = client_session.Session(verify=False) + headers = {'HEADERA': 'HEADERVALB'} + security_headers = {'Authorization': uuid.uuid4().hex, + 'X-Auth-Token': uuid.uuid4().hex, + 'X-Subject-Token': uuid.uuid4().hex, } + body = 'BODYRESPONSE' + data = 'BODYDATA' + all_headers = dict( + itertools.chain(headers.items(), security_headers.items())) + self.stub_url('POST', text=body, headers=all_headers) + resp = session.post(self.TEST_URL, headers=all_headers, data=data) + self.assertEqual(resp.status_code, 200) + + self.assertIn('curl', self.logger.output) + self.assertIn('POST', self.logger.output) + self.assertIn('--insecure', self.logger.output) + self.assertIn(body, self.logger.output) + self.assertIn("'%s'" % data, self.logger.output) + + for k, v in six.iteritems(headers): + self.assertIn(k, self.logger.output) + self.assertIn(v, self.logger.output) + + # Assert that response headers contains actual values and + # only debug logs has been masked + for k, v in six.iteritems(security_headers): + self.assertIn('%s: {SHA1}' % k, self.logger.output) + self.assertEqual(v, resp.headers[k]) + self.assertNotIn(v, self.logger.output) + + def test_logging_cacerts(self): + path_to_certs = '/path/to/certs' + session = client_session.Session(verify=path_to_certs) + + self.stub_url('GET', text='text') + session.get(self.TEST_URL) + + self.assertIn('--cacert', self.logger.output) + self.assertIn(path_to_certs, self.logger.output) + + def test_connect_retries(self): + + def _timeout_error(request, context): + raise requests.exceptions.Timeout() + + self.stub_url('GET', text=_timeout_error) + + session = client_session.Session() + retries = 3 + + with mock.patch('time.sleep') as m: + self.assertRaises(exceptions.RequestTimeout, + session.get, + self.TEST_URL, connect_retries=retries) + + self.assertEqual(retries, m.call_count) + # 3 retries finishing with 2.0 means 0.5, 1.0 and 2.0 + m.assert_called_with(2.0) + + # we count retries so there will be one initial request + 3 retries + self.assertThat(self.requests.request_history, + matchers.HasLength(retries + 1)) + + def test_uses_tcp_keepalive_by_default(self): + session = client_session.Session() + requests_session = session.session + self.assertIsInstance(requests_session.adapters['http://'], + client_session.TCPKeepAliveAdapter) + self.assertIsInstance(requests_session.adapters['https://'], + client_session.TCPKeepAliveAdapter) + + def test_does_not_set_tcp_keepalive_on_custom_sessions(self): + mock_session = mock.Mock() + client_session.Session(session=mock_session) + self.assertFalse(mock_session.mount.called) + + +class RedirectTests(utils.TestCase): + + REDIRECT_CHAIN = ['http://myhost:3445/', + 'http://anotherhost:6555/', + 'http://thirdhost/', + 'http://finaldestination:55/'] + + DEFAULT_REDIRECT_BODY = 'Redirect' + DEFAULT_RESP_BODY = 'Found' + + def setup_redirects(self, method='GET', status_code=305, + redirect_kwargs={}, final_kwargs={}): + redirect_kwargs.setdefault('text', self.DEFAULT_REDIRECT_BODY) + + for s, d in zip(self.REDIRECT_CHAIN, self.REDIRECT_CHAIN[1:]): + self.requests.register_uri(method, s, status_code=status_code, + headers={'Location': d}, + **redirect_kwargs) + + final_kwargs.setdefault('status_code', 200) + final_kwargs.setdefault('text', self.DEFAULT_RESP_BODY) + self.requests.register_uri(method, self.REDIRECT_CHAIN[-1], + **final_kwargs) + + def assertResponse(self, resp): + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.text, self.DEFAULT_RESP_BODY) + + def test_basic_get(self): + session = client_session.Session() + self.setup_redirects() + resp = session.get(self.REDIRECT_CHAIN[-2]) + self.assertResponse(resp) + + def test_basic_post_keeps_correct_method(self): + session = client_session.Session() + self.setup_redirects(method='POST', status_code=301) + resp = session.post(self.REDIRECT_CHAIN[-2]) + self.assertResponse(resp) + + def test_redirect_forever(self): + session = client_session.Session(redirect=True) + self.setup_redirects() + resp = session.get(self.REDIRECT_CHAIN[0]) + self.assertResponse(resp) + self.assertTrue(len(resp.history), len(self.REDIRECT_CHAIN)) + + def test_no_redirect(self): + session = client_session.Session(redirect=False) + self.setup_redirects() + resp = session.get(self.REDIRECT_CHAIN[0]) + self.assertEqual(resp.status_code, 305) + self.assertEqual(resp.url, self.REDIRECT_CHAIN[0]) + + def test_redirect_limit(self): + self.setup_redirects() + for i in (1, 2): + session = client_session.Session(redirect=i) + resp = session.get(self.REDIRECT_CHAIN[0]) + self.assertEqual(resp.status_code, 305) + self.assertEqual(resp.url, self.REDIRECT_CHAIN[i]) + self.assertEqual(resp.text, self.DEFAULT_REDIRECT_BODY) + + def test_history_matches_requests(self): + self.setup_redirects(status_code=301) + session = client_session.Session(redirect=True) + req_resp = requests.get(self.REDIRECT_CHAIN[0], + allow_redirects=True) + + ses_resp = session.get(self.REDIRECT_CHAIN[0]) + + self.assertEqual(len(req_resp.history), len(ses_resp.history)) + + for r, s in zip(req_resp.history, ses_resp.history): + self.assertEqual(r.url, s.url) + self.assertEqual(r.status_code, s.status_code) + + +class ConstructSessionFromArgsTests(utils.TestCase): + + KEY = 'keyfile' + CERT = 'certfile' + CACERT = 'cacert-path' + + def _s(self, k=None, **kwargs): + k = k or kwargs + return client_session.Session.construct(k) + + def test_verify(self): + self.assertFalse(self._s(insecure=True).verify) + self.assertTrue(self._s(verify=True, insecure=True).verify) + self.assertFalse(self._s(verify=False, insecure=True).verify) + self.assertEqual(self._s(cacert=self.CACERT).verify, self.CACERT) + + def test_cert(self): + tup = (self.CERT, self.KEY) + self.assertEqual(self._s(cert=tup).cert, tup) + self.assertEqual(self._s(cert=self.CERT, key=self.KEY).cert, tup) + self.assertIsNone(self._s(key=self.KEY).cert) + + def test_pass_through(self): + value = 42 # only a number because timeout needs to be + for key in ['timeout', 'session', 'original_ip', 'user_agent']: + args = {key: value} + self.assertEqual(getattr(self._s(args), key), value) + self.assertNotIn(key, args) + + +class AuthPlugin(base.BaseAuthPlugin): + """Very simple debug authentication plugin. + + Takes Parameters such that it can throw exceptions at the right times. + """ + + TEST_TOKEN = 'aToken' + TEST_USER_ID = 'aUser' + TEST_PROJECT_ID = 'aProject' + + SERVICE_URLS = { + 'identity': {'public': 'http://identity-public:1111/v2.0', + 'admin': 'http://identity-admin:1111/v2.0'}, + 'compute': {'public': 'http://compute-public:2222/v1.0', + 'admin': 'http://compute-admin:2222/v1.0'}, + 'image': {'public': 'http://image-public:3333/v2.0', + 'admin': 'http://image-admin:3333/v2.0'} + } + + def __init__(self, token=TEST_TOKEN, invalidate=True): + self.token = token + self._invalidate = invalidate + + def get_token(self, session): + return self.token + + def get_endpoint(self, session, service_type=None, interface=None, + **kwargs): + try: + return self.SERVICE_URLS[service_type][interface] + except (KeyError, AttributeError): + return None + + def invalidate(self): + return self._invalidate + + def get_user_id(self, session): + return self.TEST_USER_ID + + def get_project_id(self, session): + return self.TEST_PROJECT_ID + + +class CalledAuthPlugin(base.BaseAuthPlugin): + + ENDPOINT = 'http://fakeendpoint/' + + def __init__(self, invalidate=True): + self.get_token_called = False + self.get_endpoint_called = False + self.endpoint_arguments = {} + self.invalidate_called = False + self._invalidate = invalidate + + def get_token(self, session): + self.get_token_called = True + return 'aToken' + + def get_endpoint(self, session, **kwargs): + self.get_endpoint_called = True + self.endpoint_arguments = kwargs + return self.ENDPOINT + + def invalidate(self): + self.invalidate_called = True + return self._invalidate + + +class SessionAuthTests(utils.TestCase): + + TEST_URL = 'http://127.0.0.1:5000/' + TEST_JSON = {'hello': 'world'} + + def stub_service_url(self, service_type, interface, path, + method='GET', **kwargs): + base_url = AuthPlugin.SERVICE_URLS[service_type][interface] + uri = "%s/%s" % (base_url.rstrip('/'), path.lstrip('/')) + + self.requests.register_uri(method, uri, **kwargs) + + def test_auth_plugin_default_with_plugin(self): + self.stub_url('GET', base_url=self.TEST_URL, json=self.TEST_JSON) + + # if there is an auth_plugin then it should default to authenticated + auth = AuthPlugin() + sess = client_session.Session(auth=auth) + resp = sess.get(self.TEST_URL) + self.assertDictEqual(resp.json(), self.TEST_JSON) + + self.assertRequestHeaderEqual('X-Auth-Token', AuthPlugin.TEST_TOKEN) + + def test_auth_plugin_disable(self): + self.stub_url('GET', base_url=self.TEST_URL, json=self.TEST_JSON) + + auth = AuthPlugin() + sess = client_session.Session(auth=auth) + resp = sess.get(self.TEST_URL, authenticated=False) + self.assertDictEqual(resp.json(), self.TEST_JSON) + + self.assertRequestHeaderEqual('X-Auth-Token', None) + + def test_service_type_urls(self): + service_type = 'compute' + interface = 'public' + path = '/instances' + status = 200 + body = 'SUCCESS' + + self.stub_service_url(service_type=service_type, + interface=interface, + path=path, + status_code=status, + text=body) + + sess = client_session.Session(auth=AuthPlugin()) + resp = sess.get(path, + endpoint_filter={'service_type': service_type, + 'interface': interface}) + + self.assertEqual(self.requests.last_request.url, + AuthPlugin.SERVICE_URLS['compute']['public'] + path) + self.assertEqual(resp.text, body) + self.assertEqual(resp.status_code, status) + + def test_service_url_raises_if_no_auth_plugin(self): + sess = client_session.Session() + self.assertRaises(exceptions.MissingAuthPlugin, + sess.get, '/path', + endpoint_filter={'service_type': 'compute', + 'interface': 'public'}) + + def test_service_url_raises_if_no_url_returned(self): + sess = client_session.Session(auth=AuthPlugin()) + self.assertRaises(exceptions.EndpointNotFound, + sess.get, '/path', + endpoint_filter={'service_type': 'unknown', + 'interface': 'public'}) + + def test_raises_exc_only_when_asked(self): + # A request that returns a HTTP error should by default raise an + # exception by default, if you specify raise_exc=False then it will not + self.requests.get(self.TEST_URL, status_code=401) + + sess = client_session.Session() + self.assertRaises(exceptions.Unauthorized, sess.get, self.TEST_URL) + + resp = sess.get(self.TEST_URL, raise_exc=False) + self.assertEqual(401, resp.status_code) + + def test_passed_auth_plugin(self): + passed = CalledAuthPlugin() + sess = client_session.Session() + + self.requests.get(CalledAuthPlugin.ENDPOINT + 'path', + status_code=200) + endpoint_filter = {'service_type': 'identity'} + + # no plugin with authenticated won't work + self.assertRaises(exceptions.MissingAuthPlugin, sess.get, 'path', + authenticated=True) + + # no plugin with an endpoint filter won't work + self.assertRaises(exceptions.MissingAuthPlugin, sess.get, 'path', + authenticated=False, endpoint_filter=endpoint_filter) + + resp = sess.get('path', auth=passed, endpoint_filter=endpoint_filter) + + self.assertEqual(200, resp.status_code) + self.assertTrue(passed.get_endpoint_called) + self.assertTrue(passed.get_token_called) + + def test_passed_auth_plugin_overrides(self): + fixed = CalledAuthPlugin() + passed = CalledAuthPlugin() + + sess = client_session.Session(fixed) + + self.requests.get(CalledAuthPlugin.ENDPOINT + 'path', + status_code=200) + + resp = sess.get('path', auth=passed, + endpoint_filter={'service_type': 'identity'}) + + self.assertEqual(200, resp.status_code) + self.assertTrue(passed.get_endpoint_called) + self.assertTrue(passed.get_token_called) + self.assertFalse(fixed.get_endpoint_called) + self.assertFalse(fixed.get_token_called) + + def test_requests_auth_plugin(self): + sess = client_session.Session() + + requests_auth = object() + + FAKE_RESP = utils.TestResponse({'status_code': 200, 'text': 'resp'}) + RESP = mock.Mock(return_value=FAKE_RESP) + + with mock.patch.object(sess.session, 'request', RESP) as mocked: + sess.get(self.TEST_URL, requests_auth=requests_auth) + + mocked.assert_called_once_with('GET', self.TEST_URL, + headers=mock.ANY, + allow_redirects=mock.ANY, + auth=requests_auth, + verify=mock.ANY) + + def test_reauth_called(self): + auth = CalledAuthPlugin(invalidate=True) + sess = client_session.Session(auth=auth) + + self.requests.get(self.TEST_URL, + [{'text': 'Failed', 'status_code': 401}, + {'text': 'Hello', 'status_code': 200}]) + + # allow_reauth=True is the default + resp = sess.get(self.TEST_URL, authenticated=True) + + self.assertEqual(200, resp.status_code) + self.assertEqual('Hello', resp.text) + self.assertTrue(auth.invalidate_called) + + def test_reauth_not_called(self): + auth = CalledAuthPlugin(invalidate=True) + sess = client_session.Session(auth=auth) + + self.requests.get(self.TEST_URL, + [{'text': 'Failed', 'status_code': 401}, + {'text': 'Hello', 'status_code': 200}]) + + self.assertRaises(exceptions.Unauthorized, sess.get, self.TEST_URL, + authenticated=True, allow_reauth=False) + self.assertFalse(auth.invalidate_called) + + def test_endpoint_override_overrides_filter(self): + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + + override_base = 'http://mytest/' + path = 'path' + override_url = override_base + path + resp_text = uuid.uuid4().hex + + self.requests.get(override_url, text=resp_text) + + resp = sess.get(path, + endpoint_override=override_base, + endpoint_filter={'service_type': 'identity'}) + + self.assertEqual(resp_text, resp.text) + self.assertEqual(override_url, self.requests.last_request.url) + + self.assertTrue(auth.get_token_called) + self.assertFalse(auth.get_endpoint_called) + + def test_endpoint_override_ignore_full_url(self): + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + + path = 'path' + url = self.TEST_URL + path + + resp_text = uuid.uuid4().hex + self.requests.get(url, text=resp_text) + + resp = sess.get(url, + endpoint_override='http://someother.url', + endpoint_filter={'service_type': 'identity'}) + + self.assertEqual(resp_text, resp.text) + self.assertEqual(url, self.requests.last_request.url) + + self.assertTrue(auth.get_token_called) + self.assertFalse(auth.get_endpoint_called) + + def test_user_and_project_id(self): + auth = AuthPlugin() + sess = client_session.Session(auth=auth) + + self.assertEqual(auth.TEST_USER_ID, sess.get_user_id()) + self.assertEqual(auth.TEST_PROJECT_ID, sess.get_project_id()) + + +class AdapterTest(utils.TestCase): + + SERVICE_TYPE = uuid.uuid4().hex + SERVICE_NAME = uuid.uuid4().hex + INTERFACE = uuid.uuid4().hex + REGION_NAME = uuid.uuid4().hex + USER_AGENT = uuid.uuid4().hex + VERSION = uuid.uuid4().hex + + TEST_URL = CalledAuthPlugin.ENDPOINT + + def _create_loaded_adapter(self): + auth = CalledAuthPlugin() + sess = client_session.Session() + return adapter.Adapter(sess, + auth=auth, + service_type=self.SERVICE_TYPE, + service_name=self.SERVICE_NAME, + interface=self.INTERFACE, + region_name=self.REGION_NAME, + user_agent=self.USER_AGENT, + version=self.VERSION) + + def _verify_endpoint_called(self, adpt): + self.assertEqual(self.SERVICE_TYPE, + adpt.auth.endpoint_arguments['service_type']) + self.assertEqual(self.SERVICE_NAME, + adpt.auth.endpoint_arguments['service_name']) + self.assertEqual(self.INTERFACE, + adpt.auth.endpoint_arguments['interface']) + self.assertEqual(self.REGION_NAME, + adpt.auth.endpoint_arguments['region_name']) + self.assertEqual(self.VERSION, + adpt.auth.endpoint_arguments['version']) + + def test_setting_variables_on_request(self): + response = uuid.uuid4().hex + self.stub_url('GET', text=response) + adpt = self._create_loaded_adapter() + resp = adpt.get('/') + self.assertEqual(resp.text, response) + + self._verify_endpoint_called(adpt) + self.assertTrue(adpt.auth.get_token_called) + self.assertRequestHeaderEqual('User-Agent', self.USER_AGENT) + + def test_setting_variables_on_get_endpoint(self): + adpt = self._create_loaded_adapter() + url = adpt.get_endpoint() + + self.assertEqual(self.TEST_URL, url) + self._verify_endpoint_called(adpt) + + def test_legacy_binding(self): + key = uuid.uuid4().hex + val = uuid.uuid4().hex + response = jsonutils.dumps({key: val}) + + self.stub_url('GET', text=response) + + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + adpt = adapter.LegacyJsonAdapter(sess, + service_type=self.SERVICE_TYPE, + user_agent=self.USER_AGENT) + + resp, body = adpt.get('/') + self.assertEqual(self.SERVICE_TYPE, + auth.endpoint_arguments['service_type']) + self.assertEqual(resp.text, response) + self.assertEqual(val, body[key]) + + def test_legacy_binding_non_json_resp(self): + response = uuid.uuid4().hex + self.stub_url('GET', text=response, + headers={'Content-Type': 'text/html'}) + + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + adpt = adapter.LegacyJsonAdapter(sess, + service_type=self.SERVICE_TYPE, + user_agent=self.USER_AGENT) + + resp, body = adpt.get('/') + self.assertEqual(self.SERVICE_TYPE, + auth.endpoint_arguments['service_type']) + self.assertEqual(resp.text, response) + self.assertIsNone(body) + + def test_methods(self): + sess = client_session.Session() + adpt = adapter.Adapter(sess) + url = 'http://url' + + for method in ['get', 'head', 'post', 'put', 'patch', 'delete']: + with mock.patch.object(adpt, 'request') as m: + getattr(adpt, method)(url) + m.assert_called_once_with(url, method.upper()) + + def test_setting_endpoint_override(self): + endpoint_override = 'http://overrideurl' + path = '/path' + endpoint_url = endpoint_override + path + + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + adpt = adapter.Adapter(sess, endpoint_override=endpoint_override) + + response = uuid.uuid4().hex + self.requests.get(endpoint_url, text=response) + + resp = adpt.get(path) + + self.assertEqual(response, resp.text) + self.assertEqual(endpoint_url, self.requests.last_request.url) + + self.assertEqual(endpoint_override, adpt.get_endpoint()) + + def test_adapter_invalidate(self): + auth = CalledAuthPlugin() + sess = client_session.Session() + adpt = adapter.Adapter(sess, auth=auth) + + adpt.invalidate() + + self.assertTrue(auth.invalidate_called) + + def test_adapter_get_token(self): + auth = CalledAuthPlugin() + sess = client_session.Session() + adpt = adapter.Adapter(sess, auth=auth) + + self.assertEqual(self.TEST_TOKEN, adpt.get_token()) + self.assertTrue(auth.get_token_called) + + def test_adapter_connect_retries(self): + retries = 2 + sess = client_session.Session() + adpt = adapter.Adapter(sess, connect_retries=retries) + + def _refused_error(request, context): + raise requests.exceptions.ConnectionError() + + self.stub_url('GET', text=_refused_error) + + with mock.patch('time.sleep') as m: + self.assertRaises(exceptions.ConnectionRefused, + adpt.get, self.TEST_URL) + self.assertEqual(retries, m.call_count) + + # we count retries so there will be one initial request + 2 retries + self.assertThat(self.requests.request_history, + matchers.HasLength(retries + 1)) + + def test_user_and_project_id(self): + auth = AuthPlugin() + sess = client_session.Session() + adpt = adapter.Adapter(sess, auth=auth) + + self.assertEqual(auth.TEST_USER_ID, adpt.get_user_id()) + self.assertEqual(auth.TEST_PROJECT_ID, adpt.get_project_id()) + + +class ConfLoadingTests(utils.TestCase): + + GROUP = 'sessiongroup' + + def setUp(self): + super(ConfLoadingTests, self).setUp() + + self.conf_fixture = self.useFixture(config.Config()) + client_session.Session.register_conf_options(self.conf_fixture.conf, + self.GROUP) + + def config(self, **kwargs): + kwargs['group'] = self.GROUP + self.conf_fixture.config(**kwargs) + + def get_session(self, **kwargs): + return client_session.Session.load_from_conf_options( + self.conf_fixture.conf, + self.GROUP, + **kwargs) + + def test_insecure_timeout(self): + self.config(insecure=True, timeout=5) + s = self.get_session() + + self.assertFalse(s.verify) + self.assertEqual(5, s.timeout) + + def test_client_certs(self): + cert = '/path/to/certfile' + key = '/path/to/keyfile' + + self.config(certfile=cert, keyfile=key) + s = self.get_session() + + self.assertTrue(s.verify) + self.assertEqual((cert, key), s.cert) + + def test_cacert(self): + cafile = '/path/to/cacert' + + self.config(cafile=cafile) + s = self.get_session() + + self.assertEqual(cafile, s.verify) + + def test_deprecated(self): + def new_deprecated(): + return cfg.DeprecatedOpt(uuid.uuid4().hex, group=uuid.uuid4().hex) + + opt_names = ['cafile', 'certfile', 'keyfile', 'insecure', 'timeout'] + depr = dict([(n, [new_deprecated()]) for n in opt_names]) + opts = client_session.Session.get_conf_options(deprecated_opts=depr) + + self.assertThat(opt_names, matchers.HasLength(len(opts))) + for opt in opts: + self.assertIn(depr[opt.name][0], opt.deprecated_opts) + + +class CliLoadingTests(utils.TestCase): + + def setUp(self): + super(CliLoadingTests, self).setUp() + + self.parser = argparse.ArgumentParser() + client_session.Session.register_cli_options(self.parser) + + def get_session(self, val, **kwargs): + args = self.parser.parse_args(val.split()) + return client_session.Session.load_from_cli_options(args, **kwargs) + + def test_insecure_timeout(self): + s = self.get_session('--insecure --timeout 5.5') + + self.assertFalse(s.verify) + self.assertEqual(5.5, s.timeout) + + def test_client_certs(self): + cert = '/path/to/certfile' + key = '/path/to/keyfile' + + s = self.get_session('--os-cert %s --os-key %s' % (cert, key)) + + self.assertTrue(s.verify) + self.assertEqual((cert, key), s.cert) + + def test_cacert(self): + cacert = '/path/to/cacert' + + s = self.get_session('--os-cacert %s' % cacert) + + self.assertEqual(cacert, s.verify) |
