diff options
| author | Jamie Lennox <jamielennox@redhat.com> | 2015-02-11 19:03:25 +1100 |
|---|---|---|
| committer | Jamie Lennox <jamielennox@redhat.com> | 2015-02-11 19:03:25 +1100 |
| commit | 6bd93179a2966f2b5c67e297628510ac73689fb3 (patch) | |
| tree | faf3a93a16fb49b4a742f74b6fcdd20f8a0ebd0e /keystoneclient/tests/unit/utils.py | |
| parent | 58ac2de5d4a6b58e8bd5d430a04199a4d40427a8 (diff) | |
| download | python-keystoneclient-6bd93179a2966f2b5c67e297628510ac73689fb3.tar.gz | |
Move tests to the unit subdirectory
Move all the existing tests to the unit/ subdirectory. This gives us
some room to add a functional/ directory later with other tests.
Change-Id: I0fb8d5b628eb8ee1f35f05f42d0c0ac9f285e8c3
Implements: functional-testing
Diffstat (limited to 'keystoneclient/tests/unit/utils.py')
| -rw-r--r-- | keystoneclient/tests/unit/utils.py | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/keystoneclient/tests/unit/utils.py b/keystoneclient/tests/unit/utils.py new file mode 100644 index 0000000..038f34c --- /dev/null +++ b/keystoneclient/tests/unit/utils.py @@ -0,0 +1,209 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import sys +import time +import uuid + +import fixtures +import mock +from mox3 import mox +from oslo_serialization import jsonutils +import requests +from requests_mock.contrib import fixture +import six +from six.moves.urllib import parse as urlparse +import testtools + + +class TestCase(testtools.TestCase): + + TEST_DOMAIN_ID = '1' + TEST_DOMAIN_NAME = 'aDomain' + TEST_GROUP_ID = uuid.uuid4().hex + TEST_ROLE_ID = uuid.uuid4().hex + TEST_TENANT_ID = '1' + TEST_TENANT_NAME = 'aTenant' + TEST_TOKEN = 'aToken' + TEST_TRUST_ID = 'aTrust' + TEST_USER = 'test' + TEST_USER_ID = uuid.uuid4().hex + + TEST_ROOT_URL = 'http://127.0.0.1:5000/' + + def setUp(self): + super(TestCase, self).setUp() + self.mox = mox.Mox() + self.logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) + self.time_patcher = mock.patch.object(time, 'time', lambda: 1234) + self.time_patcher.start() + + self.requests = self.useFixture(fixture.Fixture()) + + def tearDown(self): + self.time_patcher.stop() + self.mox.UnsetStubs() + self.mox.VerifyAll() + super(TestCase, self).tearDown() + + def stub_url(self, method, parts=None, base_url=None, json=None, **kwargs): + if not base_url: + base_url = self.TEST_URL + + if json: + kwargs['text'] = jsonutils.dumps(json) + headers = kwargs.setdefault('headers', {}) + headers['Content-Type'] = 'application/json' + + if parts: + url = '/'.join([p.strip('/') for p in [base_url] + parts]) + else: + url = base_url + + url = url.replace("/?", "?") + self.requests.register_uri(method, url, **kwargs) + + def assertRequestBodyIs(self, body=None, json=None): + last_request_body = self.requests.last_request.body + if json: + val = jsonutils.loads(last_request_body) + self.assertEqual(json, val) + elif body: + self.assertEqual(body, last_request_body) + + def assertQueryStringIs(self, qs=''): + """Verify the QueryString matches what is expected. + + The qs parameter should be of the format \'foo=bar&abc=xyz\' + """ + expected = urlparse.parse_qs(qs, keep_blank_values=True) + parts = urlparse.urlparse(self.requests.last_request.url) + querystring = urlparse.parse_qs(parts.query, keep_blank_values=True) + self.assertEqual(expected, querystring) + + def assertQueryStringContains(self, **kwargs): + """Verify the query string contains the expected parameters. + + This method is used to verify that the query string for the most recent + request made contains all the parameters provided as ``kwargs``, and + that the value of each parameter contains the value for the kwarg. If + the value for the kwarg is an empty string (''), then all that's + verified is that the parameter is present. + + """ + parts = urlparse.urlparse(self.requests.last_request.url) + qs = urlparse.parse_qs(parts.query, keep_blank_values=True) + + for k, v in six.iteritems(kwargs): + self.assertIn(k, qs) + self.assertIn(v, qs[k]) + + def assertRequestHeaderEqual(self, name, val): + """Verify that the last request made contains a header and its value + + The request must have already been made. + """ + headers = self.requests.last_request.headers + self.assertEqual(headers.get(name), val) + + +if tuple(sys.version_info)[0:2] < (2, 7): + + def assertDictEqual(self, d1, d2, msg=None): + # Simple version taken from 2.7 + self.assertIsInstance(d1, dict, + 'First argument is not a dictionary') + self.assertIsInstance(d2, dict, + 'Second argument is not a dictionary') + if d1 != d2: + if msg: + self.fail(msg) + else: + standardMsg = '%r != %r' % (d1, d2) + self.fail(standardMsg) + + TestCase.assertDictEqual = assertDictEqual + + +class TestResponse(requests.Response): + """Class used to wrap requests.Response and provide some + convenience to initialize with a dict. + """ + + def __init__(self, data): + self._text = None + super(TestResponse, self).__init__() + if isinstance(data, dict): + self.status_code = data.get('status_code', 200) + headers = data.get('headers') + if headers: + self.headers.update(headers) + # Fake the text attribute to streamline Response creation + # _content is defined by requests.Response + self._content = data.get('text') + else: + self.status_code = data + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + @property + def text(self): + return self.content + + +class DisableModuleFixture(fixtures.Fixture): + """A fixture to provide support for unloading/disabling modules.""" + + def __init__(self, module, *args, **kw): + super(DisableModuleFixture, self).__init__(*args, **kw) + self.module = module + self._finders = [] + self._cleared_modules = {} + + def tearDown(self): + super(DisableModuleFixture, self).tearDown() + for finder in self._finders: + sys.meta_path.remove(finder) + sys.modules.update(self._cleared_modules) + + def clear_module(self): + cleared_modules = {} + for fullname in sys.modules.keys(): + if (fullname == self.module or + fullname.startswith(self.module + '.')): + cleared_modules[fullname] = sys.modules.pop(fullname) + return cleared_modules + + def setUp(self): + """Ensure ImportError for the specified module.""" + + super(DisableModuleFixture, self).setUp() + + # Clear 'module' references in sys.modules + self._cleared_modules.update(self.clear_module()) + + finder = NoModuleFinder(self.module) + self._finders.append(finder) + sys.meta_path.insert(0, finder) + + +class NoModuleFinder(object): + """Disallow further imports of 'module'.""" + + def __init__(self, module): + self.module = module + + def find_module(self, fullname, path): + if fullname == self.module or fullname.startswith(self.module + '.'): + raise ImportError |
