diff options
Diffstat (limited to 'jsonpatch.py')
| -rw-r--r-- | jsonpatch.py | 345 |
1 files changed, 261 insertions, 84 deletions
diff --git a/jsonpatch.py b/jsonpatch.py index c6460b7..67bdfdb 100644 --- a/jsonpatch.py +++ b/jsonpatch.py @@ -30,7 +30,8 @@ # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # -"""Apply JSON-Patches according to http://tools.ietf.org/html/draft-pbryan-json-patch-01""" +"""Apply JSON-Patches according to +http://tools.ietf.org/html/draft-pbryan-json-patch-04""" # Will be parsed by setup.py to determine package metadata __author__ = 'Stefan Kögl <stefan@skoegl.net>' @@ -38,109 +39,299 @@ __version__ = '0.1' __website__ = 'https://github.com/stefankoegl/python-json-patch' __license__ = 'Modified BSD License' - import copy -import json +import sys + +if sys.version_info < (2, 6): + import simplejson as json +else: + import json + +if sys.version_info >= (3, 0): + basestring = (bytes, str) class JsonPatchException(Exception): - pass + """Base Json Patch exception""" class JsonPatchConflict(JsonPatchException): - pass + """Raises if patch could be applied due to conflict situations such as: + - attempt to add object key then it already exists; + - attempt to operate with nonexistence object key; + - attempt to insert value to array at position beyond of it size; + - etc. + """ -def apply_patch(doc, patch): - """ - >>> obj = { 'baz': 'qux', 'foo': 'bar' } - >>> patch = [ { 'remove': '/baz' } ] - >>> apply_patch(obj, patch) - {'foo': 'bar'} +def apply_patch(doc, patch, in_place=False): + """Apply list of patches to specified json document. + + :param doc: Document object. + :type doc: dict + + :param patch: JSON patch as list of dicts or raw JSON-encoded string. + :type patch: list or str + + :param in_place: While :const:`True` patch will modify target document. + By default patch will be applied to document copy. + :type in_place: bool + + :return: Patched document object. + :rtype: dict + + >>> doc = {'foo': 'bar'} + >>> other = apply_patch(doc, [{'add': '/baz', 'value': 'qux'}]) + >>> doc is not other + True + >>> other + {'foo': 'bar', 'baz': 'qux'} + >>> apply_patch(doc, [{'add': '/baz', 'value': 'qux'}], in_place=True) + {'foo': 'bar', 'baz': 'qux'} + >>> doc == other + True """ - p = JsonPatch(patch) - return p.apply(doc) + if isinstance(patch, basestring): + patch = JsonPatch.from_string(patch) + else: + patch = JsonPatch(patch) + return patch.apply(doc, in_place) +def make_patch(src, dst): + """Generates patch by comparing of two document objects. Actually is + a proxy to :meth:`JsonPatch.from_diff` method. -class JsonPatch(object): - """ A JSON Patch is a list of Patch Operations """ + :param src: Data source document object. + :type src: dict + + :param dst: Data source document object. + :type dst: dict + >>> src = {'foo': 'bar', 'numbers': [1, 3, 4, 8]} + >>> dst = {'baz': 'qux', 'numbers': [1, 4, 7]} + >>> patch = make_patch(src, dst) + >>> new = patch.apply(src) + >>> new == dst + True + """ + return JsonPatch.from_diff(src, dst) + + +class JsonPatch(object): + """A JSON Patch is a list of Patch Operations. + + >>> patch = JsonPatch([ + ... {'add': '/foo', 'value': 'bar'}, + ... {'add': '/baz', 'value': [1, 2, 3]}, + ... {'remove': '/baz/1'}, + ... {'test': '/baz', 'value': [1, 3]}, + ... {'replace': '/baz/0', 'value': 42}, + ... {'remove': '/baz/1'}, + ... ]) + >>> doc = {} + >>> patch.apply(doc) + {'foo': 'bar', 'baz': [42]} + + JsonPatch object is iterable, so you could easily access to each patch + statement in loop: + + >>> lpatch = list(patch) + >>> lpatch[0] + {'add': '/foo', 'value': 'bar'} + >>> lpatch == patch.patch + True + + Also JsonPatch could be converted directly to :class:`bool` if it contains + any operation statements: + + >>> bool(patch) + True + >>> bool(JsonPatch([])) + False + + This behavior is very handy with :func:`make_patch` to write more readable + code: + + >>> old = {'foo': 'bar', 'numbers': [1, 3, 4, 8]} + >>> new = {'baz': 'qux', 'numbers': [1, 4, 7]} + >>> patch = make_patch(old, new) + >>> if patch: + ... # document have changed, do something useful + ... patch.apply(old) #doctest: +ELLIPSIS + {...} + """ def __init__(self, patch): self.patch = patch - self.OPERATIONS = { + self.operations = { 'remove': RemoveOperation, 'add': AddOperation, 'replace': ReplaceOperation, + 'move': MoveOperation, + 'test': TestOperation } + def __str__(self): + """str(self) -> self.to_string()""" + return self.to_string() + + def __bool__(self): + return bool(self.patch) + + __nonzero__ = __bool__ + + def __iter__(self): + return iter(self.patch) @classmethod def from_string(cls, patch_str): - patch = json.loads(patch_str) - return cls(patch) + """Creates JsonPatch instance from string source. + :param patch_str: JSON patch as raw string. + :type patch_str: str - def apply(self, obj): - """ Applies the patch to a copy of the given object """ + :return: :class:`JsonPatch` instance. + """ + patch = json.loads(patch_str) + return cls(patch) - obj = copy.deepcopy(obj) + @classmethod + def from_diff(cls, src, dst): + """Creates JsonPatch instance based on comparing of two document + objects. Json patch would be created for `src` argument against `dst` + one. + + :param src: Data source document object. + :type src: dict + + :param dst: Data source document object. + :type dst: dict + + :return: :class:`JsonPatch` instance. + + >>> src = {'foo': 'bar', 'numbers': [1, 3, 4, 8]} + >>> dst = {'baz': 'qux', 'numbers': [1, 4, 7]} + >>> patch = JsonPatch.from_diff(src, dst) + >>> new = patch.apply(src) + >>> new == dst + True + """ + def compare_values(path, value, other): + if isinstance(value, dict) and isinstance(other, dict): + for operation in compare_dict(path, value, other): + yield operation + elif isinstance(value, list) and isinstance(other, list): + for operation in compare_list(path, value, other): + yield operation + else: + yield {'replace': '/'.join(path), 'value': other} + + def compare_dict(path, src, dst): + for key in src: + if key not in dst: + yield {'remove': '/'.join(path + [key])} + elif src[key] != dst[key]: + current = path + [key] + for operation in compare_values(current, src[key], dst[key]): + yield operation + for key in dst: + if key not in src: + yield {'add': '/'.join(path + [key]), 'value': dst[key]} + + def compare_list(path, src, dst): + lsrc, ldst = len(src), len(dst) + for idx in reversed(range(max(lsrc, ldst))): + if idx < lsrc and idx < ldst: + current = path + [str(idx)] + for operation in compare_values(current, src[idx], dst[idx]): + yield operation + elif idx < ldst: + yield {'add': '/'.join(path + [str(idx)]), + 'value': dst[idx]} + elif idx < lsrc: + yield {'remove': '/'.join(path + [str(idx)])} + + return cls(list(compare_dict([''], src, dst))) + + def to_string(self): + """Returns patch set as JSON string.""" + return json.dumps(self.patch) + + def apply(self, obj, in_place=False): + """Applies the patch to given object. + + :param obj: Document object. + :type obj: dict + + :param in_place: Tweaks way how patch would be applied - directly to + specified `obj` or to his copy. + :type in_place: bool + + :return: Modified `obj`. + """ + + if not in_place: + obj = copy.deepcopy(obj) for operation in self.patch: - op = self._get_operation(operation) - op.apply(obj) + operation = self._get_operation(operation) + operation.apply(obj) return obj - def _get_operation(self, operation): - for action, op_cls in self.OPERATIONS.items(): + for action, op_cls in self.operations.items(): if action in operation: location = operation[action] - op = op_cls(location, operation) - return op + return op_cls(location, operation) raise JsonPatchException("invalid operation '%s'" % operation) - class PatchOperation(object): - """ A single operation inside a JSON Patch """ + """A single operation inside a JSON Patch.""" def __init__(self, location, operation): self.location = location self.operation = operation + def apply(self, obj): + """Abstract method that applies patch operation to specified object.""" + raise NotImplementedError('should implement patch operation.') def locate(self, obj, location, last_must_exist=True): - """ Walks through the object according to location + """Walks through the object according to location. - Returns the last step as (sub-object, last location-step) """ + Returns the last step as (sub-object, last location-step).""" parts = location.split('/') if parts.pop(0) != '': raise JsonPatchException('location must starts with /') for part in parts[:-1]: - obj, loc_part = self._step(obj, part) + obj, _ = self._step(obj, part) _, last_loc = self._step(obj, parts[-1], must_exist=last_must_exist) return obj, last_loc - def _step(self, obj, loc_part, must_exist=True): - """ Goes one step in a locate() call """ + """Goes one step in a locate() call.""" - # Its not clear if a location "1" should be considered as 1 or "1" - # We prefer the integer-variant if possible - part_variants = self._try_parse(loc_part) + [loc_part] - - for variant in part_variants: - try: + if isinstance(obj, dict): + part_variants = [loc_part] + for variant in part_variants: + if variant not in obj: + continue + return obj[variant], variant + elif isinstance(obj, list): + part_variants = [int(loc_part)] + for variant in part_variants: + if variant >= len(obj): + continue return obj[variant], variant - except: - continue + else: + raise ValueError('list or dict expected, got %r' % type(obj)) if must_exist: raise JsonPatchConflict('key %s not found' % loc_part) @@ -148,27 +339,8 @@ class PatchOperation(object): return obj, part_variants[0] - @staticmethod - def _try_parse(val, cls=int): - try: - return [cls(val)] - except: - return [] - - class RemoveOperation(PatchOperation): - """ Removes an object property or an array element - - >>> obj = { 'baz': 'qux', 'foo': 'bar' } - >>> patch = JsonPatch( [ { 'remove': '/baz' } ] ) - >>> patch.apply(obj) - {'foo': 'bar'} - - >>> obj = { 'foo': [ 'bar', 'qux', 'baz' ] } - >>> patch = JsonPatch( [ { "remove": "/foo/1" } ] ) - >>> patch.apply(obj) - {'foo': ['bar', 'baz']} - """ + """Removes an object property or an array element.""" def apply(self, obj): subobj, part = self.locate(obj, self.location) @@ -176,18 +348,7 @@ class RemoveOperation(PatchOperation): class AddOperation(PatchOperation): - """ Adds an object property or an array element - - >>> obj = { "foo": "bar" } - >>> patch = JsonPatch([ { "add": "/baz", "value": "qux" } ]) - >>> patch.apply(obj) - {'foo': 'bar', 'baz': 'qux'} - - >>> obj = { "foo": [ "bar", "baz" ] } - >>> patch = JsonPatch([ { "add": "/foo/1", "value": "qux" } ]) - >>> patch.apply(obj) - {'foo': ['bar', 'qux', 'baz']} - """ + """Adds an object property or an array element.""" def apply(self, obj): value = self.operation["value"] @@ -206,17 +367,12 @@ class AddOperation(PatchOperation): subobj[part] = value else: - raise JsonPatchConflict("can't add to type '%s'" % subobj.__class__.__name__) + raise JsonPatchConflict("can't add to type '%s'" + "" % subobj.__class__.__name__) class ReplaceOperation(PatchOperation): - """ Replaces a value - - >>> obj = { "baz": "qux", "foo": "bar" } - >>> patch = JsonPatch([ { "replace": "/baz", "value": "boo" } ]) - >>> patch.apply(obj) - {'foo': 'bar', 'baz': 'boo'} - """ + """Replaces an object property or an array element by new value.""" def apply(self, obj): value = self.operation["value"] @@ -228,9 +384,30 @@ class ReplaceOperation(PatchOperation): elif isinstance(subobj, dict): if not part in subobj: - raise JsonPatchConflict("can't replace non-existant object '%s'" % part) + raise JsonPatchConflict("can't replace non-existant object '%s'" + "" % part) else: - raise JsonPatchConflict("can't replace in type '%s'" % subobj.__class__.__name__) + raise JsonPatchConflict("can't replace in type '%s'" + "" % subobj.__class__.__name__) subobj[part] = value + + +class MoveOperation(PatchOperation): + """Moves an object property or an array element to new location.""" + + def apply(self, obj): + subobj, part = self.locate(obj, self.location) + value = subobj[part] + RemoveOperation(self.location, self.operation).apply(obj) + AddOperation(self.operation['to'], {'value': value}).apply(obj) + + +class TestOperation(PatchOperation): + """Test value by specified location.""" + + def apply(self, obj): + value = self.operation['value'] + subobj, part = self.locate(obj, self.location) + assert subobj[part] == value |
