diff options
| author | Ben Bangert <ben@groovie.org> | 2012-09-07 17:00:29 -0700 |
|---|---|---|
| committer | Ben Bangert <ben@groovie.org> | 2012-09-07 17:00:29 -0700 |
| commit | 2f73dab9d0cfec68d94de18b2910e8d97aefc724 (patch) | |
| tree | ddff63bc4ba52a2235c713d5c77c1fb736206d8e /kazoo/protocol/serialization.py | |
| parent | 598eb80cafe5fa0c6a474f9f4ac157d79afcaa39 (diff) | |
| download | kazoo-2f73dab9d0cfec68d94de18b2910e8d97aefc724.tar.gz | |
Add transactions and some tests for them.
Diffstat (limited to 'kazoo/protocol/serialization.py')
| -rw-r--r-- | kazoo/protocol/serialization.py | 87 |
1 files changed, 80 insertions, 7 deletions
diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py index 474ba15..8324f3c 100644 --- a/kazoo/protocol/serialization.py +++ b/kazoo/protocol/serialization.py @@ -2,6 +2,7 @@ from collections import namedtuple import struct +from kazoo.exceptions import EXCEPTIONS from kazoo.protocol.states import ZnodeStat from kazoo.security import ACL from kazoo.security import Id @@ -13,6 +14,7 @@ int_int_struct = struct.Struct('!ii') int_int_long_struct = struct.Struct('!iiq') int_long_int_long_struct = struct.Struct('!iqiq') +multiheader_struct = struct.Struct('!iBi') reply_header_struct = struct.Struct('!iqi') stat_struct = struct.Struct('!qqqqiiiqiiq') @@ -287,6 +289,62 @@ class GetChildren2(namedtuple('GetChildren2', 'path watcher')): return children, stat +class CheckVersion(namedtuple('CheckVersion', 'path version')): + type = 13 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(int_struct.pack(self.version)) + return b + + +class Transaction(namedtuple('Transaction', 'operations')): + type = 14 + + def serialize(self): + b = bytearray() + for op in self.operations: + b.extend(MultiHeader(op.type, False, -1).serialize() + + op.serialize()) + return b + multiheader_struct.pack(-1, True, -1) + + @classmethod + def deserialize(cls, bytes, offset): + header = MultiHeader(None, False, None) + results = [] + response = None + while not header.done: + if header.type == Create.type: + response, offset = read_string(bytes, offset) + elif header.type == Delete.type: + response = True + elif header.type == SetData.type: + response = ZnodeStat._make( + stat_struct.unpack_from(bytes, offset)) + offset += stat_struct.size + elif header.type == CheckVersion.type: + response = True + elif header.type == -1: + err = int_struct.unpack_from(bytes, offset)[0] + offset += int_struct.size + response = EXCEPTIONS[err]() + if response: + results.append(response) + header, offset = MultiHeader.deserialize(bytes, offset) + return results + + @staticmethod + def unchroot(client, response): + resp = [] + for result in response: + if isinstance(result, unicode): + resp.append(client.unchroot(result)) + else: + resp.append(result) + return resp + + class Auth(namedtuple('Auth', 'auth_type scheme auth')): type = 100 @@ -297,20 +355,35 @@ class Auth(namedtuple('Auth', 'auth_type scheme auth')): class Watch(namedtuple('Watch', 'type state path')): @classmethod - def deserialize(cls, buffer, offset): - """Given a buffer and the current buffer offset, return the + def deserialize(cls, bytes, offset): + """Given bytes and the current bytes offset, return the type, state, path, and new offset""" - type, state = int_int_struct.unpack_from(buffer, offset) + type, state = int_int_struct.unpack_from(bytes, offset) offset += int_int_struct.size - path, offset = read_string(buffer, offset) + path, offset = read_string(bytes, offset) return cls(type, state, path), offset class ReplyHeader(namedtuple('ReplyHeader', 'xid, zxid, err')): @classmethod - def deserialize(cls, buffer, offset): - """Given a buffer and the current buffer offset, return a + def deserialize(cls, bytes, offset): + """Given bytes and the current bytes offset, return a :class:`ReplyHeader` instance and the new offset""" new_offset = offset + reply_header_struct.size return cls._make( - reply_header_struct.unpack_from(buffer, offset)), new_offset + reply_header_struct.unpack_from(bytes, offset)), new_offset + + +class MultiHeader(namedtuple('MultiHeader', 'type done err')): + def serialize(self): + b = bytearray() + b.extend(int_struct.pack(self.type)) + b.extend([1 if self.done else 0]) + b.extend(int_struct.pack(self.err)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + t, done, err = multiheader_struct.unpack_from(bytes, offset) + offset += multiheader_struct.size + return cls(t, done is 1, err), offset |
