summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJamie Lennox <jamielennox@redhat.com>2015-02-19 15:36:30 +1100
committerJamie Lennox <jamielennox@redhat.com>2015-02-23 14:20:40 +1100
commitca1cf28bee4f8b69494a2e00fa6bcdf87911e726 (patch)
treef825aeada7ecc3e4c4c849f3bd1e757e634578cb
parent96579f7c26efbf1c4fd8c5b326cc9288b17525e4 (diff)
downloadkeystonemiddleware-ca1cf28bee4f8b69494a2e00fa6bcdf87911e726.tar.gz
Extract SigningDirectory into file
Extract the SigningDirectory class into its own file and its own test files. Change-Id: I48631c9abea06da81409b6bd805e0f68bfe87a9f Implements: bp refactor-extract-module
-rw-r--r--keystonemiddleware/auth_token/__init__.py64
-rw-r--r--keystonemiddleware/auth_token/_signing_dir.py83
-rw-r--r--keystonemiddleware/tests/auth_token/test_signing_dir.py138
-rw-r--r--keystonemiddleware/tests/unit/test_auth_token.py123
4 files changed, 225 insertions, 183 deletions
diff --git a/keystonemiddleware/auth_token/__init__.py b/keystonemiddleware/auth_token/__init__.py
index ed0a5b6..0d4b41e 100644
--- a/keystonemiddleware/auth_token/__init__.py
+++ b/keystonemiddleware/auth_token/__init__.py
@@ -175,8 +175,6 @@ import contextlib
import datetime
import logging
import os
-import stat
-import tempfile
from keystoneclient import access
from keystoneclient import adapter
@@ -196,6 +194,7 @@ from six.moves import urllib
from keystonemiddleware import _memcache_crypt as memcache_crypt
from keystonemiddleware.auth_token import _exceptions as exc
+from keystonemiddleware.auth_token import _signing_dir
from keystonemiddleware.i18n import _, _LC, _LE, _LI, _LW
from keystonemiddleware.openstack.common import memorycache
@@ -845,7 +844,7 @@ class AuthProtocol(object):
self._auth_uri = self._identity_server.auth_uri
- self._signing_directory = _SigningDirectory(
+ self._signing_directory = _signing_dir.SigningDirectory(
directory_name=self._conf_get('signing_dir'), log=self._LOG)
self._token_cache = self._token_cache_factory()
@@ -1780,65 +1779,6 @@ class _Revocations(object):
raise exc.InvalidToken(_('Token has been revoked'))
-class _SigningDirectory(object):
- def __init__(self, directory_name=None, log=None):
- self._log = log or _LOG
-
- if directory_name is None:
- directory_name = tempfile.mkdtemp(prefix='keystone-signing-')
- self._log.info(
- _LI('Using %s as cache directory for signing certificate'),
- directory_name)
- self._directory_name = directory_name
-
- self._verify_signing_dir()
-
- def write_file(self, file_name, new_contents):
-
- # In Python2, encoding is slow so the following check avoids it if it
- # is not absolutely necessary.
- if isinstance(new_contents, six.text_type):
- new_contents = new_contents.encode('utf-8')
-
- def _atomic_write():
- with tempfile.NamedTemporaryFile(dir=self._directory_name,
- delete=False) as f:
- f.write(new_contents)
- os.rename(f.name, self.calc_path(file_name))
-
- try:
- _atomic_write()
- except (OSError, IOError):
- self._verify_signing_dir()
- _atomic_write()
-
- def read_file(self, file_name):
- path = self.calc_path(file_name)
- open_kwargs = {'encoding': 'utf-8'} if six.PY3 else {}
- with open(path, 'r', **open_kwargs) as f:
- return f.read()
-
- def calc_path(self, file_name):
- return os.path.join(self._directory_name, file_name)
-
- def _verify_signing_dir(self):
- if os.path.isdir(self._directory_name):
- if not os.access(self._directory_name, os.W_OK):
- raise exc.ConfigurationError(
- _('unable to access signing_dir %s') %
- self._directory_name)
- uid = os.getuid()
- if os.stat(self._directory_name).st_uid != uid:
- self._log.warning(_LW('signing_dir is not owned by %s'), uid)
- current_mode = stat.S_IMODE(os.stat(self._directory_name).st_mode)
- if current_mode != stat.S_IRWXU:
- self._log.warning(
- _LW('signing_dir mode is %(mode)s instead of %(need)s'),
- {'mode': oct(current_mode), 'need': oct(stat.S_IRWXU)})
- else:
- os.makedirs(self._directory_name, stat.S_IRWXU)
-
-
class _TokenCache(object):
"""Encapsulates the auth_token token cache functionality.
diff --git a/keystonemiddleware/auth_token/_signing_dir.py b/keystonemiddleware/auth_token/_signing_dir.py
new file mode 100644
index 0000000..f8b1a41
--- /dev/null
+++ b/keystonemiddleware/auth_token/_signing_dir.py
@@ -0,0 +1,83 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import stat
+import tempfile
+
+import six
+
+from keystonemiddleware.auth_token import _exceptions as exc
+from keystonemiddleware.i18n import _, _LI, _LW
+
+_LOG = logging.getLogger(__name__)
+
+
+class SigningDirectory(object):
+
+ def __init__(self, directory_name=None, log=None):
+ self._log = log or _LOG
+
+ if directory_name is None:
+ directory_name = tempfile.mkdtemp(prefix='keystone-signing-')
+ self._log.info(
+ _LI('Using %s as cache directory for signing certificate'),
+ directory_name)
+ self._directory_name = directory_name
+
+ self._verify_signing_dir()
+
+ def write_file(self, file_name, new_contents):
+
+ # In Python2, encoding is slow so the following check avoids it if it
+ # is not absolutely necessary.
+ if isinstance(new_contents, six.text_type):
+ new_contents = new_contents.encode('utf-8')
+
+ def _atomic_write():
+ with tempfile.NamedTemporaryFile(dir=self._directory_name,
+ delete=False) as f:
+ f.write(new_contents)
+ os.rename(f.name, self.calc_path(file_name))
+
+ try:
+ _atomic_write()
+ except (OSError, IOError):
+ self._verify_signing_dir()
+ _atomic_write()
+
+ def read_file(self, file_name):
+ path = self.calc_path(file_name)
+ open_kwargs = {'encoding': 'utf-8'} if six.PY3 else {}
+ with open(path, 'r', **open_kwargs) as f:
+ return f.read()
+
+ def calc_path(self, file_name):
+ return os.path.join(self._directory_name, file_name)
+
+ def _verify_signing_dir(self):
+ if os.path.isdir(self._directory_name):
+ if not os.access(self._directory_name, os.W_OK):
+ raise exc.ConfigurationError(
+ _('unable to access signing_dir %s') %
+ self._directory_name)
+ uid = os.getuid()
+ if os.stat(self._directory_name).st_uid != uid:
+ self._log.warning(_LW('signing_dir is not owned by %s'), uid)
+ current_mode = stat.S_IMODE(os.stat(self._directory_name).st_mode)
+ if current_mode != stat.S_IRWXU:
+ self._log.warning(
+ _LW('signing_dir mode is %(mode)s instead of %(need)s'),
+ {'mode': oct(current_mode), 'need': oct(stat.S_IRWXU)})
+ else:
+ os.makedirs(self._directory_name, stat.S_IRWXU)
diff --git a/keystonemiddleware/tests/auth_token/test_signing_dir.py b/keystonemiddleware/tests/auth_token/test_signing_dir.py
new file mode 100644
index 0000000..bef6274
--- /dev/null
+++ b/keystonemiddleware/tests/auth_token/test_signing_dir.py
@@ -0,0 +1,138 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import shutil
+import stat
+import uuid
+
+import testtools
+
+from keystonemiddleware.auth_token import _signing_dir
+
+
+class SigningDirectoryTests(testtools.TestCase):
+
+ def test_directory_created_when_doesnt_exist(self):
+ # When _SigningDirectory is created, if the directory doesn't exist
+ # it's created with the expected permissions.
+ tmp_name = uuid.uuid4().hex
+ parent_directory = '/tmp/%s' % tmp_name
+ directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
+
+ # Directories are created by __init__.
+ _signing_dir.SigningDirectory(directory_name)
+ self.addCleanup(shutil.rmtree, parent_directory)
+
+ self.assertTrue(os.path.isdir(directory_name))
+ self.assertTrue(os.access(directory_name, os.W_OK))
+ self.assertEqual(os.stat(directory_name).st_uid, os.getuid())
+ self.assertEqual(stat.S_IMODE(os.stat(directory_name).st_mode),
+ stat.S_IRWXU)
+
+ def test_use_directory_already_exists(self):
+ # The directory can already exist.
+
+ tmp_name = uuid.uuid4().hex
+ parent_directory = '/tmp/%s' % tmp_name
+ directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
+ os.makedirs(directory_name, stat.S_IRWXU)
+ self.addCleanup(shutil.rmtree, parent_directory)
+
+ _signing_dir.SigningDirectory(directory_name)
+
+ def test_write_file(self):
+ # write_file when the file doesn't exist creates the file.
+
+ signing_directory = _signing_dir.SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ contents = self.getUniqueString()
+ signing_directory.write_file(file_name, contents)
+
+ file_path = signing_directory.calc_path(file_name)
+ with open(file_path) as f:
+ actual_contents = f.read()
+
+ self.assertEqual(contents, actual_contents)
+
+ def test_replace_file(self):
+ # write_file when the file already exists overwrites it.
+
+ signing_directory = _signing_dir.SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ orig_contents = self.getUniqueString()
+ signing_directory.write_file(file_name, orig_contents)
+
+ new_contents = self.getUniqueString()
+ signing_directory.write_file(file_name, new_contents)
+
+ file_path = signing_directory.calc_path(file_name)
+ with open(file_path) as f:
+ actual_contents = f.read()
+
+ self.assertEqual(new_contents, actual_contents)
+
+ def test_recreate_directory(self):
+ # If the original directory is lost, it gets recreated when a file
+ # is written.
+
+ signing_directory = _signing_dir.SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ # Delete the directory.
+ shutil.rmtree(signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ contents = self.getUniqueString()
+ signing_directory.write_file(file_name, contents)
+
+ actual_contents = signing_directory.read_file(file_name)
+ self.assertEqual(contents, actual_contents)
+
+ def test_read_file(self):
+ # Can read a file that was written.
+
+ signing_directory = _signing_dir.SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ contents = self.getUniqueString()
+ signing_directory.write_file(file_name, contents)
+
+ actual_contents = signing_directory.read_file(file_name)
+
+ self.assertEqual(contents, actual_contents)
+
+ def test_read_file_doesnt_exist(self):
+ # Show what happens when try to read a file that wasn't written.
+
+ signing_directory = _signing_dir.SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ self.assertRaises(IOError, signing_directory.read_file, file_name)
+
+ def test_calc_path(self):
+ # calc_path returns the actual filename built from the directory name.
+
+ signing_directory = _signing_dir.SigningDirectory()
+ self.addCleanup(shutil.rmtree, signing_directory._directory_name)
+
+ file_name = self.getUniqueString()
+ actual_path = signing_directory.calc_path(file_name)
+ expected_path = os.path.join(signing_directory._directory_name,
+ file_name)
+ self.assertEqual(expected_path, actual_path)
diff --git a/keystonemiddleware/tests/unit/test_auth_token.py b/keystonemiddleware/tests/unit/test_auth_token.py
index a909be7..38bc340 100644
--- a/keystonemiddleware/tests/unit/test_auth_token.py
+++ b/keystonemiddleware/tests/unit/test_auth_token.py
@@ -14,9 +14,7 @@
import datetime
import json
-import os
import shutil
-import stat
import uuid
import mock
@@ -24,13 +22,14 @@ import testtools
from keystonemiddleware import auth_token
from keystonemiddleware.auth_token import _exceptions as exc
+from keystonemiddleware.auth_token import _signing_dir
class RevocationsTests(testtools.TestCase):
def _check_with_list(self, revoked_list, token_ids):
directory_name = '/tmp/%s' % uuid.uuid4().hex
- signing_directory = auth_token._SigningDirectory(directory_name)
+ signing_directory = _signing_dir.SigningDirectory(directory_name)
self.addCleanup(shutil.rmtree, directory_name)
identity_server = mock.Mock()
@@ -64,121 +63,3 @@ class RevocationsTests(testtools.TestCase):
token_ids = [token_id]
self.assertRaises(exc.InvalidToken,
self._check_with_list, revoked_tokens, token_ids)
-
-
-class SigningDirectoryTests(testtools.TestCase):
-
- def test_directory_created_when_doesnt_exist(self):
- # When _SigningDirectory is created, if the directory doesn't exist
- # it's created with the expected permissions.
- tmp_name = uuid.uuid4().hex
- parent_directory = '/tmp/%s' % tmp_name
- directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
-
- # Directories are created by __init__.
- auth_token._SigningDirectory(directory_name)
- self.addCleanup(shutil.rmtree, parent_directory)
-
- self.assertTrue(os.path.isdir(directory_name))
- self.assertTrue(os.access(directory_name, os.W_OK))
- self.assertEqual(os.stat(directory_name).st_uid, os.getuid())
- self.assertEqual(stat.S_IMODE(os.stat(directory_name).st_mode),
- stat.S_IRWXU)
-
- def test_use_directory_already_exists(self):
- # The directory can already exist.
-
- tmp_name = uuid.uuid4().hex
- parent_directory = '/tmp/%s' % tmp_name
- directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
- os.makedirs(directory_name, stat.S_IRWXU)
- self.addCleanup(shutil.rmtree, parent_directory)
-
- auth_token._SigningDirectory(directory_name)
-
- def test_write_file(self):
- # write_file when the file doesn't exist creates the file.
-
- signing_directory = auth_token._SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- contents = self.getUniqueString()
- signing_directory.write_file(file_name, contents)
-
- file_path = signing_directory.calc_path(file_name)
- with open(file_path) as f:
- actual_contents = f.read()
-
- self.assertEqual(contents, actual_contents)
-
- def test_replace_file(self):
- # write_file when the file already exists overwrites it.
-
- signing_directory = auth_token._SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- orig_contents = self.getUniqueString()
- signing_directory.write_file(file_name, orig_contents)
-
- new_contents = self.getUniqueString()
- signing_directory.write_file(file_name, new_contents)
-
- file_path = signing_directory.calc_path(file_name)
- with open(file_path) as f:
- actual_contents = f.read()
-
- self.assertEqual(new_contents, actual_contents)
-
- def test_recreate_directory(self):
- # If the original directory is lost, it gets recreated when a file
- # is written.
-
- signing_directory = auth_token._SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- # Delete the directory.
- shutil.rmtree(signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- contents = self.getUniqueString()
- signing_directory.write_file(file_name, contents)
-
- actual_contents = signing_directory.read_file(file_name)
- self.assertEqual(contents, actual_contents)
-
- def test_read_file(self):
- # Can read a file that was written.
-
- signing_directory = auth_token._SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- contents = self.getUniqueString()
- signing_directory.write_file(file_name, contents)
-
- actual_contents = signing_directory.read_file(file_name)
-
- self.assertEqual(contents, actual_contents)
-
- def test_read_file_doesnt_exist(self):
- # Show what happens when try to read a file that wasn't written.
-
- signing_directory = auth_token._SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- self.assertRaises(IOError, signing_directory.read_file, file_name)
-
- def test_calc_path(self):
- # calc_path returns the actual filename built from the directory name.
-
- signing_directory = auth_token._SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- actual_path = signing_directory.calc_path(file_name)
- expected_path = os.path.join(signing_directory._directory_name,
- file_name)
- self.assertEqual(expected_path, actual_path)