summaryrefslogtreecommitdiff
path: root/kazoo/protocol/serialization.py
diff options
context:
space:
mode:
Diffstat (limited to 'kazoo/protocol/serialization.py')
-rw-r--r--kazoo/protocol/serialization.py73
1 files changed, 56 insertions, 17 deletions
diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py
index 759e2e6..0503a60 100644
--- a/kazoo/protocol/serialization.py
+++ b/kazoo/protocol/serialization.py
@@ -33,19 +33,67 @@ def write_buffer(bytes):
return int_struct.pack(len(bytes)) + bytes
-class Connect(namedtuple('Connect', 'protocol_version', 'last_zxid_seen',
- 'time_out', 'session_id', 'passwd', 'read_only')):
+def read_buffer(bytes, offset):
+ length = int_struct.unpack_from(bytes, offset)[0]
+ offset += int_struct.size
+ if length < 0:
+ return None, offset
+ else:
+ index = offset
+ offset += length
+ return bytes[index:index + length], offset
+
+
+class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen'
+ ' time_out session_id passwd read_only')):
"""A connection request"""
+ type = None
+
def serialize(self):
- b = int_long_int_long_struct.pack(
+ b = bytearray()
+ b.extend(int_long_int_long_struct.pack(
self.protocol_version, self.last_zxid_seen, self.time_out,
- self.session_id)
- b += write_buffer(self.passwd)
- b += 1 if self.read_only else 0
+ self.session_id))
+ b.extend(write_buffer(self.passwd))
+ b.extend([1 if self.read_only else 0])
return b
- def deserialize(buffer, offset):
- pass
+ @classmethod
+ def deserialize(cls, bytes, offset):
+ proto_version, timeout, session_id = int_int_long_struct.unpack_from(
+ bytes, offset)
+ offset += int_int_long_struct.size
+ password, offset = read_buffer(bytes, offset)
+ return cls(proto_version, 0, timeout, session_id, password, 0), offset
+
+
+class Close(object):
+ __slots__ = ['type']
+ type = -11
+
+ @classmethod
+ def serialize(cls):
+ return ''
+
+
+class Ping(object):
+ __slots__ = ['type']
+ type = 11
+
+ @classmethod
+ def serialize(cls):
+ return ''
+
+
+class Watch(namedtuple('Watch', 'type state path')):
+ @classmethod
+ def deserialize(cls, buffer, offset):
+ """Given a buffer and the current buffer offset, return the type,
+ state, path, and new offset"""
+ type, state = int_int_struct.unpack_from(buffer, offset)
+ offset += int_int_struct.size
+ path, offset = read_string(buffer, offset)
+ return cls(type, state, path), offset
def deserialize_reply_header(buffer, offset):
@@ -54,12 +102,3 @@ def deserialize_reply_header(buffer, offset):
new_offset = offset + reply_header_struct.size
return ReplyHeader._make(
reply_header_struct.unpack_from(buffer, offset)), new_offset
-
-
-def deserialize_watcher_event(buffer, offset):
- """Given a buffer and the current buffer offset, return the type,
- state, path, and new offset"""
- type, state = int_int_struct.unpack_from(buffer, offset)
- offset += int_int_struct.size
- path, offset = read_string(buffer, offset)
- return type, state, path, offset