summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-02-19 05:50:16 +0000
committerGerrit Code Review <review@openstack.org>2015-02-19 05:50:16 +0000
commit7e277c96a2605c6d85d0ff121e5578607cae4895 (patch)
tree1cc804456cf08af1cd40a97bdb0c2eb6745dd14b
parentbcf777580294e19f4c470d47c68a174af734b4ee (diff)
parentf302ed5fdbbf0925f59cf2ae13db680038d41ddd (diff)
downloadkeystonemiddleware-7e277c96a2605c6d85d0ff121e5578607cae4895.tar.gz
Merge "Refactor extract class for signing directory"
-rw-r--r--keystonemiddleware/auth_token.py148
-rw-r--r--keystonemiddleware/tests/test_auth_token_middleware.py43
-rw-r--r--keystonemiddleware/tests/unit/__init__.py0
-rw-r--r--keystonemiddleware/tests/unit/test_auth_token.py140
4 files changed, 245 insertions, 86 deletions
diff --git a/keystonemiddleware/auth_token.py b/keystonemiddleware/auth_token.py
index ea3b80e..2d8da52 100644
--- a/keystonemiddleware/auth_token.py
+++ b/keystonemiddleware/auth_token.py
@@ -355,6 +355,8 @@ CONF = cfg.CONF
CONF.register_opts(_OPTS, group=_AUTHTOKEN_GROUP)
auth.register_conf_options(CONF, _AUTHTOKEN_GROUP)
+_LOG = logging.getLogger(__name__)
+
_HEADER_TEMPLATE = {
'X%s-Domain-Id': 'domain_id',
'X%s-Domain-Name': 'domain_name',
@@ -826,6 +828,10 @@ class _UserAuthPlugin(base_identity.BaseIdentityPlugin):
class AuthProtocol(object):
"""Middleware that handles authenticating client calls."""
+ _SIGNING_CERT_FILE_NAME = 'signing_cert.pem'
+ _SIGNING_CA_FILE_NAME = 'cacert.pem'
+ _REVOKED_FILE_NAME = 'revoked.pem'
+
def __init__(self, app, conf):
self._LOG = logging.getLogger(conf.get('log_name', __name__))
self._LOG.info(_LI('Starting Keystone auth_token middleware'))
@@ -855,22 +861,8 @@ class AuthProtocol(object):
self._auth_uri = self._identity_server.auth_uri
- # signing
- self._signing_dirname = self._conf_get('signing_dir')
- if self._signing_dirname is None:
- self._signing_dirname = tempfile.mkdtemp(
- prefix='keystone-signing-')
- self._LOG.info(
- _LI('Using %s as cache directory for signing certificate'),
- self._signing_dirname)
- self._verify_signing_dir()
-
- val = '%s/signing_cert.pem' % self._signing_dirname
- self._signing_cert_file_name = val
- val = '%s/cacert.pem' % self._signing_dirname
- self._signing_ca_file_name = val
- val = '%s/revoked.pem' % self._signing_dirname
- self._revoked_file_name = val
+ self._signing_directory = _SigningDirectory(
+ directory_name=self._conf_get('signing_dir'), log=self._LOG)
self._token_cache = self._token_cache_factory()
self._token_revocation_list_prop = None
@@ -1327,8 +1319,12 @@ class AuthProtocol(object):
"""
def verify():
try:
- return cms.cms_verify(data, self._signing_cert_file_name,
- self._signing_ca_file_name,
+ signing_cert_path = self._signing_directory.calc_path(
+ self._SIGNING_CERT_FILE_NAME)
+ signing_ca_path = self._signing_directory.calc_path(
+ self._SIGNING_CA_FILE_NAME)
+ return cms.cms_verify(data, signing_cert_path,
+ signing_ca_path,
inform=inform).decode('utf-8')
except cms.subprocess.CalledProcessError as err:
self._LOG.warning(_LW('Verify error: %s'), err)
@@ -1370,30 +1366,15 @@ class AuthProtocol(object):
except TypeError:
raise InvalidToken(signed_text)
- def _verify_signing_dir(self):
- if os.path.exists(self._signing_dirname):
- if not os.access(self._signing_dirname, os.W_OK):
- raise ConfigurationError(
- _('unable to access signing_dir %s') %
- self._signing_dirname)
- uid = os.getuid()
- if os.stat(self._signing_dirname).st_uid != uid:
- self._LOG.warning(_LW('signing_dir is not owned by %s'), uid)
- current_mode = stat.S_IMODE(os.stat(self._signing_dirname).st_mode)
- if current_mode != stat.S_IRWXU:
- self._LOG.warning(
- _LW('signing_dir mode is %(mode)s instead of %(need)s'),
- {'mode': oct(current_mode), 'need': oct(stat.S_IRWXU)})
- else:
- os.makedirs(self._signing_dirname, stat.S_IRWXU)
-
@property
def _token_revocation_list_fetched_time(self):
if not self._token_revocation_list_fetched_time_prop:
# If the fetched list has been written to disk, use its
# modification time.
- if os.path.exists(self._revoked_file_name):
- mtime = os.path.getmtime(self._revoked_file_name)
+ revoked_file_name = self._signing_directory.calc_path(
+ self._REVOKED_FILE_NAME)
+ if os.path.exists(revoked_file_name):
+ mtime = os.path.getmtime(revoked_file_name)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
# Otherwise the list will need to be fetched.
else:
@@ -1414,32 +1395,12 @@ class AuthProtocol(object):
if list_is_current:
# Load the list from disk if required
if not self._token_revocation_list_prop:
- open_kwargs = {'encoding': 'utf-8'} if six.PY3 else {}
- with open(self._revoked_file_name, 'r', **open_kwargs) as f:
- self._token_revocation_list_prop = jsonutils.loads(
- f.read())
+ self._token_revocation_list_prop = jsonutils.loads(
+ self._signing_directory.read_file(self._REVOKED_FILE_NAME))
else:
self._token_revocation_list = self._fetch_revocation_list()
return self._token_revocation_list_prop
- def _atomic_write_to_signing_dir(self, file_name, value):
- # In Python2, encoding is slow so the following check avoids it if it
- # is not absolutely necessary.
- if isinstance(value, six.text_type):
- value = value.encode('utf-8')
-
- def _atomic_write(destination, data):
- with tempfile.NamedTemporaryFile(dir=self._signing_dirname,
- delete=False) as f:
- f.write(data)
- os.rename(f.name, destination)
-
- try:
- _atomic_write(file_name, value)
- except (OSError, IOError):
- self._verify_signing_dir()
- _atomic_write(file_name, value)
-
@_token_revocation_list.setter
def _token_revocation_list(self, value):
"""Save a revocation list to memory and to disk.
@@ -1449,20 +1410,20 @@ class AuthProtocol(object):
"""
self._token_revocation_list_prop = jsonutils.loads(value)
self._token_revocation_list_fetched_time = timeutils.utcnow()
- self._atomic_write_to_signing_dir(self._revoked_file_name, value)
+ self._signing_directory.write_file(self._REVOKED_FILE_NAME, value)
def _fetch_revocation_list(self):
revocation_list_data = self._identity_server.fetch_revocation_list()
return self._cms_verify(revocation_list_data)
def _fetch_signing_cert(self):
- self._atomic_write_to_signing_dir(
- self._signing_cert_file_name,
+ self._signing_directory.write_file(
+ self._SIGNING_CERT_FILE_NAME,
self._identity_server.fetch_signing_cert())
def _fetch_ca_cert(self):
- self._atomic_write_to_signing_dir(
- self._signing_ca_file_name,
+ self._signing_directory.write_file(
+ self._SIGNING_CA_FILE_NAME,
self._identity_server.fetch_ca_cert())
def _create_identity_server(self):
@@ -1825,6 +1786,65 @@ class _V3RequestStrategy(_RequestStrategy):
_REQUEST_STRATEGIES = [_V3RequestStrategy, _V2RequestStrategy]
+class _SigningDirectory(object):
+ def __init__(self, directory_name=None, log=None):
+ self._log = log or _LOG
+
+ if directory_name is None:
+ directory_name = tempfile.mkdtemp(prefix='keystone-signing-')
+ self._log.info(
+ _LI('Using %s as cache directory for signing certificate'),
+ directory_name)
+ self._directory_name = directory_name
+
+ self._verify_signing_dir()
+
+ def write_file(self, file_name, new_contents):
+
+ # In Python2, encoding is slow so the following check avoids it if it
+ # is not absolutely necessary.
+ if isinstance(new_contents, six.text_type):
+ new_contents = new_contents.encode('utf-8')
+
+ def _atomic_write():
+ with tempfile.NamedTemporaryFile(dir=self._directory_name,
+ delete=False) as f:
+ f.write(new_contents)
+ os.rename(f.name, self.calc_path(file_name))
+
+ try:
+ _atomic_write()
+ except (OSError, IOError):
+ self._verify_signing_dir()
+ _atomic_write()
+
+ def read_file(self, file_name):
+ path = self.calc_path(file_name)
+ open_kwargs = {'encoding': 'utf-8'} if six.PY3 else {}
+ with open(path, 'r', **open_kwargs) as f:
+ return f.read()
+
+ def calc_path(self, file_name):
+ return os.path.join(self._directory_name, file_name)
+
+ def _verify_signing_dir(self):
+ if os.path.isdir(self._directory_name):
+ if not os.access(self._directory_name, os.W_OK):
+ raise ConfigurationError(
+ _('unable to access signing_dir %s') %
+ self._directory_name)
+ uid = os.getuid()
+ if os.stat(self._directory_name).st_uid != uid:
+ self._log.warning(_LW('signing_dir is not owned by %s'), uid)
+ current_mode = stat.S_IMODE(os.stat(self._directory_name).st_mode)
+ if current_mode != stat.S_IRWXU:
+ self._log.warning(
+ _LW('signing_dir mode is %(mode)s instead of %(need)s'),
+ {'mode': oct(current_mode), 'need': oct(stat.S_IRWXU)})
+ else:
+ os.makedirs(self._directory_name, stat.S_IRWXU)
+
+
class _TokenCache(object):
"""Encapsulates the auth_token token cache functionality.
diff --git a/keystonemiddleware/tests/test_auth_token_middleware.py b/keystonemiddleware/tests/test_auth_token_middleware.py
index fce882a..cd75b4b 100644
--- a/keystonemiddleware/tests/test_auth_token_middleware.py
+++ b/keystonemiddleware/tests/test_auth_token_middleware.py
@@ -898,32 +898,22 @@ class CommonAuthTokenMiddlewareTest(object):
self.token_dict['signed_token_scoped_hash']])
self.assertIsValidJSON(text)
- def test_verify_signing_dir_create_while_missing(self):
- tmp_name = uuid.uuid4().hex
- test_parent_signing_dir = "/tmp/%s" % tmp_name
- self.middleware._signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
- self.middleware._signing_cert_file_name = (
- "%s/test.pem" % self.middleware._signing_dirname)
- self.middleware._verify_signing_dir()
- # NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
- self.assertTrue(os.path.isdir(self.middleware._signing_dirname))
- self.assertTrue(os.access(self.middleware._signing_dirname, os.W_OK))
- self.assertEqual(os.stat(self.middleware._signing_dirname).st_uid,
- os.getuid())
- self.assertEqual(
- stat.S_IMODE(os.stat(self.middleware._signing_dirname).st_mode),
- stat.S_IRWXU)
- shutil.rmtree(test_parent_signing_dir)
-
def test_get_token_revocation_list_fetched_time_returns_min(self):
self.middleware._token_revocation_list_fetched_time = None
- self.middleware._revoked_file_name = ''
+
+ # Get rid of the revoked file
+ revoked_path = self.middleware._signing_directory.calc_path(
+ auth_token.AuthProtocol._REVOKED_FILE_NAME)
+ os.remove(revoked_path)
+
self.assertEqual(self.middleware._token_revocation_list_fetched_time,
datetime.datetime.min)
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
self.middleware._token_revocation_list_fetched_time = None
- mtime = os.path.getmtime(self.middleware._revoked_file_name)
+ revoked_path = self.middleware._signing_directory.calc_path(
+ auth_token.AuthProtocol._REVOKED_FILE_NAME)
+ mtime = os.path.getmtime(revoked_path)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
self.assertEqual(fetched_time,
self.middleware._token_revocation_list_fetched_time)
@@ -947,7 +937,12 @@ class CommonAuthTokenMiddlewareTest(object):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
self.middleware._token_revocation_list_fetched_time = None
- os.remove(self.middleware._revoked_file_name)
+
+ # Get rid of the revoked file
+ revoked_path = self.middleware._signing_directory.calc_path(
+ auth_token.AuthProtocol._REVOKED_FILE_NAME)
+ os.remove(revoked_path)
+
self.assertEqual(self.middleware._token_revocation_list,
self.examples.REVOCATION_LIST)
@@ -1431,7 +1426,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.requests.get(url, text=data)
self.middleware._fetch_signing_cert()
- with open(self.middleware._signing_cert_file_name, 'r') as f:
+ signing_cert_path = self.middleware._signing_directory.calc_path(
+ self.middleware._SIGNING_CERT_FILE_NAME)
+ with open(signing_cert_path, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual(url, self.requests.last_request.url)
@@ -1442,7 +1439,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.requests.get(url, text=data)
self.middleware._fetch_ca_cert()
- with open(self.middleware._signing_ca_file_name, 'r') as f:
+ ca_file_path = self.middleware._signing_directory.calc_path(
+ self.middleware._SIGNING_CA_FILE_NAME)
+ with open(ca_file_path, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual(url, self.requests.last_request.url)
diff --git a/keystonemiddleware/tests/unit/__init__.py b/keystonemiddleware/tests/unit/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/keystonemiddleware/tests/unit/__init__.py
diff --git a/keystonemiddleware/tests/unit/test_auth_token.py b/keystonemiddleware/tests/unit/test_auth_token.py
new file mode 100644
index 0000000..29f2dff
--- /dev/null
+++ b/keystonemiddleware/tests/unit/test_auth_token.py
@@ -0,0 +1,140 @@
+# Copyright 2014 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+import stat
+import uuid
+
+import testtools
+
+from keystonemiddleware import auth_token
+
+
+class SigningDirectoryTests(testtools.TestCase):
+
+ def test_directory_created_when_doesnt_exist(self):
+ # When _SigningDirectory is created, if the directory doesn't exist
+ # it's created with the expected permissions.
+ tmp_name = uuid.uuid4().hex
+ parent_directory = '/tmp/%s' % tmp_name
+ directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
+
+ # Directories are created by __init__.
+ auth_token._SigningDirectory(directory_name)
+ self.addCleanup(shutil.rmtree, parent_directory)
+
+ self.assertTrue(os.path.isdir(directory_name))
+ self.assertTrue(os.access(directory_name, os.W_OK))
+ self.assertEqual(os.stat(directory_name).st_uid, os.getuid())
+ self.assertEqual(stat.S_IMODE(os.stat(directory_name).st_mode),
+ stat.S_IRWXU)
+
+ def test_use_directory_already_exists(self):
+ # The directory can already exist.
+
+ tmp_name = uuid.uuid4().hex
+ parent_directory = '/tmp/%s' % tmp_name
+ directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
+ os.makedirs(directory_name, stat.S_IRWXU)
+ self.addCleanup(shutil.rmtree, parent_directory)
+
+ auth_token._SigningDirectory(directory_name)
+
+ def test_write_file(self):
+ # write_file when the file doesn't exist creates the file.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ contents = self.getUniqueString()
+ signing_directory.write_file(file_name, contents)
+
+ file_path = signing_directory.calc_path(file_name)
+ with open(file_path) as f:
+ actual_contents = f.read()
+
+ self.assertEqual(contents, actual_contents)
+
+ def test_replace_file(self):
+ # write_file when the file already exists overwrites it.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ orig_contents = self.getUniqueString()
+ signing_directory.write_file(file_name, orig_contents)
+
+ new_contents = self.getUniqueString()
+ signing_directory.write_file(file_name, new_contents)
+
+ file_path = signing_directory.calc_path(file_name)
+ with open(file_path) as f:
+ actual_contents = f.read()
+
+ self.assertEqual(new_contents, actual_contents)
+
+ def test_recreate_directory(self):
+ # If the original directory is lost, it gets recreated when a file
+ # is written.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ # Delete the directory.
+ shutil.rmtree(signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ contents = self.getUniqueString()
+ signing_directory.write_file(file_name, contents)
+
+ actual_contents = signing_directory.read_file(file_name)
+ self.assertEqual(contents, actual_contents)
+
+ def test_read_file(self):
+ # Can read a file that was written.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ contents = self.getUniqueString()
+ signing_directory.write_file(file_name, contents)
+
+ actual_contents = signing_directory.read_file(file_name)
+
+ self.assertEqual(contents, actual_contents)
+
+ def test_read_file_doesnt_exist(self):
+ # Show what happens when try to read a file that wasn't written.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ self.assertRaises(IOError, signing_directory.read_file, file_name)
+
+ def test_calc_path(self):
+ # calc_path returns the actual filename built from the directory name.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ actual_path = signing_directory.calc_path(file_name)
+ expected_path = os.path.join(signing_directory._directory_name,
+ file_name)
+ self.assertEqual(expected_path, actual_path)