summaryrefslogtreecommitdiff
path: root/kafka/protocol
diff options
context:
space:
mode:
authorZack Dever <zack.dever@rd.io>2015-12-03 15:17:47 -0800
committerZack Dever <zack.dever@rd.io>2015-12-04 11:25:40 -0800
commit892f5dd9337fdf8aa06eccb37b4087432f7e0c14 (patch)
tree296046ecdbc09c36d8f7d3211328abd11f62c8a5 /kafka/protocol
parent7a6c51bf2e0a926ffe2595f008c68c6b63db2ce7 (diff)
downloadkafka-python-892f5dd9337fdf8aa06eccb37b4087432f7e0c14.tar.gz
group membership api schemas
Diffstat (limited to 'kafka/protocol')
-rw-r--r--kafka/protocol/group.py108
1 files changed, 108 insertions, 0 deletions
diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py
new file mode 100644
index 0000000..3766e48
--- /dev/null
+++ b/kafka/protocol/group.py
@@ -0,0 +1,108 @@
+from .struct import Struct
+from .types import Array, Bytes, Int16, Int32, Schema, String
+
+
+class JoinGroupResponse(Struct):
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('generation_id', Int32),
+ ('group_protocol', String('utf-8')),
+ ('leader_id', String('utf-8')),
+ ('member_id', String('utf-8')),
+ ('members', Array(
+ ('member_id', String('utf-8')),
+ ('member_metadata', Bytes)))
+ )
+
+
+class JoinGroupRequest(Struct):
+ API_KEY = 11
+ API_VERSION = 0
+ RESPONSE_TYPE = JoinGroupResponse
+ SCHEMA = Schema(
+ ('group', String('utf-8')),
+ ('session_timeout', Int32),
+ ('member_id', String('utf-8')),
+ ('protocol_type', String('utf-8')),
+ ('group_protocols', Array(
+ ('protocol_name', String('utf-8')),
+ ('protocol_metadata', Bytes)))
+ )
+
+
+class ProtocolName(Struct):
+ SCHEMA = Schema(
+ ('assignment_strategy', String('utf-8'))
+ )
+
+
+class ProtocolMetadata(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('subscription', Array(String('utf-8'))), # topics list
+ ('user_data', Bytes)
+ )
+
+
+class SyncGroupResponse(Struct):
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('member_assignment', Bytes)
+ )
+
+
+class SyncGroupRequest(Struct):
+ API_KEY = 14
+ API_VERSION = 0
+ RESPONSE_TYPE = SyncGroupResponse
+ SCHEMA = Schema(
+ ('group', String('utf-8')),
+ ('generation_id', Int32),
+ ('member_id', String('utf-8')),
+ ('group_assignment', Array(
+ ('member_id', String('utf-8')),
+ ('member_metadata', Bytes)))
+ )
+
+
+class MemberAssignment(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('partition_assignment', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(Int32)))),
+ ('user_data', Bytes)
+ )
+
+
+class HeartbeatResponse(Struct):
+ SCHEMA = Schema(
+ ('error_code', Int16)
+ )
+
+
+class HeartbeatRequest(Struct):
+ API_KEY = 12
+ API_VERSION = 0
+ RESPONSE_TYPE = HeartbeatResponse
+ SCHEMA = Schema(
+ ('group', String('utf-8')),
+ ('generation_id', Int32),
+ ('member_id', String('utf-8'))
+ )
+
+
+class LeaveGroupResponse(Struct):
+ SCHEMA = Schema(
+ ('error_code', Int16)
+ )
+
+
+class LeaveGroupRequest(Struct):
+ API_KEY = 13
+ API_VERSION = 0
+ RESPONSE_TYPE = LeaveGroupResponse
+ SCHEMA = Schema(
+ ('group', String('utf-8')),
+ ('member_id', String('utf-8'))
+ )