summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTyler Lubeck <tyler@coffeemeetsbagel.com>2020-02-06 12:27:09 -0800
committerGitHub <noreply@github.com>2020-02-06 12:27:09 -0800
commit209515bf9dcdd9e03bc286035641af3ae72fcbf9 (patch)
tree647c19c40da7ff78fdcc485fd790cee82838940d
parent3d98741be0e9608a352221b476cf3aa2d86777be (diff)
downloadkafka-python-209515bf9dcdd9e03bc286035641af3ae72fcbf9.tar.gz
Implement methods to convert a Struct object to a pythonic object (#1951)
Implement methods to convert a Struct object to a pythonic object
-rw-r--r--kafka/protocol/api.py32
-rw-r--r--kafka/protocol/struct.py6
-rw-r--r--test/test_object_conversion.py236
3 files changed, 273 insertions, 1 deletions
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py
index efaf63e..64276fc 100644
--- a/kafka/protocol/api.py
+++ b/kafka/protocol/api.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import
import abc
from kafka.protocol.struct import Struct
-from kafka.protocol.types import Int16, Int32, String, Schema
+from kafka.protocol.types import Int16, Int32, String, Schema, Array
class RequestHeader(Struct):
@@ -47,6 +47,9 @@ class Request(Struct):
"""Override this method if an api request does not always generate a response"""
return True
+ def to_object(self):
+ return _to_object(self.SCHEMA, self)
+
class Response(Struct):
__metaclass__ = abc.ABCMeta
@@ -65,3 +68,30 @@ class Response(Struct):
def SCHEMA(self):
"""An instance of Schema() representing the response structure"""
pass
+
+ def to_object(self):
+ return _to_object(self.SCHEMA, self)
+
+
+def _to_object(schema, data):
+ obj = {}
+ for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)):
+ if isinstance(data, Struct):
+ val = data.get_item(name)
+ else:
+ val = data[idx]
+
+ if isinstance(_type, Schema):
+ obj[name] = _to_object(_type, val)
+ elif isinstance(_type, Array):
+ if isinstance(_type.array_of, (Array, Schema)):
+ obj[name] = [
+ _to_object(_type.array_of, x)
+ for x in val
+ ]
+ else:
+ obj[name] = val
+ else:
+ obj[name] = val
+
+ return obj
diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py
index 693e2a2..e9da6e6 100644
--- a/kafka/protocol/struct.py
+++ b/kafka/protocol/struct.py
@@ -30,6 +30,7 @@ class Struct(AbstractType):
# causes instances to "leak" to garbage
self.encode = WeakMethod(self._encode_self)
+
@classmethod
def encode(cls, item): # pylint: disable=E0202
bits = []
@@ -48,6 +49,11 @@ class Struct(AbstractType):
data = BytesIO(data)
return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
+ def get_item(self, name):
+ if name not in self.SCHEMA.names:
+ raise KeyError("%s is not in the schema" % name)
+ return self.__dict__[name]
+
def __repr__(self):
key_vals = []
for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields):
diff --git a/test/test_object_conversion.py b/test/test_object_conversion.py
new file mode 100644
index 0000000..9b1ff21
--- /dev/null
+++ b/test/test_object_conversion.py
@@ -0,0 +1,236 @@
+from kafka.protocol.admin import Request
+from kafka.protocol.admin import Response
+from kafka.protocol.types import Schema
+from kafka.protocol.types import Array
+from kafka.protocol.types import Int16
+from kafka.protocol.types import String
+
+import pytest
+
+@pytest.mark.parametrize('superclass', (Request, Response))
+class TestObjectConversion:
+ def test_get_item(self, superclass):
+ class TestClass(superclass):
+ API_KEY = 0
+ API_VERSION = 0
+ RESPONSE_TYPE = None # To satisfy the Request ABC
+ SCHEMA = Schema(
+ ('myobject', Int16))
+
+ tc = TestClass(myobject=0)
+ assert tc.get_item('myobject') == 0
+ with pytest.raises(KeyError):
+ tc.get_item('does-not-exist')
+
+ def test_with_empty_schema(self, superclass):
+ class TestClass(superclass):
+ API_KEY = 0
+ API_VERSION = 0
+ RESPONSE_TYPE = None # To satisfy the Request ABC
+ SCHEMA = Schema()
+
+ tc = TestClass()
+ tc.encode()
+ assert tc.to_object() == {}
+
+ def test_with_basic_schema(self, superclass):
+ class TestClass(superclass):
+ API_KEY = 0
+ API_VERSION = 0
+ RESPONSE_TYPE = None # To satisfy the Request ABC
+ SCHEMA = Schema(
+ ('myobject', Int16))
+
+ tc = TestClass(myobject=0)
+ tc.encode()
+ assert tc.to_object() == {'myobject': 0}
+
+ def test_with_basic_array_schema(self, superclass):
+ class TestClass(superclass):
+ API_KEY = 0
+ API_VERSION = 0
+ RESPONSE_TYPE = None # To satisfy the Request ABC
+ SCHEMA = Schema(
+ ('myarray', Array(Int16)))
+
+ tc = TestClass(myarray=[1,2,3])
+ tc.encode()
+ assert tc.to_object()['myarray'] == [1, 2, 3]
+
+ def test_with_complex_array_schema(self, superclass):
+ class TestClass(superclass):
+ API_KEY = 0
+ API_VERSION = 0
+ RESPONSE_TYPE = None # To satisfy the Request ABC
+ SCHEMA = Schema(
+ ('myarray', Array(
+ ('subobject', Int16),
+ ('othersubobject', String('utf-8')))))
+
+ tc = TestClass(
+ myarray=[[10, 'hello']]
+ )
+ tc.encode()
+ obj = tc.to_object()
+ assert len(obj['myarray']) == 1
+ assert obj['myarray'][0]['subobject'] == 10
+ assert obj['myarray'][0]['othersubobject'] == 'hello'
+
+ def test_with_array_and_other(self, superclass):
+ class TestClass(superclass):
+ API_KEY = 0
+ API_VERSION = 0
+ RESPONSE_TYPE = None # To satisfy the Request ABC
+ SCHEMA = Schema(
+ ('myarray', Array(
+ ('subobject', Int16),
+ ('othersubobject', String('utf-8')))),
+ ('notarray', Int16))
+
+ tc = TestClass(
+ myarray=[[10, 'hello']],
+ notarray=42
+ )
+
+ obj = tc.to_object()
+ assert len(obj['myarray']) == 1
+ assert obj['myarray'][0]['subobject'] == 10
+ assert obj['myarray'][0]['othersubobject'] == 'hello'
+ assert obj['notarray'] == 42
+
+ def test_with_nested_array(self, superclass):
+ class TestClass(superclass):
+ API_KEY = 0
+ API_VERSION = 0
+ RESPONSE_TYPE = None # To satisfy the Request ABC
+ SCHEMA = Schema(
+ ('myarray', Array(
+ ('subarray', Array(Int16)),
+ ('otherobject', Int16))))
+
+ tc = TestClass(
+ myarray=[
+ [[1, 2], 2],
+ [[2, 3], 4],
+ ]
+ )
+ print(tc.encode())
+
+
+ obj = tc.to_object()
+ assert len(obj['myarray']) == 2
+ assert obj['myarray'][0]['subarray'] == [1, 2]
+ assert obj['myarray'][0]['otherobject'] == 2
+ assert obj['myarray'][1]['subarray'] == [2, 3]
+ assert obj['myarray'][1]['otherobject'] == 4
+
+ def test_with_complex_nested_array(self, superclass):
+ class TestClass(superclass):
+ API_KEY = 0
+ API_VERSION = 0
+ RESPONSE_TYPE = None # To satisfy the Request ABC
+ SCHEMA = Schema(
+ ('myarray', Array(
+ ('subarray', Array(
+ ('innertest', String('utf-8')),
+ ('otherinnertest', String('utf-8')))),
+ ('othersubarray', Array(Int16)))),
+ ('notarray', String('utf-8')))
+
+ tc = TestClass(
+ myarray=[
+ [[['hello', 'hello'], ['hello again', 'hello again']], [0]],
+ [[['hello', 'hello again']], [1]],
+ ],
+ notarray='notarray'
+ )
+ tc.encode()
+
+ obj = tc.to_object()
+
+ assert obj['notarray'] == 'notarray'
+ myarray = obj['myarray']
+ assert len(myarray) == 2
+
+ assert myarray[0]['othersubarray'] == [0]
+ assert len(myarray[0]['subarray']) == 2
+ assert myarray[0]['subarray'][0]['innertest'] == 'hello'
+ assert myarray[0]['subarray'][0]['otherinnertest'] == 'hello'
+ assert myarray[0]['subarray'][1]['innertest'] == 'hello again'
+ assert myarray[0]['subarray'][1]['otherinnertest'] == 'hello again'
+
+ assert myarray[1]['othersubarray'] == [1]
+ assert len(myarray[1]['subarray']) == 1
+ assert myarray[1]['subarray'][0]['innertest'] == 'hello'
+ assert myarray[1]['subarray'][0]['otherinnertest'] == 'hello again'
+
+def test_with_metadata_response():
+ from kafka.protocol.metadata import MetadataResponse_v5
+ tc = MetadataResponse_v5(
+ throttle_time_ms=0,
+ brokers=[
+ [0, 'testhost0', 9092, 'testrack0'],
+ [1, 'testhost1', 9092, 'testrack1'],
+ ],
+ cluster_id='abcd',
+ controller_id=0,
+ topics=[
+ [0, 'testtopic1', False, [
+ [0, 0, 0, [0, 1], [0, 1], []],
+ [0, 1, 1, [1, 0], [1, 0], []],
+ ],
+ ], [0, 'other-test-topic', True, [
+ [0, 0, 0, [0, 1], [0, 1], []],
+ ]
+ ]]
+ )
+ tc.encode() # Make sure this object encodes successfully
+
+
+ obj = tc.to_object()
+
+ assert obj['throttle_time_ms'] == 0
+
+ assert len(obj['brokers']) == 2
+ assert obj['brokers'][0]['node_id'] == 0
+ assert obj['brokers'][0]['host'] == 'testhost0'
+ assert obj['brokers'][0]['port'] == 9092
+ assert obj['brokers'][0]['rack'] == 'testrack0'
+ assert obj['brokers'][1]['node_id'] == 1
+ assert obj['brokers'][1]['host'] == 'testhost1'
+ assert obj['brokers'][1]['port'] == 9092
+ assert obj['brokers'][1]['rack'] == 'testrack1'
+
+ assert obj['cluster_id'] == 'abcd'
+ assert obj['controller_id'] == 0
+
+ assert len(obj['topics']) == 2
+ assert obj['topics'][0]['error_code'] == 0
+ assert obj['topics'][0]['topic'] == 'testtopic1'
+ assert obj['topics'][0]['is_internal'] == False
+ assert len(obj['topics'][0]['partitions']) == 2
+ assert obj['topics'][0]['partitions'][0]['error_code'] == 0
+ assert obj['topics'][0]['partitions'][0]['partition'] == 0
+ assert obj['topics'][0]['partitions'][0]['leader'] == 0
+ assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1]
+ assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1]
+ assert obj['topics'][0]['partitions'][0]['offline_replicas'] == []
+ assert obj['topics'][0]['partitions'][1]['error_code'] == 0
+ assert obj['topics'][0]['partitions'][1]['partition'] == 1
+ assert obj['topics'][0]['partitions'][1]['leader'] == 1
+ assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0]
+ assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0]
+ assert obj['topics'][0]['partitions'][1]['offline_replicas'] == []
+
+ assert obj['topics'][1]['error_code'] == 0
+ assert obj['topics'][1]['topic'] == 'other-test-topic'
+ assert obj['topics'][1]['is_internal'] == True
+ assert len(obj['topics'][1]['partitions']) == 1
+ assert obj['topics'][1]['partitions'][0]['error_code'] == 0
+ assert obj['topics'][1]['partitions'][0]['partition'] == 0
+ assert obj['topics'][1]['partitions'][0]['leader'] == 0
+ assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1]
+ assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1]
+ assert obj['topics'][1]['partitions'][0]['offline_replicas'] == []
+
+ tc.encode()