1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
from __future__ import absolute_import
import abc
from kafka.protocol.struct import Struct
from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields
class RequestHeader(Struct):
    """Request header struct: api key, api version, correlation id, client id."""

    SCHEMA = Schema(
        ('api_key', Int16),
        ('api_version', Int16),
        ('correlation_id', Int32),
        ('client_id', String('utf-8'))
    )

    def __init__(self, request, correlation_id=0, client_id='kafka-python'):
        """Populate the header fields for ``request``.

        Arguments:
            request: object exposing ``API_KEY`` and ``API_VERSION``; these
                fill the ``api_key`` and ``api_version`` fields.
            correlation_id: value for the ``correlation_id`` field.
            client_id: value for the ``client_id`` field.
        """
        api_key = request.API_KEY
        api_version = request.API_VERSION
        super(RequestHeader, self).__init__(
            api_key, api_version, correlation_id, client_id
        )
class RequestHeaderV2(Struct):
    """Request header variant whose schema ends with a tagged-fields buffer.

    Flexible request / response headers end in a tagged field buffer.
    """

    SCHEMA = Schema(
        ('api_key', Int16),
        ('api_version', Int16),
        ('correlation_id', Int32),
        ('client_id', String('utf-8')),
        ('tags', TaggedFields),
    )

    def __init__(self, request, correlation_id=0, client_id='kafka-python', tags=None):
        """Populate the header fields for ``request``.

        Arguments:
            request: object exposing ``API_KEY`` and ``API_VERSION``; these
                fill the ``api_key`` and ``api_version`` fields.
            correlation_id: value for the ``correlation_id`` field.
            client_id: value for the ``client_id`` field.
            tags: value for the ``tags`` field; any falsy value (including
                the default ``None``) is replaced by a new empty dict.
        """
        api_key = request.API_KEY
        api_version = request.API_VERSION
        tagged = tags if tags else {}
        super(RequestHeaderV2, self).__init__(
            api_key, api_version, correlation_id, client_id, tagged
        )
class ResponseHeader(Struct):
    """Response header struct holding a single ``correlation_id`` field."""

    SCHEMA = Schema(
        ('correlation_id', Int32),
    )
class ResponseHeaderV2(Struct):
    """Response header struct: ``correlation_id`` followed by ``tags``."""

    SCHEMA = Schema(
        ('correlation_id', Int32),
        ('tags', TaggedFields),
    )
class Request(Struct):
    """Base class for api request structs.

    Subclasses are expected to supply ``API_KEY``, ``API_VERSION``,
    ``SCHEMA`` and ``RESPONSE_TYPE``. ``FLEXIBLE_VERSION`` selects which
    header classes are used when building request headers and parsing
    response headers.
    """

    # NOTE(review): ``__metaclass__`` is the Python 2 spelling; Python 3 does
    # not treat this attribute as a metaclass declaration, so the abstract
    # properties below are presumably not enforced there -- confirm.
    __metaclass__ = abc.ABCMeta

    # When true, the V2 (tagged-fields) header classes are used.
    FLEXIBLE_VERSION = False

    @abc.abstractproperty
    def API_KEY(self):
        """Integer identifier of the api request."""

    @abc.abstractproperty
    def API_VERSION(self):
        """Integer version of the api request."""

    @abc.abstractproperty
    def SCHEMA(self):
        """A Schema() instance describing the request structure."""

    @abc.abstractproperty
    def RESPONSE_TYPE(self):
        """The Response class associated with this api request."""

    def expect_response(self):
        """Return True; override when a request does not always get a response."""
        return True

    def to_object(self):
        """Return this request converted to a dict keyed by schema field name."""
        return _to_object(self.SCHEMA, self)

    def build_request_header(self, correlation_id, client_id):
        """Return a request header instance for this request.

        ``RequestHeaderV2`` is used when ``FLEXIBLE_VERSION`` is truthy,
        ``RequestHeader`` otherwise.
        """
        header_cls = RequestHeaderV2 if self.FLEXIBLE_VERSION else RequestHeader
        return header_cls(self, correlation_id=correlation_id, client_id=client_id)

    def parse_response_header(self, read_buffer):
        """Decode and return the response header from ``read_buffer``.

        ``ResponseHeaderV2`` is used when ``FLEXIBLE_VERSION`` is truthy,
        ``ResponseHeader`` otherwise.
        """
        header_cls = ResponseHeaderV2 if self.FLEXIBLE_VERSION else ResponseHeader
        return header_cls.decode(read_buffer)
class Response(Struct):
    """Base class for api response structs.

    Subclasses are expected to supply ``API_KEY``, ``API_VERSION`` and
    ``SCHEMA``.
    """

    # NOTE(review): ``__metaclass__`` is the Python 2 spelling; Python 3 does
    # not treat this attribute as a metaclass declaration, so the abstract
    # properties below are presumably not enforced there -- confirm.
    __metaclass__ = abc.ABCMeta

    @abc.abstractproperty
    def API_KEY(self):
        """Integer identifier of the api request/response."""

    @abc.abstractproperty
    def API_VERSION(self):
        """Integer version of the api request/response."""

    @abc.abstractproperty
    def SCHEMA(self):
        """A Schema() instance describing the response structure."""

    def to_object(self):
        """Return this response converted to a dict keyed by schema field name."""
        return _to_object(self.SCHEMA, self)
def _to_object(schema, data):
    """Convert ``data`` laid out according to ``schema`` into a plain dict.

    Arguments:
        schema: object exposing parallel ``names`` and ``fields`` sequences
            (one name and one type per field).
        data: either a Struct, whose fields are read with
            ``get_item(name)``, or a sequence indexed positionally in
            schema order.

    Returns:
        dict mapping each field name to its value. Values of Schema type
        become nested dicts; arrays whose elements are Schemas (or nested
        arrays) become lists of converted elements; every other value is
        stored unchanged.
    """
    # The container kind does not change while iterating, so test it once.
    is_struct = isinstance(data, Struct)
    obj = {}
    for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)):
        val = data.get_item(name) if is_struct else data[idx]
        obj[name] = _convert_value(_type, val)
    return obj


def _convert_value(_type, val):
    """Convert one field value according to its declared type."""
    if isinstance(_type, Schema):
        return _to_object(_type, val)
    if isinstance(_type, Array):
        element_type = _type.array_of
        # A null array has nothing to iterate, and arrays of plain values
        # need no per-element conversion: hand both back untouched.
        if val is None or not isinstance(element_type, (Array, Schema)):
            return val
        # Recurse per element so that arrays nested inside arrays are walked
        # as arrays; _to_object() reads ``names``/``fields``, which are
        # presumably only available on Schema, not on Array.
        return [_convert_value(element_type, x) for x in val]
    return val
|