summaryrefslogtreecommitdiff
path: root/oslo_policy/_checks.py
diff options
context:
space:
mode:
authorIan Cordasco <ian.cordasco@rackspace.com>2015-02-05 11:45:49 -0600
committerSteve Martinelli <stevemar@ca.ibm.com>2015-02-06 00:38:14 -0500
commitdf5d80759e7c7a05d7a8ef8120d86bb934b23b37 (patch)
tree9c96599e4cae94d2a24eb0b4ff35100d4745f69b /oslo_policy/_checks.py
parentc2872aa32fa1fb861c9fab54b1eb32401ea2b0cd (diff)
downloadoslo-policy-df5d80759e7c7a05d7a8ef8120d86bb934b23b37.tar.gz
Make use of private modules
Move the parser and checks logic into oslo_policy._parser and oslo_policy._checks respectively. As a consequence, this allows us to create separate test files for those modules so we now also have oslo_policy.tests.test_parser and oslo_policy.tests.test_checks. Since those modules needed some common classes and fixtures it was also necessary to add oslo_policy.tests.base to service the three test modules. Change-Id: I656dcb8fda7b953f5def8ddfaa4d119a8c881965
Diffstat (limited to 'oslo_policy/_checks.py')
-rw-r--r--oslo_policy/_checks.py310
1 files changed, 310 insertions, 0 deletions
diff --git a/oslo_policy/_checks.py b/oslo_policy/_checks.py
new file mode 100644
index 0000000..7860b61
--- /dev/null
+++ b/oslo_policy/_checks.py
@@ -0,0 +1,310 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import abc
+import ast
+import copy
+
+from oslo_serialization import jsonutils
+import six
+import six.moves.urllib.parse as urlparse
+import six.moves.urllib.request as urlrequest
+
+
+registered_checks = {}
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseCheck(object):
+ """Abstract base class for Check classes."""
+
+ @abc.abstractmethod
+ def __str__(self):
+ """String representation of the Check tree rooted at this node."""
+
+ pass
+
+ @abc.abstractmethod
+ def __call__(self, target, cred, enforcer):
+ """Triggers if instance of the class is called.
+
+ Performs the check. Returns False to reject the access or a
+ true value (not necessary True) to accept the access.
+ """
+
+ pass
+
+
+class FalseCheck(BaseCheck):
+ """A policy check that always returns ``False`` (disallow)."""
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "!"
+
+ def __call__(self, target, cred, enforcer):
+ """Check the policy."""
+
+ return False
+
+
+class TrueCheck(BaseCheck):
+ """A policy check that always returns ``True`` (allow)."""
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "@"
+
+ def __call__(self, target, cred, enforcer):
+ """Check the policy."""
+
+ return True
+
+
+class Check(BaseCheck):
+ """A base class to allow for user-defined policy checks.
+
+ :param kind: The kind of the check, i.e., the field before the ``:``.
+ :param match: The match of the check, i.e., the field after the ``:``.
+
+ """
+
+ def __init__(self, kind, match):
+ self.kind = kind
+ self.match = match
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "%s:%s" % (self.kind, self.match)
+
+
+class NotCheck(BaseCheck):
+ """Implements the "not" logical operator.
+
+ A policy check that inverts the result of another policy check.
+
+ :param rule: The rule to negate. Must be a Check.
+
+ """
+
+ def __init__(self, rule):
+ self.rule = rule
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "not %s" % self.rule
+
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Returns the logical inverse of the wrapped check.
+ """
+
+ return not self.rule(target, cred, enforcer)
+
+
+class AndCheck(BaseCheck):
+ """Implements the "and" logical operator.
+
+ A policy check that requires that a list of other checks all return True.
+
+ :param list rules: rules that will be tested.
+
+ """
+
+ def __init__(self, rules):
+ self.rules = rules
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "(%s)" % ' and '.join(str(r) for r in self.rules)
+
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Requires that all rules accept in order to return True.
+ """
+
+ for rule in self.rules:
+ if not rule(target, cred, enforcer):
+ return False
+
+ return True
+
+ def add_check(self, rule):
+ """Adds rule to be tested.
+
+ Allows addition of another rule to the list of rules that will
+ be tested.
+
+ :returns: self
+ :rtype: :class:`.AndCheck`
+ """
+
+ self.rules.append(rule)
+ return self
+
+
+class OrCheck(BaseCheck):
+ """Implements the "or" operator.
+
+ A policy check that requires that at least one of a list of other
+ checks returns ``True``.
+
+ :param rules: A list of rules that will be tested.
+
+ """
+
+ def __init__(self, rules):
+ self.rules = rules
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "(%s)" % ' or '.join(str(r) for r in self.rules)
+
+ def __call__(self, target, cred, enforcer):
+ """Check the policy.
+
+ Requires that at least one rule accept in order to return True.
+ """
+
+ for rule in self.rules:
+ if rule(target, cred, enforcer):
+ return True
+ return False
+
+ def add_check(self, rule):
+ """Adds rule to be tested.
+
+ Allows addition of another rule to the list of rules that will
+ be tested. Returns the OrCheck object for convenience.
+ """
+
+ self.rules.append(rule)
+ return self
+
+
+def register(name, func=None):
+ """Register a function or :class:`.Check` class as a policy check.
+
+ :param name: Gives the name of the check type, e.g., "rule",
+ "role", etc. If name is ``None``, a default check type
+ will be registered.
+ :param func: If given, provides the function or class to register.
+ If not given, returns a function taking one argument
+ to specify the function or class to register,
+ allowing use as a decorator.
+ """
+
+ # Perform the actual decoration by registering the function or
+ # class. Returns the function or class for compliance with the
+ # decorator interface.
+ def decorator(func):
+ registered_checks[name] = func
+ return func
+
+ # If the function or class is given, do the registration
+ if func:
+ return decorator(func)
+
+ return decorator
+
+
+@register("rule")
+class RuleCheck(Check):
+ """Recursively checks credentials based on the defined rules."""
+
+ def __call__(self, target, creds, enforcer):
+ try:
+ return enforcer.rules[self.match](target, creds, enforcer)
+ except KeyError:
+ # We don't have any matching rule; fail closed
+ return False
+
+
+@register("role")
+class RoleCheck(Check):
+ """Check that there is a matching role in the ``creds`` dict."""
+
+ def __call__(self, target, creds, enforcer):
+ return self.match.lower() in [x.lower() for x in creds['roles']]
+
+
+@register('http')
+class HttpCheck(Check):
+ """Check ``http:`` rules by calling to a remote server.
+
+ This example implementation simply verifies that the response
+ is exactly ``True``.
+ """
+
+ def __call__(self, target, creds, enforcer):
+ url = ('http:' + self.match) % target
+
+ # Convert instances of object() in target temporarily to
+ # empty dict to avoid circular reference detection
+ # errors in jsonutils.dumps().
+ temp_target = copy.deepcopy(target)
+ for key in target.keys():
+ element = target.get(key)
+ if type(element) is object:
+ temp_target[key] = {}
+
+ data = {'target': jsonutils.dumps(temp_target),
+ 'credentials': jsonutils.dumps(creds)}
+ post_data = urlparse.urlencode(data)
+ f = urlrequest.urlopen(url, post_data)
+ return f.read() == "True"
+
+
+@register(None)
+class GenericCheck(Check):
+ """Check an individual match.
+
+ Matches look like:
+
+ - tenant:%(tenant_id)s
+ - role:compute:admin
+ - True:%(user.enabled)s
+ - 'Member':%(role.name)s
+ """
+
+ def __call__(self, target, creds, enforcer):
+ try:
+ match = self.match % target
+ except KeyError:
+ # While doing GenericCheck if key not
+ # present in Target return false
+ return False
+
+ try:
+ # Try to interpret self.kind as a literal
+ leftval = ast.literal_eval(self.kind)
+ except ValueError:
+ try:
+ kind_parts = self.kind.split('.')
+ leftval = creds
+ for kind_part in kind_parts:
+ leftval = leftval[kind_part]
+ except KeyError:
+ return False
+ return match == six.text_type(leftval)