summaryrefslogtreecommitdiff
path: root/ironic/common/args.py
diff options
context:
space:
mode:
authorSteve Baker <sbaker@redhat.com>2020-07-24 17:12:55 +1200
committerSteve Baker <sbaker@redhat.com>2020-11-16 10:39:36 +1300
commit6ea0e8eaf65330e47e9be231fd7871474ad16f76 (patch)
treefd85ea4abdfb98fbc99656816076a3c5857f439e /ironic/common/args.py
parentb861ea022b21978e1fa0db7ef90b3cc56fc1c908 (diff)
downloadironic-6ea0e8eaf65330e47e9be231fd7871474ad16f76.tar.gz
New argument validate decorator
This will replace @expose.expose parameter validation of method arguments. This implementation is complete and every required validator has been implemented. Implementing *args and **kwargs support is probably overkill with the followup @expose.body decorator, but it might be nice to have in the future. Change-Id: I62492f7396805433ac3577274ffbbb5d787d0fc1 Story: 1651346 Task: 10551
Diffstat (limited to 'ironic/common/args.py')
-rwxr-xr-xironic/common/args.py394
1 files changed, 394 insertions, 0 deletions
diff --git a/ironic/common/args.py b/ironic/common/args.py
new file mode 100755
index 000000000..014fbd318
--- /dev/null
+++ b/ironic/common/args.py
@@ -0,0 +1,394 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+
+import jsonschema
+from oslo_utils import strutils
+from oslo_utils import uuidutils
+
+from ironic.common import exception
+from ironic.common.i18n import _
+from ironic.common import utils
+
+
+def string(name, value):
+ """Validate that the value is a string
+
+ :param name: Name of the argument
+ :param value: A string value
+ :returns: The string value, or None if value is None
+ :raises: InvalidParameterValue if the value is not a string
+ """
+ if value is None:
+ return
+ if not isinstance(value, str):
+ raise exception.InvalidParameterValue(
+ _('Expected string for %s: %s') % (name, value))
+ return value
+
+
+def boolean(name, value):
+ """Validate that the value is a string representing a boolean
+
+ :param name: Name of the argument
+ :param value: A string value
+ :returns: The boolean representation of the value, or None if value is None
+ :raises: InvalidParameterValue if the value cannot be converted to a
+ boolean
+ """
+ if value is None:
+ return
+ try:
+ return strutils.bool_from_string(value, strict=True)
+ except ValueError as e:
+ raise exception.InvalidParameterValue(
+ _('Invalid %s: %s') % (name, e))
+
+
+def uuid(name, value):
+ """Validate that the value is a UUID
+
+ :param name: Name of the argument
+ :param value: A UUID string value
+ :returns: The value, or None if value is None
+ :raises: InvalidParameterValue if the value is not a valid UUID
+ """
+ if value is None:
+ return
+ if not uuidutils.is_uuid_like(value):
+ raise exception.InvalidParameterValue(
+ _('Expected UUID for %s: %s') % (name, value))
+ return value
+
+
+def name(name, value):
+ """Validate that the value is a logical name
+
+ :param name: Name of the argument
+ :param value: A logical name string value
+ :returns: The value, or None if value is None
+ :raises: InvalidParameterValue if the value is not a valid logical name
+ """
+ if value is None:
+ return
+ if not utils.is_valid_logical_name(value):
+ raise exception.InvalidParameterValue(
+ _('Expected name for %s: %s') % (name, value))
+ return value
+
+
+def uuid_or_name(name, value):
+ """Validate that the value is a UUID or logical name
+
+ :param name: Name of the argument
+ :param value: A UUID or logical name string value
+ :returns: The value, or None if value is None
+ :raises: InvalidParameterValue if the value is not a valid UUID or
+ logical name
+ """
+ if value is None:
+ return
+ if (not utils.is_valid_logical_name(value)
+ and not uuidutils.is_uuid_like(value)):
+ raise exception.InvalidParameterValue(
+ _('Expected UUID or name for %s: %s') % (name, value))
+ return value
+
+
+def string_list(name, value):
+ """Validate and convert comma delimited string to a list.
+
+ :param name: Name of the argument
+ :param value: A comma separated string of values
+ :returns: A list of unique values (lower-cased), maintaining the
+ same order, or None if value is None
+ :raises: InvalidParameterValue if the value is not a string
+ """
+ value = string(name, value)
+ if value is None:
+ return
+ items = []
+ for v in str(value).split(','):
+ v_norm = v.strip().lower()
+ if v_norm and v_norm not in items:
+ items.append(v_norm)
+ return items
+
+
+def integer(name, value):
+ """Validate that the value represents an integer
+
+ :param name: Name of the argument
+ :param value: A value representing an integer
+ :returns: The value as an int, or None if value is None
+ :raises: InvalidParameterValue if the value does not represent an integer
+ """
+ if value is None:
+ return
+ try:
+ return int(value)
+ except (ValueError, TypeError):
+ raise exception.InvalidParameterValue(
+ _('Expected an integer for %s: %s') % (name, value))
+
+
+def mac_address(name, value):
+ """Validate that the value represents a MAC address
+
+ :param name: Name of the argument
+ :param value: A string value representing a MAC address
+ :returns: The value as a normalized MAC address, or None if value is None
+ :raises: InvalidParameterValue if the value is not a valid MAC address
+ """
+ if value is None:
+ return
+ try:
+ return utils.validate_and_normalize_mac(value)
+ except exception.InvalidMAC:
+ raise exception.InvalidParameterValue(
+ _('Expected valid MAC address for %s: %s') % (name, value))
+
+
+def _or(name, value, validators):
+ last_error = None
+ for v in validators:
+ try:
+ return v(name=name, value=value)
+ except exception.Invalid as e:
+ last_error = e
+ if last_error:
+ raise last_error
+
+
+def or_valid(*validators):
+ """Validates if at least one supplied validator passes
+
+ :param name: Name of the argument
+ :param value: A value
+ :returns: The value returned from the first successful validator
+ :raises: The error from the last validator when
+ every validation fails
+ """
+ assert validators, 'No validators specified for or_valid'
+ return functools.partial(_or, validators=validators)
+
+
+def _and(name, value, validators):
+ for v in validators:
+ value = v(name=name, value=value)
+ return value
+
+
+def and_valid(*validators):
+ """Validates that every supplied validator passes
+
+ The value returned from each validator is passed as the value to the next
+ one.
+
+ :param name: Name of the argument
+ :param value: A value
+ :returns: The value transformed through every supplied validator
+ :raises: The error from the first failed validator
+ """
+ assert validators, 'No validators specified for or_valid'
+ return functools.partial(_and, validators=validators)
+
+
+def _validate_schema(name, value, schema):
+ if value is None:
+ return
+ try:
+ jsonschema.validate(value, schema)
+ except jsonschema.exceptions.ValidationError as e:
+
+ # The error message includes the whole schema which can be very
+ # large and unhelpful, so truncate it to be brief and useful
+ error_msg = ' '.join(str(e).split("\n")[:3])[:-1]
+ raise exception.InvalidParameterValue(
+ _('Schema error for %s: %s') % (name, error_msg))
+ return value
+
+
+def schema(schema):
+ """Return a validator function which validates the value with jsonschema
+
+ :param: schema dict representing jsonschema to validate with
+ :returns: validator function which takes name and value arguments
+ """
+ jsonschema.Draft4Validator.check_schema(schema)
+
+ return functools.partial(_validate_schema, schema=schema)
+
+
+def _validate_dict(name, value, validators):
+ if value is None:
+ return
+ _validate_types(name, value, (dict, ))
+
+ for k, v in validators.items():
+ if k in value:
+ value[k] = v(name=k, value=value[k])
+
+ return value
+
+
+def dict_valid(**validators):
+ """Return a validator function which validates dict fields
+
+ Validators will replace the value with the validation result. Any dict
+ item which has no validator is ignored. When a key is missing in the value
+ then the corresponding validator will not be run.
+
+ :param: validators dict where the key is a dict key to validate and the
+ value is a validator function to run on that value
+ :returns: validator function which takes name and value arguments
+ """
+ return functools.partial(_validate_dict, validators=validators)
+
+
+def _validate_types(name, value, types):
+ if not isinstance(value, types):
+ str_types = ', '.join([str(t) for t in types])
+ raise exception.InvalidParameterValue(
+ _('Expected types %s for %s: %s') % (str_types, name, value))
+ return value
+
+
+def types(*types):
+ """Return a validator function which checks the value is one of the types
+
+ :param: types one or more types to use for the isinstance test
+ :returns: validator function which takes name and value arguments
+ """
+ return functools.partial(_validate_types, types=tuple(types))
+
+
+def _apply_validator(name, value, val_functions):
+ if callable(val_functions):
+ return val_functions(name, value)
+
+ for v in val_functions:
+ value = v(name, value)
+ return value
+
+
+def _inspect(function):
+ sig = inspect.signature(function)
+ param_keyword = None # **kwargs parameter
+ param_positional = None # *args parameter
+ params = []
+
+ for param in sig.parameters.values():
+ if param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
+ params.append(param)
+ elif param.kind == inspect.Parameter.VAR_KEYWORD:
+ param_keyword = param
+ elif param.kind == inspect.Parameter.VAR_POSITIONAL:
+ param_positional = param
+ else:
+ assert False, 'Unsupported parameter kind %s %s' % (
+ param.name, param.kind
+ )
+ return params, param_positional, param_keyword
+
+
+def validate(*args, **kwargs):
+ """Decorator which validates and transforms function arguments
+
+ """
+ assert not args, 'Validators must be specifed by argument name'
+ assert kwargs, 'No validators specified'
+ validators = kwargs
+
+ def inner_function(function):
+ params, param_positional, param_keyword = _inspect(function)
+
+ @functools.wraps(function)
+ def inner_check_args(*args, **kwargs):
+ args = list(args)
+ args_len = len(args)
+ kwargs_next = {}
+ next_arg_index = 0
+
+ if not param_keyword:
+ # ensure each named argument belongs to a param
+ kwarg_keys = set(kwargs)
+ param_names = set(p.name for p in params)
+ extra_args = kwarg_keys.difference(param_names)
+ if extra_args:
+ raise exception.InvalidParameterValue(
+ _('Unexpected arguments: %s') % ', '.join(extra_args))
+
+ for i, param in enumerate(params):
+
+ if i == 0 and param.name == 'self':
+ # skip validating self
+ continue
+
+ val_function = validators.get(param.name)
+ if not val_function:
+ continue
+
+ if i < args_len:
+ # validate positional argument
+ args[i] = val_function(param.name, args[i])
+ next_arg_index = i + 1
+
+ elif param.name in kwargs:
+ # validate keyword argument
+ kwargs_next[param.name] = val_function(
+ param.name, kwargs.pop(param.name))
+ elif param.default == inspect.Parameter.empty:
+ # no argument was provided, and there is no default
+ # in the parameter, so this is a mandatory argument
+ raise exception.InvalidParameterValue(
+ _('Missing mandatory parameter: %s') % param.name)
+
+ if param_positional:
+ # handle validating *args
+ val_function = validators.get(param_positional.name)
+ remaining = args[next_arg_index:]
+ if val_function and remaining:
+ args = args[:next_arg_index]
+ args.extend(val_function(param_positional.name, remaining))
+
+ # handle validating remaining **kwargs
+ if kwargs:
+ val_function = (param_keyword
+ and validators.get(param_keyword.name))
+ if val_function:
+ kwargs_next.update(
+ val_function(param_keyword.name, kwargs))
+ else:
+ # make sure unvalidated keyword arguments are kept
+ kwargs_next.update(kwargs)
+
+ return function(*args, **kwargs_next)
+ return inner_check_args
+ return inner_function
+
+
+patch = schema({
+ 'type': 'array',
+ 'items': {
+ 'type': 'object',
+ 'properties': {
+ 'path': {'type': 'string', 'pattern': '^(/[\\w-]+)+$'},
+ 'op': {'type': 'string', 'enum': ['add', 'replace', 'remove']},
+ 'value': {}
+ },
+ 'additionalProperties': False
+ }
+})
+"""Validate a patch API operation"""