diff options
Diffstat (limited to 'jsonschema/validators.py')
-rw-r--r-- | jsonschema/validators.py | 508 |
1 files changed, 508 insertions, 0 deletions
diff --git a/jsonschema/validators.py b/jsonschema/validators.py new file mode 100644 index 0000000..22abf57 --- /dev/null +++ b/jsonschema/validators.py @@ -0,0 +1,508 @@ +from __future__ import division, unicode_literals + +import collections +import contextlib +import json +import numbers + +try: + import requests +except ImportError: + requests = None + +from jsonschema import _utils, _validators +from jsonschema.compat import ( + PY3, Sequence, urljoin, urlsplit, urldefrag, unquote, urlopen, + str_types, int_types, iteritems, +) +from jsonschema.exceptions import RefResolutionError, SchemaError, UnknownType + + +_unset = _utils.Unset() + +validators = {} +meta_schemas = _utils.URIDict() + + +def validates(version): + """ + Register the decorated validator for a ``version`` of the specification. + + Registered validators and their meta schemas will be considered when + parsing ``$schema`` properties' URIs. + + :argument str version: an identifier to use as the version's name + :returns: a class decorator to decorate the validator with the version + + """ + + def _validates(cls): + validators[version] = cls + if "id" in cls.META_SCHEMA: + meta_schemas[cls.META_SCHEMA["id"]] = cls + return cls + return _validates + + +def create(meta_schema, validators=(), version=None, default_types=None): # noqa + if default_types is None: + default_types = { + "array" : list, "boolean" : bool, "integer" : int_types, + "null" : type(None), "number" : numbers.Number, "object" : dict, + "string" : str_types, + } + + class Validator(object): + VALIDATORS = dict(validators) + META_SCHEMA = dict(meta_schema) + DEFAULT_TYPES = dict(default_types) + + def __init__( + self, schema, types=(), resolver=None, format_checker=None, + ): + self._types = dict(self.DEFAULT_TYPES) + self._types.update(types) + + if resolver is None: + resolver = RefResolver.from_schema(schema) + + self.resolver = resolver + self.format_checker = format_checker + self.schema = schema + + @classmethod + def check_schema(cls, schema): + for error in cls(cls.META_SCHEMA).iter_errors(schema): + raise SchemaError.create_from(error) + + def iter_errors(self, instance, _schema=None): + if _schema is None: + _schema = self.schema + + with self.resolver.in_scope(_schema.get("id", "")): + ref = _schema.get("$ref") + if ref is not None: + validators = [("$ref", ref)] + else: + validators = iteritems(_schema) + + for k, v in validators: + validator = self.VALIDATORS.get(k) + if validator is None: + continue + + errors = validator(self, v, instance, _schema) or () + for error in errors: + # set details if not already set by the called fn + error._set( + validator=k, + validator_value=v, + instance=instance, + schema=_schema, + ) + if k != "$ref": + error.schema_path.appendleft(k) + yield error + + def descend(self, instance, schema, path=None, schema_path=None): + for error in self.iter_errors(instance, schema): + if path is not None: + error.path.appendleft(path) + if schema_path is not None: + error.schema_path.appendleft(schema_path) + yield error + + def validate(self, *args, **kwargs): + for error in self.iter_errors(*args, **kwargs): + raise error + + def is_type(self, instance, type): + if type not in self._types: + raise UnknownType(type) + pytypes = self._types[type] + + # bool inherits from int, so ensure bools aren't reported as ints + if isinstance(instance, bool): + pytypes = _utils.flatten(pytypes) + is_number = any( + issubclass(pytype, numbers.Number) for pytype in pytypes + ) + if is_number and bool not in pytypes: + return False + return isinstance(instance, pytypes) + + def is_valid(self, instance, _schema=None): + error = next(self.iter_errors(instance, _schema), None) + return error is None + + if version is not None: + Validator = validates(version)(Validator) + + name = "{0}Validator".format(version.title().replace(" ", "")) + if not PY3 and isinstance(name, unicode): + name = name.encode("utf-8") + Validator.__name__ = name + + return Validator + + +def extend(validator, validators, version=None): + all_validators = dict(validator.VALIDATORS) + all_validators.update(validators) + return create( + meta_schema=validator.META_SCHEMA, + validators=all_validators, + version=version, + default_types=validator.DEFAULT_TYPES, + ) + + +Draft3Validator = create( + meta_schema=_utils.load_schema("draft3"), + validators={ + "$ref" : _validators.ref, + "additionalItems" : _validators.additionalItems, + "additionalProperties" : _validators.additionalProperties, + "dependencies" : _validators.dependencies, + "disallow" : _validators.disallow_draft3, + "divisibleBy" : _validators.multipleOf, + "enum" : _validators.enum, + "extends" : _validators.extends_draft3, + "format" : _validators.format, + "items" : _validators.items, + "maxItems" : _validators.maxItems, + "maxLength" : _validators.maxLength, + "maximum" : _validators.maximum, + "minItems" : _validators.minItems, + "minLength" : _validators.minLength, + "minimum" : _validators.minimum, + "multipleOf" : _validators.multipleOf, + "pattern" : _validators.pattern, + "patternProperties" : _validators.patternProperties, + "properties" : _validators.properties_draft3, + "type" : _validators.type_draft3, + "uniqueItems" : _validators.uniqueItems, + }, + version="draft3", +) + +Draft4Validator = create( + meta_schema=_utils.load_schema("draft4"), + validators={ + "$ref" : _validators.ref, + "additionalItems" : _validators.additionalItems, + "additionalProperties" : _validators.additionalProperties, + "allOf" : _validators.allOf_draft4, + "anyOf" : _validators.anyOf_draft4, + "dependencies" : _validators.dependencies, + "enum" : _validators.enum, + "format" : _validators.format, + "items" : _validators.items, + "maxItems" : _validators.maxItems, + "maxLength" : _validators.maxLength, + "maxProperties" : _validators.maxProperties_draft4, + "maximum" : _validators.maximum, + "minItems" : _validators.minItems, + "minLength" : _validators.minLength, + "minProperties" : _validators.minProperties_draft4, + "minimum" : _validators.minimum, + "multipleOf" : _validators.multipleOf, + "not" : _validators.not_draft4, + "oneOf" : _validators.oneOf_draft4, + "pattern" : _validators.pattern, + "patternProperties" : _validators.patternProperties, + "properties" : _validators.properties_draft4, + "required" : _validators.required_draft4, + "type" : _validators.type_draft4, + "uniqueItems" : _validators.uniqueItems, + }, + version="draft4", +) + + +class RefResolver(object): + """ + Resolve JSON References. + + :argument str base_uri: URI of the referring document + :argument referrer: the actual referring document + :argument dict store: a mapping from URIs to documents to cache + :argument bool cache_remote: whether remote refs should be cached after + first resolution + :argument dict handlers: a mapping from URI schemes to functions that + should be used to retrieve them + + """ + + def __init__( + self, base_uri, referrer, store=(), cache_remote=True, handlers=(), + ): + self.base_uri = base_uri + self.resolution_scope = base_uri + # This attribute is not used, it is for backwards compatibility + self.referrer = referrer + self.cache_remote = cache_remote + self.handlers = dict(handlers) + + self.store = _utils.URIDict( + (id, validator.META_SCHEMA) + for id, validator in iteritems(meta_schemas) + ) + self.store.update(store) + self.store[base_uri] = referrer + + @classmethod + def from_schema(cls, schema, *args, **kwargs): + """ + Construct a resolver from a JSON schema object. + + :argument schema schema: the referring schema + :rtype: :class:`RefResolver` + + """ + + return cls(schema.get("id", ""), schema, *args, **kwargs) + + @contextlib.contextmanager + def in_scope(self, scope): + old_scope = self.resolution_scope + self.resolution_scope = urljoin(old_scope, scope) + try: + yield + finally: + self.resolution_scope = old_scope + + @contextlib.contextmanager + def resolving(self, ref): + """ + Context manager which resolves a JSON ``ref`` and enters the + resolution scope of this ref. + + :argument str ref: reference to resolve + + """ + + full_uri = urljoin(self.resolution_scope, ref) + uri, fragment = urldefrag(full_uri) + if not uri: + uri = self.base_uri + + if uri in self.store: + document = self.store[uri] + else: + try: + document = self.resolve_remote(uri) + except Exception as exc: + raise RefResolutionError(exc) + + old_base_uri, self.base_uri = self.base_uri, uri + try: + with self.in_scope(uri): + yield self.resolve_fragment(document, fragment) + finally: + self.base_uri = old_base_uri + + def resolve_fragment(self, document, fragment): + """ + Resolve a ``fragment`` within the referenced ``document``. + + :argument document: the referrant document + :argument str fragment: a URI fragment to resolve within it + + """ + + fragment = fragment.lstrip("/") + parts = unquote(fragment).split("/") if fragment else [] + + for part in parts: + part = part.replace("~1", "/").replace("~0", "~") + + if isinstance(document, Sequence): + # Array indexes should be turned into integers + try: + part = int(part) + except ValueError: + pass + try: + document = document[part] + except (TypeError, LookupError): + raise RefResolutionError( + "Unresolvable JSON pointer: %r" % fragment + ) + + return document + + def resolve_remote(self, uri): + """ + Resolve a remote ``uri``. + + Does not check the store first, but stores the retrieved document in + the store if :attr:`RefResolver.cache_remote` is True. + + .. note:: + + If the requests_ library is present, ``jsonschema`` will use it to + request the remote ``uri``, so that the correct encoding is + detected and used. + + If it isn't, or if the scheme of the ``uri`` is not ``http`` or + ``https``, UTF-8 is assumed. + + :argument str uri: the URI to resolve + :returns: the retrieved document + + .. _requests: http://pypi.python.org/pypi/requests/ + + """ + + scheme = urlsplit(uri).scheme + + if scheme in self.handlers: + result = self.handlers[scheme](uri) + elif ( + scheme in ["http", "https"] and + requests and + getattr(requests.Response, "json", None) is not None + ): + # Requests has support for detecting the correct encoding of + # json over http + if callable(requests.Response.json): + result = requests.get(uri).json() + else: + result = requests.get(uri).json + else: + # Otherwise, pass off to urllib and assume utf-8 + result = json.loads(urlopen(uri).read().decode("utf-8")) + + if self.cache_remote: + self.store[uri] = result + return result + + +class ErrorTree(object): + """ + ErrorTrees make it easier to check which validations failed. + + """ + + _instance = _unset + + def __init__(self, errors=()): + self.errors = {} + self._contents = collections.defaultdict(self.__class__) + + for error in errors: + container = self + for element in error.path: + container = container[element] + container.errors[error.validator] = error + + self._instance = error.instance + + def __contains__(self, index): + """ + Check whether ``instance[index]`` has any errors. + + """ + + return index in self._contents + + def __getitem__(self, index): + """ + Retrieve the child tree one level down at the given ``index``. + + If the index is not in the instance that this tree corresponds to and + is not known by this tree, whatever error would be raised by + ``instance.__getitem__`` will be propagated (usually this is some + subclass of :class:`LookupError`. + + """ + + if self._instance is not _unset and index not in self: + self._instance[index] + return self._contents[index] + + def __setitem__(self, index, value): + self._contents[index] = value + + def __iter__(self): + """ + Iterate (non-recursively) over the indices in the instance with errors. + + """ + + return iter(self._contents) + + def __len__(self): + """ + Same as :attr:`total_errors`. + + """ + + return self.total_errors + + def __repr__(self): + return "<%s (%s total errors)>" % (self.__class__.__name__, len(self)) + + @property + def total_errors(self): + """ + The total number of errors in the entire tree, including children. + + """ + + child_errors = sum(len(tree) for _, tree in iteritems(self._contents)) + return len(self.errors) + child_errors + + +def validator_for(schema, default=_unset): + if default is _unset: + default = Draft4Validator + return meta_schemas.get(schema.get("$schema", ""), default) + + +def validate(instance, schema, cls=None, *args, **kwargs): + """ + Validate an instance under the given schema. + + >>> validate([2, 3, 4], {"maxItems" : 2}) + Traceback (most recent call last): + ... + ValidationError: [2, 3, 4] is too long + + :func:`validate` will first verify that the provided schema is itself + valid, since not doing so can lead to less obvious error messages and fail + in less obvious or consistent ways. If you know you have a valid schema + already or don't care, you might prefer using the + :meth:`~IValidator.validate` method directly on a specific validator + (e.g. :meth:`Draft4Validator.validate`). + + + :argument instance: the instance to validate + :argument schema: the schema to validate with + :argument cls: an :class:`IValidator` class that will be used to validate + the instance. + + If the ``cls`` argument is not provided, two things will happen in + accordance with the specification. First, if the schema has a + :validator:`$schema` property containing a known meta-schema [#]_ then the + proper validator will be used. The specification recommends that all + schemas contain :validator:`$schema` properties for this reason. If no + :validator:`$schema` property is found, the default validator class is + :class:`Draft4Validator`. + + Any other provided positional and keyword arguments will be passed on when + instantiating the ``cls``. + + :raises: + :exc:`ValidationError` if the instance is invalid + + :exc:`SchemaError` if the schema itself is invalid + + .. rubric:: Footnotes + .. [#] known by a validator registered with :func:`validates` + """ + if cls is None: + cls = validator_for(schema) + cls.check_schema(schema) + cls(schema, *args, **kwargs).validate(instance) |