summaryrefslogtreecommitdiff
path: root/jsonpatch.py
diff options
context:
space:
mode:
Diffstat (limited to 'jsonpatch.py')
-rw-r--r--jsonpatch.py345
1 files changed, 261 insertions, 84 deletions
diff --git a/jsonpatch.py b/jsonpatch.py
index c6460b7..67bdfdb 100644
--- a/jsonpatch.py
+++ b/jsonpatch.py
@@ -30,7 +30,8 @@
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
-"""Apply JSON-Patches according to http://tools.ietf.org/html/draft-pbryan-json-patch-01"""
+"""Apply JSON-Patches according to
+http://tools.ietf.org/html/draft-pbryan-json-patch-04"""
# Will be parsed by setup.py to determine package metadata
__author__ = 'Stefan Kögl <stefan@skoegl.net>'
@@ -38,109 +39,299 @@ __version__ = '0.1'
__website__ = 'https://github.com/stefankoegl/python-json-patch'
__license__ = 'Modified BSD License'
-
import copy
-import json
+import sys
+
+if sys.version_info < (2, 6):
+ import simplejson as json
+else:
+ import json
+
+if sys.version_info >= (3, 0):
+ basestring = (bytes, str)
class JsonPatchException(Exception):
- pass
+ """Base Json Patch exception"""
class JsonPatchConflict(JsonPatchException):
- pass
+ """Raises if patch could be applied due to conflict situations such as:
+ - attempt to add object key then it already exists;
+ - attempt to operate with nonexistence object key;
+ - attempt to insert value to array at position beyond of it size;
+ - etc.
+ """
-def apply_patch(doc, patch):
- """
- >>> obj = { 'baz': 'qux', 'foo': 'bar' }
- >>> patch = [ { 'remove': '/baz' } ]
- >>> apply_patch(obj, patch)
- {'foo': 'bar'}
+def apply_patch(doc, patch, in_place=False):
+ """Apply list of patches to specified json document.
+
+ :param doc: Document object.
+ :type doc: dict
+
+ :param patch: JSON patch as list of dicts or raw JSON-encoded string.
+ :type patch: list or str
+
+ :param in_place: While :const:`True` patch will modify target document.
+ By default patch will be applied to document copy.
+ :type in_place: bool
+
+ :return: Patched document object.
+ :rtype: dict
+
+ >>> doc = {'foo': 'bar'}
+ >>> other = apply_patch(doc, [{'add': '/baz', 'value': 'qux'}])
+ >>> doc is not other
+ True
+ >>> other
+ {'foo': 'bar', 'baz': 'qux'}
+ >>> apply_patch(doc, [{'add': '/baz', 'value': 'qux'}], in_place=True)
+ {'foo': 'bar', 'baz': 'qux'}
+ >>> doc == other
+ True
"""
- p = JsonPatch(patch)
- return p.apply(doc)
+ if isinstance(patch, basestring):
+ patch = JsonPatch.from_string(patch)
+ else:
+ patch = JsonPatch(patch)
+ return patch.apply(doc, in_place)
+def make_patch(src, dst):
+ """Generates patch by comparing of two document objects. Actually is
+ a proxy to :meth:`JsonPatch.from_diff` method.
-class JsonPatch(object):
- """ A JSON Patch is a list of Patch Operations """
+ :param src: Data source document object.
+ :type src: dict
+
+ :param dst: Data source document object.
+ :type dst: dict
+ >>> src = {'foo': 'bar', 'numbers': [1, 3, 4, 8]}
+ >>> dst = {'baz': 'qux', 'numbers': [1, 4, 7]}
+ >>> patch = make_patch(src, dst)
+ >>> new = patch.apply(src)
+ >>> new == dst
+ True
+ """
+ return JsonPatch.from_diff(src, dst)
+
+
+class JsonPatch(object):
+ """A JSON Patch is a list of Patch Operations.
+
+ >>> patch = JsonPatch([
+ ... {'add': '/foo', 'value': 'bar'},
+ ... {'add': '/baz', 'value': [1, 2, 3]},
+ ... {'remove': '/baz/1'},
+ ... {'test': '/baz', 'value': [1, 3]},
+ ... {'replace': '/baz/0', 'value': 42},
+ ... {'remove': '/baz/1'},
+ ... ])
+ >>> doc = {}
+ >>> patch.apply(doc)
+ {'foo': 'bar', 'baz': [42]}
+
+ JsonPatch object is iterable, so you could easily access to each patch
+ statement in loop:
+
+ >>> lpatch = list(patch)
+ >>> lpatch[0]
+ {'add': '/foo', 'value': 'bar'}
+ >>> lpatch == patch.patch
+ True
+
+ Also JsonPatch could be converted directly to :class:`bool` if it contains
+ any operation statements:
+
+ >>> bool(patch)
+ True
+ >>> bool(JsonPatch([]))
+ False
+
+ This behavior is very handy with :func:`make_patch` to write more readable
+ code:
+
+ >>> old = {'foo': 'bar', 'numbers': [1, 3, 4, 8]}
+ >>> new = {'baz': 'qux', 'numbers': [1, 4, 7]}
+ >>> patch = make_patch(old, new)
+ >>> if patch:
+ ... # document have changed, do something useful
+ ... patch.apply(old) #doctest: +ELLIPSIS
+ {...}
+ """
def __init__(self, patch):
self.patch = patch
- self.OPERATIONS = {
+ self.operations = {
'remove': RemoveOperation,
'add': AddOperation,
'replace': ReplaceOperation,
+ 'move': MoveOperation,
+ 'test': TestOperation
}
+ def __str__(self):
+ """str(self) -> self.to_string()"""
+ return self.to_string()
+
+ def __bool__(self):
+ return bool(self.patch)
+
+ __nonzero__ = __bool__
+
+ def __iter__(self):
+ return iter(self.patch)
@classmethod
def from_string(cls, patch_str):
- patch = json.loads(patch_str)
- return cls(patch)
+ """Creates JsonPatch instance from string source.
+ :param patch_str: JSON patch as raw string.
+ :type patch_str: str
- def apply(self, obj):
- """ Applies the patch to a copy of the given object """
+ :return: :class:`JsonPatch` instance.
+ """
+ patch = json.loads(patch_str)
+ return cls(patch)
- obj = copy.deepcopy(obj)
+ @classmethod
+ def from_diff(cls, src, dst):
+ """Creates JsonPatch instance based on comparing of two document
+ objects. Json patch would be created for `src` argument against `dst`
+ one.
+
+ :param src: Data source document object.
+ :type src: dict
+
+ :param dst: Data source document object.
+ :type dst: dict
+
+ :return: :class:`JsonPatch` instance.
+
+ >>> src = {'foo': 'bar', 'numbers': [1, 3, 4, 8]}
+ >>> dst = {'baz': 'qux', 'numbers': [1, 4, 7]}
+ >>> patch = JsonPatch.from_diff(src, dst)
+ >>> new = patch.apply(src)
+ >>> new == dst
+ True
+ """
+ def compare_values(path, value, other):
+ if isinstance(value, dict) and isinstance(other, dict):
+ for operation in compare_dict(path, value, other):
+ yield operation
+ elif isinstance(value, list) and isinstance(other, list):
+ for operation in compare_list(path, value, other):
+ yield operation
+ else:
+ yield {'replace': '/'.join(path), 'value': other}
+
+ def compare_dict(path, src, dst):
+ for key in src:
+ if key not in dst:
+ yield {'remove': '/'.join(path + [key])}
+ elif src[key] != dst[key]:
+ current = path + [key]
+ for operation in compare_values(current, src[key], dst[key]):
+ yield operation
+ for key in dst:
+ if key not in src:
+ yield {'add': '/'.join(path + [key]), 'value': dst[key]}
+
+ def compare_list(path, src, dst):
+ lsrc, ldst = len(src), len(dst)
+ for idx in reversed(range(max(lsrc, ldst))):
+ if idx < lsrc and idx < ldst:
+ current = path + [str(idx)]
+ for operation in compare_values(current, src[idx], dst[idx]):
+ yield operation
+ elif idx < ldst:
+ yield {'add': '/'.join(path + [str(idx)]),
+ 'value': dst[idx]}
+ elif idx < lsrc:
+ yield {'remove': '/'.join(path + [str(idx)])}
+
+ return cls(list(compare_dict([''], src, dst)))
+
+ def to_string(self):
+ """Returns patch set as JSON string."""
+ return json.dumps(self.patch)
+
+ def apply(self, obj, in_place=False):
+ """Applies the patch to given object.
+
+ :param obj: Document object.
+ :type obj: dict
+
+ :param in_place: Tweaks way how patch would be applied - directly to
+ specified `obj` or to his copy.
+ :type in_place: bool
+
+ :return: Modified `obj`.
+ """
+
+ if not in_place:
+ obj = copy.deepcopy(obj)
for operation in self.patch:
- op = self._get_operation(operation)
- op.apply(obj)
+ operation = self._get_operation(operation)
+ operation.apply(obj)
return obj
-
def _get_operation(self, operation):
- for action, op_cls in self.OPERATIONS.items():
+ for action, op_cls in self.operations.items():
if action in operation:
location = operation[action]
- op = op_cls(location, operation)
- return op
+ return op_cls(location, operation)
raise JsonPatchException("invalid operation '%s'" % operation)
-
class PatchOperation(object):
- """ A single operation inside a JSON Patch """
+ """A single operation inside a JSON Patch."""
def __init__(self, location, operation):
self.location = location
self.operation = operation
+ def apply(self, obj):
+ """Abstract method that applies patch operation to specified object."""
+ raise NotImplementedError('should implement patch operation.')
def locate(self, obj, location, last_must_exist=True):
- """ Walks through the object according to location
+ """Walks through the object according to location.
- Returns the last step as (sub-object, last location-step) """
+ Returns the last step as (sub-object, last location-step)."""
parts = location.split('/')
if parts.pop(0) != '':
raise JsonPatchException('location must starts with /')
for part in parts[:-1]:
- obj, loc_part = self._step(obj, part)
+ obj, _ = self._step(obj, part)
_, last_loc = self._step(obj, parts[-1], must_exist=last_must_exist)
return obj, last_loc
-
def _step(self, obj, loc_part, must_exist=True):
- """ Goes one step in a locate() call """
+ """Goes one step in a locate() call."""
- # Its not clear if a location "1" should be considered as 1 or "1"
- # We prefer the integer-variant if possible
- part_variants = self._try_parse(loc_part) + [loc_part]
-
- for variant in part_variants:
- try:
+ if isinstance(obj, dict):
+ part_variants = [loc_part]
+ for variant in part_variants:
+ if variant not in obj:
+ continue
+ return obj[variant], variant
+ elif isinstance(obj, list):
+ part_variants = [int(loc_part)]
+ for variant in part_variants:
+ if variant >= len(obj):
+ continue
return obj[variant], variant
- except:
- continue
+ else:
+ raise ValueError('list or dict expected, got %r' % type(obj))
if must_exist:
raise JsonPatchConflict('key %s not found' % loc_part)
@@ -148,27 +339,8 @@ class PatchOperation(object):
return obj, part_variants[0]
- @staticmethod
- def _try_parse(val, cls=int):
- try:
- return [cls(val)]
- except:
- return []
-
-
class RemoveOperation(PatchOperation):
- """ Removes an object property or an array element
-
- >>> obj = { 'baz': 'qux', 'foo': 'bar' }
- >>> patch = JsonPatch( [ { 'remove': '/baz' } ] )
- >>> patch.apply(obj)
- {'foo': 'bar'}
-
- >>> obj = { 'foo': [ 'bar', 'qux', 'baz' ] }
- >>> patch = JsonPatch( [ { "remove": "/foo/1" } ] )
- >>> patch.apply(obj)
- {'foo': ['bar', 'baz']}
- """
+ """Removes an object property or an array element."""
def apply(self, obj):
subobj, part = self.locate(obj, self.location)
@@ -176,18 +348,7 @@ class RemoveOperation(PatchOperation):
class AddOperation(PatchOperation):
- """ Adds an object property or an array element
-
- >>> obj = { "foo": "bar" }
- >>> patch = JsonPatch([ { "add": "/baz", "value": "qux" } ])
- >>> patch.apply(obj)
- {'foo': 'bar', 'baz': 'qux'}
-
- >>> obj = { "foo": [ "bar", "baz" ] }
- >>> patch = JsonPatch([ { "add": "/foo/1", "value": "qux" } ])
- >>> patch.apply(obj)
- {'foo': ['bar', 'qux', 'baz']}
- """
+ """Adds an object property or an array element."""
def apply(self, obj):
value = self.operation["value"]
@@ -206,17 +367,12 @@ class AddOperation(PatchOperation):
subobj[part] = value
else:
- raise JsonPatchConflict("can't add to type '%s'" % subobj.__class__.__name__)
+ raise JsonPatchConflict("can't add to type '%s'"
+ "" % subobj.__class__.__name__)
class ReplaceOperation(PatchOperation):
- """ Replaces a value
-
- >>> obj = { "baz": "qux", "foo": "bar" }
- >>> patch = JsonPatch([ { "replace": "/baz", "value": "boo" } ])
- >>> patch.apply(obj)
- {'foo': 'bar', 'baz': 'boo'}
- """
+ """Replaces an object property or an array element by new value."""
def apply(self, obj):
value = self.operation["value"]
@@ -228,9 +384,30 @@ class ReplaceOperation(PatchOperation):
elif isinstance(subobj, dict):
if not part in subobj:
- raise JsonPatchConflict("can't replace non-existant object '%s'" % part)
+ raise JsonPatchConflict("can't replace non-existant object '%s'"
+ "" % part)
else:
- raise JsonPatchConflict("can't replace in type '%s'" % subobj.__class__.__name__)
+ raise JsonPatchConflict("can't replace in type '%s'"
+ "" % subobj.__class__.__name__)
subobj[part] = value
+
+
+class MoveOperation(PatchOperation):
+ """Moves an object property or an array element to new location."""
+
+ def apply(self, obj):
+ subobj, part = self.locate(obj, self.location)
+ value = subobj[part]
+ RemoveOperation(self.location, self.operation).apply(obj)
+ AddOperation(self.operation['to'], {'value': value}).apply(obj)
+
+
+class TestOperation(PatchOperation):
+ """Test value by specified location."""
+
+ def apply(self, obj):
+ value = self.operation['value']
+ subobj, part = self.locate(obj, self.location)
+ assert subobj[part] == value