diff options
author | Steve Baker <sbaker@redhat.com> | 2020-07-24 17:12:55 +1200 |
---|---|---|
committer | Steve Baker <sbaker@redhat.com> | 2020-11-16 10:39:36 +1300 |
commit | 6ea0e8eaf65330e47e9be231fd7871474ad16f76 (patch) | |
tree | fd85ea4abdfb98fbc99656816076a3c5857f439e /ironic/common/args.py | |
parent | b861ea022b21978e1fa0db7ef90b3cc56fc1c908 (diff) | |
download | ironic-6ea0e8eaf65330e47e9be231fd7871474ad16f76.tar.gz |
New argument validate decorator
This will replace @expose.expose parameter validation of method
arguments.
This implementation is complete and every required validator has been
implemented. Implementing *args and **kwargs support is probably
overkill with the followup @expose.body decorator, but it might be
nice to have in the future.
Change-Id: I62492f7396805433ac3577274ffbbb5d787d0fc1
Story: 1651346
Task: 10551
Diffstat (limited to 'ironic/common/args.py')
-rwxr-xr-x | ironic/common/args.py | 394 |
1 files changed, 394 insertions, 0 deletions
diff --git a/ironic/common/args.py b/ironic/common/args.py new file mode 100755 index 000000000..014fbd318 --- /dev/null +++ b/ironic/common/args.py @@ -0,0 +1,394 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import functools +import inspect + +import jsonschema +from oslo_utils import strutils +from oslo_utils import uuidutils + +from ironic.common import exception +from ironic.common.i18n import _ +from ironic.common import utils + + +def string(name, value): + """Validate that the value is a string + + :param name: Name of the argument + :param value: A string value + :returns: The string value, or None if value is None + :raises: InvalidParameterValue if the value is not a string + """ + if value is None: + return + if not isinstance(value, str): + raise exception.InvalidParameterValue( + _('Expected string for %s: %s') % (name, value)) + return value + + +def boolean(name, value): + """Validate that the value is a string representing a boolean + + :param name: Name of the argument + :param value: A string value + :returns: The boolean representation of the value, or None if value is None + :raises: InvalidParameterValue if the value cannot be converted to a + boolean + """ + if value is None: + return + try: + return strutils.bool_from_string(value, strict=True) + except ValueError as e: + raise exception.InvalidParameterValue( + _('Invalid %s: %s') % (name, e)) + + +def uuid(name, value): + """Validate that the value is a UUID + + :param name: Name of the argument + :param value: A UUID string value + :returns: The value, or None if value is None + :raises: InvalidParameterValue if the value is not a valid UUID + """ + if value is None: + return + if not uuidutils.is_uuid_like(value): + raise exception.InvalidParameterValue( + _('Expected UUID for %s: %s') % (name, value)) + return value + + +def name(name, value): + """Validate that the value is a logical name + + :param name: Name of the argument + :param value: A logical name string value + :returns: The value, or None if value is None + :raises: InvalidParameterValue if the value is not a valid logical name + """ + if value is None: + return + if not utils.is_valid_logical_name(value): + raise exception.InvalidParameterValue( + _('Expected name for %s: %s') % (name, value)) + return value + + +def uuid_or_name(name, value): + """Validate that the value is a UUID or logical name + + :param name: Name of the argument + :param value: A UUID or logical name string value + :returns: The value, or None if value is None + :raises: InvalidParameterValue if the value is not a valid UUID or + logical name + """ + if value is None: + return + if (not utils.is_valid_logical_name(value) + and not uuidutils.is_uuid_like(value)): + raise exception.InvalidParameterValue( + _('Expected UUID or name for %s: %s') % (name, value)) + return value + + +def string_list(name, value): + """Validate and convert comma delimited string to a list. + + :param name: Name of the argument + :param value: A comma separated string of values + :returns: A list of unique values (lower-cased), maintaining the + same order, or None if value is None + :raises: InvalidParameterValue if the value is not a string + """ + value = string(name, value) + if value is None: + return + items = [] + for v in str(value).split(','): + v_norm = v.strip().lower() + if v_norm and v_norm not in items: + items.append(v_norm) + return items + + +def integer(name, value): + """Validate that the value represents an integer + + :param name: Name of the argument + :param value: A value representing an integer + :returns: The value as an int, or None if value is None + :raises: InvalidParameterValue if the value does not represent an integer + """ + if value is None: + return + try: + return int(value) + except (ValueError, TypeError): + raise exception.InvalidParameterValue( + _('Expected an integer for %s: %s') % (name, value)) + + +def mac_address(name, value): + """Validate that the value represents a MAC address + + :param name: Name of the argument + :param value: A string value representing a MAC address + :returns: The value as a normalized MAC address, or None if value is None + :raises: InvalidParameterValue if the value is not a valid MAC address + """ + if value is None: + return + try: + return utils.validate_and_normalize_mac(value) + except exception.InvalidMAC: + raise exception.InvalidParameterValue( + _('Expected valid MAC address for %s: %s') % (name, value)) + + +def _or(name, value, validators): + last_error = None + for v in validators: + try: + return v(name=name, value=value) + except exception.Invalid as e: + last_error = e + if last_error: + raise last_error + + +def or_valid(*validators): + """Validates if at least one supplied validator passes + + :param name: Name of the argument + :param value: A value + :returns: The value returned from the first successful validator + :raises: The error from the last validator when + every validation fails + """ + assert validators, 'No validators specified for or_valid' + return functools.partial(_or, validators=validators) + + +def _and(name, value, validators): + for v in validators: + value = v(name=name, value=value) + return value + + +def and_valid(*validators): + """Validates that every supplied validator passes + + The value returned from each validator is passed as the value to the next + one. + + :param name: Name of the argument + :param value: A value + :returns: The value transformed through every supplied validator + :raises: The error from the first failed validator + """ + assert validators, 'No validators specified for or_valid' + return functools.partial(_and, validators=validators) + + +def _validate_schema(name, value, schema): + if value is None: + return + try: + jsonschema.validate(value, schema) + except jsonschema.exceptions.ValidationError as e: + + # The error message includes the whole schema which can be very + # large and unhelpful, so truncate it to be brief and useful + error_msg = ' '.join(str(e).split("\n")[:3])[:-1] + raise exception.InvalidParameterValue( + _('Schema error for %s: %s') % (name, error_msg)) + return value + + +def schema(schema): + """Return a validator function which validates the value with jsonschema + + :param: schema dict representing jsonschema to validate with + :returns: validator function which takes name and value arguments + """ + jsonschema.Draft4Validator.check_schema(schema) + + return functools.partial(_validate_schema, schema=schema) + + +def _validate_dict(name, value, validators): + if value is None: + return + _validate_types(name, value, (dict, )) + + for k, v in validators.items(): + if k in value: + value[k] = v(name=k, value=value[k]) + + return value + + +def dict_valid(**validators): + """Return a validator function which validates dict fields + + Validators will replace the value with the validation result. Any dict + item which has no validator is ignored. When a key is missing in the value + then the corresponding validator will not be run. + + :param: validators dict where the key is a dict key to validate and the + value is a validator function to run on that value + :returns: validator function which takes name and value arguments + """ + return functools.partial(_validate_dict, validators=validators) + + +def _validate_types(name, value, types): + if not isinstance(value, types): + str_types = ', '.join([str(t) for t in types]) + raise exception.InvalidParameterValue( + _('Expected types %s for %s: %s') % (str_types, name, value)) + return value + + +def types(*types): + """Return a validator function which checks the value is one of the types + + :param: types one or more types to use for the isinstance test + :returns: validator function which takes name and value arguments + """ + return functools.partial(_validate_types, types=tuple(types)) + + +def _apply_validator(name, value, val_functions): + if callable(val_functions): + return val_functions(name, value) + + for v in val_functions: + value = v(name, value) + return value + + +def _inspect(function): + sig = inspect.signature(function) + param_keyword = None # **kwargs parameter + param_positional = None # *args parameter + params = [] + + for param in sig.parameters.values(): + if param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD: + params.append(param) + elif param.kind == inspect.Parameter.VAR_KEYWORD: + param_keyword = param + elif param.kind == inspect.Parameter.VAR_POSITIONAL: + param_positional = param + else: + assert False, 'Unsupported parameter kind %s %s' % ( + param.name, param.kind + ) + return params, param_positional, param_keyword + + +def validate(*args, **kwargs): + """Decorator which validates and transforms function arguments + + """ + assert not args, 'Validators must be specifed by argument name' + assert kwargs, 'No validators specified' + validators = kwargs + + def inner_function(function): + params, param_positional, param_keyword = _inspect(function) + + @functools.wraps(function) + def inner_check_args(*args, **kwargs): + args = list(args) + args_len = len(args) + kwargs_next = {} + next_arg_index = 0 + + if not param_keyword: + # ensure each named argument belongs to a param + kwarg_keys = set(kwargs) + param_names = set(p.name for p in params) + extra_args = kwarg_keys.difference(param_names) + if extra_args: + raise exception.InvalidParameterValue( + _('Unexpected arguments: %s') % ', '.join(extra_args)) + + for i, param in enumerate(params): + + if i == 0 and param.name == 'self': + # skip validating self + continue + + val_function = validators.get(param.name) + if not val_function: + continue + + if i < args_len: + # validate positional argument + args[i] = val_function(param.name, args[i]) + next_arg_index = i + 1 + + elif param.name in kwargs: + # validate keyword argument + kwargs_next[param.name] = val_function( + param.name, kwargs.pop(param.name)) + elif param.default == inspect.Parameter.empty: + # no argument was provided, and there is no default + # in the parameter, so this is a mandatory argument + raise exception.InvalidParameterValue( + _('Missing mandatory parameter: %s') % param.name) + + if param_positional: + # handle validating *args + val_function = validators.get(param_positional.name) + remaining = args[next_arg_index:] + if val_function and remaining: + args = args[:next_arg_index] + args.extend(val_function(param_positional.name, remaining)) + + # handle validating remaining **kwargs + if kwargs: + val_function = (param_keyword + and validators.get(param_keyword.name)) + if val_function: + kwargs_next.update( + val_function(param_keyword.name, kwargs)) + else: + # make sure unvalidated keyword arguments are kept + kwargs_next.update(kwargs) + + return function(*args, **kwargs_next) + return inner_check_args + return inner_function + + +patch = schema({ + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': { + 'path': {'type': 'string', 'pattern': '^(/[\\w-]+)+$'}, + 'op': {'type': 'string', 'enum': ['add', 'replace', 'remove']}, + 'value': {} + }, + 'additionalProperties': False + } +}) +"""Validate a patch API operation""" |