summaryrefslogtreecommitdiff
path: root/oslo_policy/tests/test_policy.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_policy/tests/test_policy.py')
-rw-r--r--oslo_policy/tests/test_policy.py54
1 files changed, 54 insertions, 0 deletions
diff --git a/oslo_policy/tests/test_policy.py b/oslo_policy/tests/test_policy.py
index f1f7ddb..85cbfc9 100644
--- a/oslo_policy/tests/test_policy.py
+++ b/oslo_policy/tests/test_policy.py
@@ -15,10 +15,13 @@
"""Test of Policy Engine"""
+import os
+
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslotest import base as test_base
+import six
from oslo_policy import _checks
from oslo_policy import _parser
@@ -172,6 +175,56 @@ class EnforcerTest(base.PolicyBaseTestCase):
'policy.d/b.conf',
])
+ def test_load_directory_caching_with_files_updated(self):
+ self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
+
+ self.enforcer.load_rules(False)
+ self.assertIsNotNone(self.enforcer.rules)
+
+ old = six.next(six.itervalues(
+ self.enforcer._policy_dir_mtimes))
+ self.assertEqual(1, len(self.enforcer._policy_dir_mtimes))
+
+ # Touch the file
+ conf_path = os.path.join(self.config_dir, 'policy.d/a.conf')
+ stinfo = os.stat(conf_path)
+ os.utime(conf_path, (stinfo.st_atime + 10, stinfo.st_mtime + 10))
+
+ self.enforcer.load_rules(False)
+ self.assertEqual(1, len(self.enforcer._policy_dir_mtimes))
+ self.assertEqual(old, six.next(six.itervalues(
+ self.enforcer._policy_dir_mtimes)))
+
+ loaded_rules = jsonutils.loads(str(self.enforcer.rules))
+ self.assertEqual('is_admin:True', loaded_rules['admin'])
+ self.check_loaded_files([
+ 'policy.json',
+ 'policy.d/a.conf',
+ 'policy.d/a.conf',
+ ])
+
+ def test_load_directory_caching_with_files_same(self):
+ self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
+
+ self.enforcer.load_rules(False)
+ self.assertIsNotNone(self.enforcer.rules)
+
+ old = six.next(six.itervalues(
+ self.enforcer._policy_dir_mtimes))
+ self.assertEqual(1, len(self.enforcer._policy_dir_mtimes))
+
+ self.enforcer.load_rules(False)
+ self.assertEqual(1, len(self.enforcer._policy_dir_mtimes))
+ self.assertEqual(old, six.next(six.itervalues(
+ self.enforcer._policy_dir_mtimes)))
+
+ loaded_rules = jsonutils.loads(str(self.enforcer.rules))
+ self.assertEqual('is_admin:True', loaded_rules['admin'])
+ self.check_loaded_files([
+ 'policy.json',
+ 'policy.d/a.conf',
+ ])
+
def test_load_multiple_directories(self):
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
@@ -285,6 +338,7 @@ class EnforcerTest(base.PolicyBaseTestCase):
use_conf=True)
self.enforcer.overwrite = False
+ self.enforcer._is_directory_updated = lambda x, y: True
# Call enforce(), it will load rules from
# policy configuration files, to merge with