diff options
author | Ian Cordasco <ian.cordasco@rackspace.com> | 2015-02-05 11:45:49 -0600 |
---|---|---|
committer | Steve Martinelli <stevemar@ca.ibm.com> | 2015-02-06 00:38:14 -0500 |
commit | df5d80759e7c7a05d7a8ef8120d86bb934b23b37 (patch) | |
tree | 9c96599e4cae94d2a24eb0b4ff35100d4745f69b /oslo_policy/_checks.py | |
parent | c2872aa32fa1fb861c9fab54b1eb32401ea2b0cd (diff) | |
download | oslo-policy-df5d80759e7c7a05d7a8ef8120d86bb934b23b37.tar.gz |
Make use of private modules
Move the parser and checks logic into oslo_policy._parser and
oslo_policy._checks respectively. As a consequence, this allows us to
create separate test files for those modules so we now also have
oslo_policy.tests.test_parser and oslo_policy.tests.test_checks. Since
those modules needed some common classes and fixtures it was also
necessary to add oslo_policy.tests.base to service the three test
modules.
Change-Id: I656dcb8fda7b953f5def8ddfaa4d119a8c881965
Diffstat (limited to 'oslo_policy/_checks.py')
-rw-r--r-- | oslo_policy/_checks.py | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/oslo_policy/_checks.py b/oslo_policy/_checks.py new file mode 100644 index 0000000..7860b61 --- /dev/null +++ b/oslo_policy/_checks.py @@ -0,0 +1,310 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2015 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import abc +import ast +import copy + +from oslo_serialization import jsonutils +import six +import six.moves.urllib.parse as urlparse +import six.moves.urllib.request as urlrequest + + +registered_checks = {} + + +@six.add_metaclass(abc.ABCMeta) +class BaseCheck(object): + """Abstract base class for Check classes.""" + + @abc.abstractmethod + def __str__(self): + """String representation of the Check tree rooted at this node.""" + + pass + + @abc.abstractmethod + def __call__(self, target, cred, enforcer): + """Triggers if instance of the class is called. + + Performs the check. Returns False to reject the access or a + true value (not necessary True) to accept the access. + """ + + pass + + +class FalseCheck(BaseCheck): + """A policy check that always returns ``False`` (disallow).""" + + def __str__(self): + """Return a string representation of this check.""" + + return "!" + + def __call__(self, target, cred, enforcer): + """Check the policy.""" + + return False + + +class TrueCheck(BaseCheck): + """A policy check that always returns ``True`` (allow).""" + + def __str__(self): + """Return a string representation of this check.""" + + return "@" + + def __call__(self, target, cred, enforcer): + """Check the policy.""" + + return True + + +class Check(BaseCheck): + """A base class to allow for user-defined policy checks. + + :param kind: The kind of the check, i.e., the field before the ``:``. + :param match: The match of the check, i.e., the field after the ``:``. + + """ + + def __init__(self, kind, match): + self.kind = kind + self.match = match + + def __str__(self): + """Return a string representation of this check.""" + + return "%s:%s" % (self.kind, self.match) + + +class NotCheck(BaseCheck): + """Implements the "not" logical operator. + + A policy check that inverts the result of another policy check. + + :param rule: The rule to negate. Must be a Check. + + """ + + def __init__(self, rule): + self.rule = rule + + def __str__(self): + """Return a string representation of this check.""" + + return "not %s" % self.rule + + def __call__(self, target, cred, enforcer): + """Check the policy. + + Returns the logical inverse of the wrapped check. + """ + + return not self.rule(target, cred, enforcer) + + +class AndCheck(BaseCheck): + """Implements the "and" logical operator. + + A policy check that requires that a list of other checks all return True. + + :param list rules: rules that will be tested. + + """ + + def __init__(self, rules): + self.rules = rules + + def __str__(self): + """Return a string representation of this check.""" + + return "(%s)" % ' and '.join(str(r) for r in self.rules) + + def __call__(self, target, cred, enforcer): + """Check the policy. + + Requires that all rules accept in order to return True. + """ + + for rule in self.rules: + if not rule(target, cred, enforcer): + return False + + return True + + def add_check(self, rule): + """Adds rule to be tested. + + Allows addition of another rule to the list of rules that will + be tested. + + :returns: self + :rtype: :class:`.AndCheck` + """ + + self.rules.append(rule) + return self + + +class OrCheck(BaseCheck): + """Implements the "or" operator. + + A policy check that requires that at least one of a list of other + checks returns ``True``. + + :param rules: A list of rules that will be tested. + + """ + + def __init__(self, rules): + self.rules = rules + + def __str__(self): + """Return a string representation of this check.""" + + return "(%s)" % ' or '.join(str(r) for r in self.rules) + + def __call__(self, target, cred, enforcer): + """Check the policy. + + Requires that at least one rule accept in order to return True. + """ + + for rule in self.rules: + if rule(target, cred, enforcer): + return True + return False + + def add_check(self, rule): + """Adds rule to be tested. + + Allows addition of another rule to the list of rules that will + be tested. Returns the OrCheck object for convenience. + """ + + self.rules.append(rule) + return self + + +def register(name, func=None): + """Register a function or :class:`.Check` class as a policy check. + + :param name: Gives the name of the check type, e.g., "rule", + "role", etc. If name is ``None``, a default check type + will be registered. + :param func: If given, provides the function or class to register. + If not given, returns a function taking one argument + to specify the function or class to register, + allowing use as a decorator. + """ + + # Perform the actual decoration by registering the function or + # class. Returns the function or class for compliance with the + # decorator interface. + def decorator(func): + registered_checks[name] = func + return func + + # If the function or class is given, do the registration + if func: + return decorator(func) + + return decorator + + +@register("rule") +class RuleCheck(Check): + """Recursively checks credentials based on the defined rules.""" + + def __call__(self, target, creds, enforcer): + try: + return enforcer.rules[self.match](target, creds, enforcer) + except KeyError: + # We don't have any matching rule; fail closed + return False + + +@register("role") +class RoleCheck(Check): + """Check that there is a matching role in the ``creds`` dict.""" + + def __call__(self, target, creds, enforcer): + return self.match.lower() in [x.lower() for x in creds['roles']] + + +@register('http') +class HttpCheck(Check): + """Check ``http:`` rules by calling to a remote server. + + This example implementation simply verifies that the response + is exactly ``True``. + """ + + def __call__(self, target, creds, enforcer): + url = ('http:' + self.match) % target + + # Convert instances of object() in target temporarily to + # empty dict to avoid circular reference detection + # errors in jsonutils.dumps(). + temp_target = copy.deepcopy(target) + for key in target.keys(): + element = target.get(key) + if type(element) is object: + temp_target[key] = {} + + data = {'target': jsonutils.dumps(temp_target), + 'credentials': jsonutils.dumps(creds)} + post_data = urlparse.urlencode(data) + f = urlrequest.urlopen(url, post_data) + return f.read() == "True" + + +@register(None) +class GenericCheck(Check): + """Check an individual match. + + Matches look like: + + - tenant:%(tenant_id)s + - role:compute:admin + - True:%(user.enabled)s + - 'Member':%(role.name)s + """ + + def __call__(self, target, creds, enforcer): + try: + match = self.match % target + except KeyError: + # While doing GenericCheck if key not + # present in Target return false + return False + + try: + # Try to interpret self.kind as a literal + leftval = ast.literal_eval(self.kind) + except ValueError: + try: + kind_parts = self.kind.split('.') + leftval = creds + for kind_part in kind_parts: + leftval = leftval[kind_part] + except KeyError: + return False + return match == six.text_type(leftval) |