summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrant Knudson <bknudson@us.ibm.com>2014-09-17 14:31:55 -0500
committerBrant Knudson <bknudson@us.ibm.com>2015-02-10 14:06:55 -0600
commitf302ed5fdbbf0925f59cf2ae13db680038d41ddd (patch)
treef82e8e32586d183307d9aa488dba80f2f49277c2
parentfb306836b411546db385274996a8cdaf6154aaf7 (diff)
downloadkeystonemiddleware-f302ed5fdbbf0925f59cf2ae13db680038d41ddd.tar.gz
Refactor extract class for signing directory
The signing directory code was scattered around the AuthProtocol class. To make the AuthProtocol class cleaner, the signing directory code is pulled into its own class. bp refactor-extract-class Change-Id: Ibc657bba234f480f0b4a051819768f413b7cafc5
-rw-r--r--keystonemiddleware/auth_token.py148
-rw-r--r--keystonemiddleware/tests/test_auth_token_middleware.py43
-rw-r--r--keystonemiddleware/tests/unit/__init__.py0
-rw-r--r--keystonemiddleware/tests/unit/test_auth_token.py140
4 files changed, 245 insertions, 86 deletions
diff --git a/keystonemiddleware/auth_token.py b/keystonemiddleware/auth_token.py
index 03dc5dd..4095ebc 100644
--- a/keystonemiddleware/auth_token.py
+++ b/keystonemiddleware/auth_token.py
@@ -355,6 +355,8 @@ CONF = cfg.CONF
CONF.register_opts(_OPTS, group=_AUTHTOKEN_GROUP)
auth.register_conf_options(CONF, _AUTHTOKEN_GROUP)
+_LOG = logging.getLogger(__name__)
+
_HEADER_TEMPLATE = {
'X%s-Domain-Id': 'domain_id',
'X%s-Domain-Name': 'domain_name',
@@ -692,6 +694,10 @@ class _UserAuthPlugin(base_identity.BaseIdentityPlugin):
class AuthProtocol(object):
"""Middleware that handles authenticating client calls."""
+ _SIGNING_CERT_FILE_NAME = 'signing_cert.pem'
+ _SIGNING_CA_FILE_NAME = 'cacert.pem'
+ _REVOKED_FILE_NAME = 'revoked.pem'
+
def __init__(self, app, conf):
self._LOG = logging.getLogger(conf.get('log_name', __name__))
self._LOG.info(_LI('Starting Keystone auth_token middleware'))
@@ -721,22 +727,8 @@ class AuthProtocol(object):
self._auth_uri = self._identity_server.auth_uri
- # signing
- self._signing_dirname = self._conf_get('signing_dir')
- if self._signing_dirname is None:
- self._signing_dirname = tempfile.mkdtemp(
- prefix='keystone-signing-')
- self._LOG.info(
- _LI('Using %s as cache directory for signing certificate'),
- self._signing_dirname)
- self._verify_signing_dir()
-
- val = '%s/signing_cert.pem' % self._signing_dirname
- self._signing_cert_file_name = val
- val = '%s/cacert.pem' % self._signing_dirname
- self._signing_ca_file_name = val
- val = '%s/revoked.pem' % self._signing_dirname
- self._revoked_file_name = val
+ self._signing_directory = _SigningDirectory(
+ directory_name=self._conf_get('signing_dir'), log=self._LOG)
self._token_cache = self._token_cache_factory()
self._token_revocation_list_prop = None
@@ -1184,8 +1176,12 @@ class AuthProtocol(object):
"""
def verify():
try:
- return cms.cms_verify(data, self._signing_cert_file_name,
- self._signing_ca_file_name,
+ signing_cert_path = self._signing_directory.calc_path(
+ self._SIGNING_CERT_FILE_NAME)
+ signing_ca_path = self._signing_directory.calc_path(
+ self._SIGNING_CA_FILE_NAME)
+ return cms.cms_verify(data, signing_cert_path,
+ signing_ca_path,
inform=inform).decode('utf-8')
except cms.subprocess.CalledProcessError as err:
self._LOG.warning(_LW('Verify error: %s'), err)
@@ -1227,30 +1223,15 @@ class AuthProtocol(object):
except TypeError:
raise InvalidToken(signed_text)
- def _verify_signing_dir(self):
- if os.path.exists(self._signing_dirname):
- if not os.access(self._signing_dirname, os.W_OK):
- raise ConfigurationError(
- _('unable to access signing_dir %s') %
- self._signing_dirname)
- uid = os.getuid()
- if os.stat(self._signing_dirname).st_uid != uid:
- self._LOG.warning(_LW('signing_dir is not owned by %s'), uid)
- current_mode = stat.S_IMODE(os.stat(self._signing_dirname).st_mode)
- if current_mode != stat.S_IRWXU:
- self._LOG.warning(
- _LW('signing_dir mode is %(mode)s instead of %(need)s'),
- {'mode': oct(current_mode), 'need': oct(stat.S_IRWXU)})
- else:
- os.makedirs(self._signing_dirname, stat.S_IRWXU)
-
@property
def _token_revocation_list_fetched_time(self):
if not self._token_revocation_list_fetched_time_prop:
# If the fetched list has been written to disk, use its
# modification time.
- if os.path.exists(self._revoked_file_name):
- mtime = os.path.getmtime(self._revoked_file_name)
+ revoked_file_name = self._signing_directory.calc_path(
+ self._REVOKED_FILE_NAME)
+ if os.path.exists(revoked_file_name):
+ mtime = os.path.getmtime(revoked_file_name)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
# Otherwise the list will need to be fetched.
else:
@@ -1271,32 +1252,12 @@ class AuthProtocol(object):
if list_is_current:
# Load the list from disk if required
if not self._token_revocation_list_prop:
- open_kwargs = {'encoding': 'utf-8'} if six.PY3 else {}
- with open(self._revoked_file_name, 'r', **open_kwargs) as f:
- self._token_revocation_list_prop = jsonutils.loads(
- f.read())
+ self._token_revocation_list_prop = jsonutils.loads(
+ self._signing_directory.read_file(self._REVOKED_FILE_NAME))
else:
self._token_revocation_list = self._fetch_revocation_list()
return self._token_revocation_list_prop
- def _atomic_write_to_signing_dir(self, file_name, value):
- # In Python2, encoding is slow so the following check avoids it if it
- # is not absolutely necessary.
- if isinstance(value, six.text_type):
- value = value.encode('utf-8')
-
- def _atomic_write(destination, data):
- with tempfile.NamedTemporaryFile(dir=self._signing_dirname,
- delete=False) as f:
- f.write(data)
- os.rename(f.name, destination)
-
- try:
- _atomic_write(file_name, value)
- except (OSError, IOError):
- self._verify_signing_dir()
- _atomic_write(file_name, value)
-
@_token_revocation_list.setter
def _token_revocation_list(self, value):
"""Save a revocation list to memory and to disk.
@@ -1306,20 +1267,20 @@ class AuthProtocol(object):
"""
self._token_revocation_list_prop = jsonutils.loads(value)
self._token_revocation_list_fetched_time = timeutils.utcnow()
- self._atomic_write_to_signing_dir(self._revoked_file_name, value)
+ self._signing_directory.write_file(self._REVOKED_FILE_NAME, value)
def _fetch_revocation_list(self):
revocation_list_data = self._identity_server.fetch_revocation_list()
return self._cms_verify(revocation_list_data)
def _fetch_signing_cert(self):
- self._atomic_write_to_signing_dir(
- self._signing_cert_file_name,
+ self._signing_directory.write_file(
+ self._SIGNING_CERT_FILE_NAME,
self._identity_server.fetch_signing_cert())
def _fetch_ca_cert(self):
- self._atomic_write_to_signing_dir(
- self._signing_ca_file_name,
+ self._signing_directory.write_file(
+ self._SIGNING_CA_FILE_NAME,
self._identity_server.fetch_ca_cert())
def _create_identity_server(self):
@@ -1682,6 +1643,65 @@ class _V3RequestStrategy(_RequestStrategy):
_REQUEST_STRATEGIES = [_V3RequestStrategy, _V2RequestStrategy]
+class _SigningDirectory(object):
+ def __init__(self, directory_name=None, log=None):
+ self._log = log or _LOG
+
+ if directory_name is None:
+ directory_name = tempfile.mkdtemp(prefix='keystone-signing-')
+ self._log.info(
+ _LI('Using %s as cache directory for signing certificate'),
+ directory_name)
+ self._directory_name = directory_name
+
+ self._verify_signing_dir()
+
+ def write_file(self, file_name, new_contents):
+
+ # In Python2, encoding is slow so the following check avoids it if it
+ # is not absolutely necessary.
+ if isinstance(new_contents, six.text_type):
+ new_contents = new_contents.encode('utf-8')
+
+ def _atomic_write():
+ with tempfile.NamedTemporaryFile(dir=self._directory_name,
+ delete=False) as f:
+ f.write(new_contents)
+ os.rename(f.name, self.calc_path(file_name))
+
+ try:
+ _atomic_write()
+ except (OSError, IOError):
+ self._verify_signing_dir()
+ _atomic_write()
+
+ def read_file(self, file_name):
+ path = self.calc_path(file_name)
+ open_kwargs = {'encoding': 'utf-8'} if six.PY3 else {}
+ with open(path, 'r', **open_kwargs) as f:
+ return f.read()
+
+ def calc_path(self, file_name):
+ return os.path.join(self._directory_name, file_name)
+
+ def _verify_signing_dir(self):
+ if os.path.isdir(self._directory_name):
+ if not os.access(self._directory_name, os.W_OK):
+ raise ConfigurationError(
+ _('unable to access signing_dir %s') %
+ self._directory_name)
+ uid = os.getuid()
+ if os.stat(self._directory_name).st_uid != uid:
+ self._log.warning(_LW('signing_dir is not owned by %s'), uid)
+ current_mode = stat.S_IMODE(os.stat(self._directory_name).st_mode)
+ if current_mode != stat.S_IRWXU:
+ self._log.warning(
+ _LW('signing_dir mode is %(mode)s instead of %(need)s'),
+ {'mode': oct(current_mode), 'need': oct(stat.S_IRWXU)})
+ else:
+ os.makedirs(self._directory_name, stat.S_IRWXU)
+
+
class _TokenCache(object):
"""Encapsulates the auth_token token cache functionality.
diff --git a/keystonemiddleware/tests/test_auth_token_middleware.py b/keystonemiddleware/tests/test_auth_token_middleware.py
index 0927abe..09e6a79 100644
--- a/keystonemiddleware/tests/test_auth_token_middleware.py
+++ b/keystonemiddleware/tests/test_auth_token_middleware.py
@@ -898,32 +898,22 @@ class CommonAuthTokenMiddlewareTest(object):
self.token_dict['signed_token_scoped_hash']])
self.assertIsValidJSON(text)
- def test_verify_signing_dir_create_while_missing(self):
- tmp_name = uuid.uuid4().hex
- test_parent_signing_dir = "/tmp/%s" % tmp_name
- self.middleware._signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
- self.middleware._signing_cert_file_name = (
- "%s/test.pem" % self.middleware._signing_dirname)
- self.middleware._verify_signing_dir()
- # NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
- self.assertTrue(os.path.isdir(self.middleware._signing_dirname))
- self.assertTrue(os.access(self.middleware._signing_dirname, os.W_OK))
- self.assertEqual(os.stat(self.middleware._signing_dirname).st_uid,
- os.getuid())
- self.assertEqual(
- stat.S_IMODE(os.stat(self.middleware._signing_dirname).st_mode),
- stat.S_IRWXU)
- shutil.rmtree(test_parent_signing_dir)
-
def test_get_token_revocation_list_fetched_time_returns_min(self):
self.middleware._token_revocation_list_fetched_time = None
- self.middleware._revoked_file_name = ''
+
+ # Get rid of the revoked file
+ revoked_path = self.middleware._signing_directory.calc_path(
+ auth_token.AuthProtocol._REVOKED_FILE_NAME)
+ os.remove(revoked_path)
+
self.assertEqual(self.middleware._token_revocation_list_fetched_time,
datetime.datetime.min)
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
self.middleware._token_revocation_list_fetched_time = None
- mtime = os.path.getmtime(self.middleware._revoked_file_name)
+ revoked_path = self.middleware._signing_directory.calc_path(
+ auth_token.AuthProtocol._REVOKED_FILE_NAME)
+ mtime = os.path.getmtime(revoked_path)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
self.assertEqual(fetched_time,
self.middleware._token_revocation_list_fetched_time)
@@ -947,7 +937,12 @@ class CommonAuthTokenMiddlewareTest(object):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
self.middleware._token_revocation_list_fetched_time = None
- os.remove(self.middleware._revoked_file_name)
+
+ # Get rid of the revoked file
+ revoked_path = self.middleware._signing_directory.calc_path(
+ auth_token.AuthProtocol._REVOKED_FILE_NAME)
+ os.remove(revoked_path)
+
self.assertEqual(self.middleware._token_revocation_list,
self.examples.REVOCATION_LIST)
@@ -1427,7 +1422,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.requests.get(url, text=data)
self.middleware._fetch_signing_cert()
- with open(self.middleware._signing_cert_file_name, 'r') as f:
+ signing_cert_path = self.middleware._signing_directory.calc_path(
+ self.middleware._SIGNING_CERT_FILE_NAME)
+ with open(signing_cert_path, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual(url, self.requests.last_request.url)
@@ -1438,7 +1435,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.requests.get(url, text=data)
self.middleware._fetch_ca_cert()
- with open(self.middleware._signing_ca_file_name, 'r') as f:
+ ca_file_path = self.middleware._signing_directory.calc_path(
+ self.middleware._SIGNING_CA_FILE_NAME)
+ with open(ca_file_path, 'r') as f:
self.assertEqual(f.read(), data)
self.assertEqual(url, self.requests.last_request.url)
diff --git a/keystonemiddleware/tests/unit/__init__.py b/keystonemiddleware/tests/unit/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/keystonemiddleware/tests/unit/__init__.py
diff --git a/keystonemiddleware/tests/unit/test_auth_token.py b/keystonemiddleware/tests/unit/test_auth_token.py
new file mode 100644
index 0000000..29f2dff
--- /dev/null
+++ b/keystonemiddleware/tests/unit/test_auth_token.py
@@ -0,0 +1,140 @@
+# Copyright 2014 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+import stat
+import uuid
+
+import testtools
+
+from keystonemiddleware import auth_token
+
+
+class SigningDirectoryTests(testtools.TestCase):
+
+ def test_directory_created_when_doesnt_exist(self):
+ # When _SigningDirectory is created, if the directory doesn't exist
+ # it's created with the expected permissions.
+ tmp_name = uuid.uuid4().hex
+ parent_directory = '/tmp/%s' % tmp_name
+ directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
+
+ # Directories are created by __init__.
+ auth_token._SigningDirectory(directory_name)
+ self.addCleanup(shutil.rmtree, parent_directory)
+
+ self.assertTrue(os.path.isdir(directory_name))
+ self.assertTrue(os.access(directory_name, os.W_OK))
+ self.assertEqual(os.stat(directory_name).st_uid, os.getuid())
+ self.assertEqual(stat.S_IMODE(os.stat(directory_name).st_mode),
+ stat.S_IRWXU)
+
+ def test_use_directory_already_exists(self):
+ # The directory can already exist.
+
+ tmp_name = uuid.uuid4().hex
+ parent_directory = '/tmp/%s' % tmp_name
+ directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
+ os.makedirs(directory_name, stat.S_IRWXU)
+ self.addCleanup(shutil.rmtree, parent_directory)
+
+ auth_token._SigningDirectory(directory_name)
+
+ def test_write_file(self):
+ # write_file when the file doesn't exist creates the file.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ contents = self.getUniqueString()
+ signing_directory.write_file(file_name, contents)
+
+ file_path = signing_directory.calc_path(file_name)
+ with open(file_path) as f:
+ actual_contents = f.read()
+
+ self.assertEqual(contents, actual_contents)
+
+ def test_replace_file(self):
+ # write_file when the file already exists overwrites it.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ orig_contents = self.getUniqueString()
+ signing_directory.write_file(file_name, orig_contents)
+
+ new_contents = self.getUniqueString()
+ signing_directory.write_file(file_name, new_contents)
+
+ file_path = signing_directory.calc_path(file_name)
+ with open(file_path) as f:
+ actual_contents = f.read()
+
+ self.assertEqual(new_contents, actual_contents)
+
+ def test_recreate_directory(self):
+ # If the original directory is lost, it gets recreated when a file
+ # is written.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ # Delete the directory.
+ shutil.rmtree(signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ contents = self.getUniqueString()
+ signing_directory.write_file(file_name, contents)
+
+ actual_contents = signing_directory.read_file(file_name)
+ self.assertEqual(contents, actual_contents)
+
+ def test_read_file(self):
+ # Can read a file that was written.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ contents = self.getUniqueString()
+ signing_directory.write_file(file_name, contents)
+
+ actual_contents = signing_directory.read_file(file_name)
+
+ self.assertEqual(contents, actual_contents)
+
+ def test_read_file_doesnt_exist(self):
+ # Show what happens when try to read a file that wasn't written.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ self.assertRaises(IOError, signing_directory.read_file, file_name)
+
+ def test_calc_path(self):
+ # calc_path returns the actual filename built from the directory name.
+
+ signing_directory = auth_token._SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ actual_path = signing_directory.calc_path(file_name)
+ expected_path = os.path.join(signing_directory._directory_name,
+ file_name)
+ self.assertEqual(expected_path, actual_path)