summaryrefslogtreecommitdiff
path: root/kazoo/protocol/serialization.py
diff options
context:
space:
mode:
authorBen Bangert <ben@groovie.org>2012-09-07 17:00:29 -0700
committerBen Bangert <ben@groovie.org>2012-09-07 17:00:29 -0700
commit2f73dab9d0cfec68d94de18b2910e8d97aefc724 (patch)
treeddff63bc4ba52a2235c713d5c77c1fb736206d8e /kazoo/protocol/serialization.py
parent598eb80cafe5fa0c6a474f9f4ac157d79afcaa39 (diff)
downloadkazoo-2f73dab9d0cfec68d94de18b2910e8d97aefc724.tar.gz
Add transactions and some tests for them.
Diffstat (limited to 'kazoo/protocol/serialization.py')
-rw-r--r--kazoo/protocol/serialization.py87
1 files changed, 80 insertions, 7 deletions
diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py
index 474ba15..8324f3c 100644
--- a/kazoo/protocol/serialization.py
+++ b/kazoo/protocol/serialization.py
@@ -2,6 +2,7 @@
from collections import namedtuple
import struct
+from kazoo.exceptions import EXCEPTIONS
from kazoo.protocol.states import ZnodeStat
from kazoo.security import ACL
from kazoo.security import Id
@@ -13,6 +14,7 @@ int_int_struct = struct.Struct('!ii')
int_int_long_struct = struct.Struct('!iiq')
int_long_int_long_struct = struct.Struct('!iqiq')
+multiheader_struct = struct.Struct('!iBi')
reply_header_struct = struct.Struct('!iqi')
stat_struct = struct.Struct('!qqqqiiiqiiq')
@@ -287,6 +289,62 @@ class GetChildren2(namedtuple('GetChildren2', 'path watcher')):
return children, stat
+class CheckVersion(namedtuple('CheckVersion', 'path version')):
+ type = 13
+
+ def serialize(self):
+ b = bytearray()
+ b.extend(write_string(self.path))
+ b.extend(int_struct.pack(self.version))
+ return b
+
+
+class Transaction(namedtuple('Transaction', 'operations')):
+ type = 14
+
+ def serialize(self):
+ b = bytearray()
+ for op in self.operations:
+ b.extend(MultiHeader(op.type, False, -1).serialize() +
+ op.serialize())
+ return b + multiheader_struct.pack(-1, True, -1)
+
+ @classmethod
+ def deserialize(cls, bytes, offset):
+ header = MultiHeader(None, False, None)
+ results = []
+ response = None
+ while not header.done:
+ if header.type == Create.type:
+ response, offset = read_string(bytes, offset)
+ elif header.type == Delete.type:
+ response = True
+ elif header.type == SetData.type:
+ response = ZnodeStat._make(
+ stat_struct.unpack_from(bytes, offset))
+ offset += stat_struct.size
+ elif header.type == CheckVersion.type:
+ response = True
+ elif header.type == -1:
+ err = int_struct.unpack_from(bytes, offset)[0]
+ offset += int_struct.size
+ response = EXCEPTIONS[err]()
+ if response:
+ results.append(response)
+ header, offset = MultiHeader.deserialize(bytes, offset)
+ return results
+
+ @staticmethod
+ def unchroot(client, response):
+ resp = []
+ for result in response:
+ if isinstance(result, unicode):
+ resp.append(client.unchroot(result))
+ else:
+ resp.append(result)
+ return resp
+
+
class Auth(namedtuple('Auth', 'auth_type scheme auth')):
type = 100
@@ -297,20 +355,35 @@ class Auth(namedtuple('Auth', 'auth_type scheme auth')):
class Watch(namedtuple('Watch', 'type state path')):
@classmethod
- def deserialize(cls, buffer, offset):
- """Given a buffer and the current buffer offset, return the
+ def deserialize(cls, bytes, offset):
+ """Given bytes and the current bytes offset, return the
type, state, path, and new offset"""
- type, state = int_int_struct.unpack_from(buffer, offset)
+ type, state = int_int_struct.unpack_from(bytes, offset)
offset += int_int_struct.size
- path, offset = read_string(buffer, offset)
+ path, offset = read_string(bytes, offset)
return cls(type, state, path), offset
class ReplyHeader(namedtuple('ReplyHeader', 'xid, zxid, err')):
@classmethod
- def deserialize(cls, buffer, offset):
- """Given a buffer and the current buffer offset, return a
+ def deserialize(cls, bytes, offset):
+ """Given bytes and the current bytes offset, return a
:class:`ReplyHeader` instance and the new offset"""
new_offset = offset + reply_header_struct.size
return cls._make(
- reply_header_struct.unpack_from(buffer, offset)), new_offset
+ reply_header_struct.unpack_from(bytes, offset)), new_offset
+
+
+class MultiHeader(namedtuple('MultiHeader', 'type done err')):
+ def serialize(self):
+ b = bytearray()
+ b.extend(int_struct.pack(self.type))
+ b.extend([1 if self.done else 0])
+ b.extend(int_struct.pack(self.err))
+ return b
+
+ @classmethod
+ def deserialize(cls, bytes, offset):
+ t, done, err = multiheader_struct.unpack_from(bytes, offset)
+ offset += multiheader_struct.size
+ return cls(t, done is 1, err), offset