summaryrefslogtreecommitdiff
path: root/kafka/protocol/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/api.py')
-rw-r--r--kafka/protocol/api.py32
1 files changed, 31 insertions, 1 deletions
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py
index efaf63e..64276fc 100644
--- a/kafka/protocol/api.py
+++ b/kafka/protocol/api.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import
import abc
from kafka.protocol.struct import Struct
-from kafka.protocol.types import Int16, Int32, String, Schema
+from kafka.protocol.types import Int16, Int32, String, Schema, Array
class RequestHeader(Struct):
@@ -47,6 +47,9 @@ class Request(Struct):
"""Override this method if an api request does not always generate a response"""
return True
+ def to_object(self):
+ return _to_object(self.SCHEMA, self)
+
class Response(Struct):
__metaclass__ = abc.ABCMeta
@@ -65,3 +68,30 @@ class Response(Struct):
def SCHEMA(self):
"""An instance of Schema() representing the response structure"""
pass
+
+ def to_object(self):
+ return _to_object(self.SCHEMA, self)
+
+
+def _to_object(schema, data):
+ obj = {}
+ for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)):
+ if isinstance(data, Struct):
+ val = data.get_item(name)
+ else:
+ val = data[idx]
+
+ if isinstance(_type, Schema):
+ obj[name] = _to_object(_type, val)
+ elif isinstance(_type, Array):
+ if isinstance(_type.array_of, (Array, Schema)):
+ obj[name] = [
+ _to_object(_type.array_of, x)
+ for x in val
+ ]
+ else:
+ obj[name] = val
+ else:
+ obj[name] = val
+
+ return obj