diff options
author | Tyler Lubeck <tyler@coffeemeetsbagel.com> | 2020-02-06 12:27:09 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-06 12:27:09 -0800 |
commit | 209515bf9dcdd9e03bc286035641af3ae72fcbf9 (patch) | |
tree | 647c19c40da7ff78fdcc485fd790cee82838940d | |
parent | 3d98741be0e9608a352221b476cf3aa2d86777be (diff) | |
download | kafka-python-209515bf9dcdd9e03bc286035641af3ae72fcbf9.tar.gz |
Implement methods to convert a Struct object to a pythonic object (#1951)
Implement methods to convert a Struct object to a pythonic object
-rw-r--r-- | kafka/protocol/api.py | 32 | ||||
-rw-r--r-- | kafka/protocol/struct.py | 6 | ||||
-rw-r--r-- | test/test_object_conversion.py | 236 |
3 files changed, 273 insertions, 1 deletions
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index efaf63e..64276fc 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -3,7 +3,7 @@ from __future__ import absolute_import import abc from kafka.protocol.struct import Struct -from kafka.protocol.types import Int16, Int32, String, Schema +from kafka.protocol.types import Int16, Int32, String, Schema, Array class RequestHeader(Struct): @@ -47,6 +47,9 @@ class Request(Struct): """Override this method if an api request does not always generate a response""" return True + def to_object(self): + return _to_object(self.SCHEMA, self) + class Response(Struct): __metaclass__ = abc.ABCMeta @@ -65,3 +68,30 @@ class Response(Struct): def SCHEMA(self): """An instance of Schema() representing the response structure""" pass + + def to_object(self): + return _to_object(self.SCHEMA, self) + + +def _to_object(schema, data): + obj = {} + for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)): + if isinstance(data, Struct): + val = data.get_item(name) + else: + val = data[idx] + + if isinstance(_type, Schema): + obj[name] = _to_object(_type, val) + elif isinstance(_type, Array): + if isinstance(_type.array_of, (Array, Schema)): + obj[name] = [ + _to_object(_type.array_of, x) + for x in val + ] + else: + obj[name] = val + else: + obj[name] = val + + return obj diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py index 693e2a2..e9da6e6 100644 --- a/kafka/protocol/struct.py +++ b/kafka/protocol/struct.py @@ -30,6 +30,7 @@ class Struct(AbstractType): # causes instances to "leak" to garbage self.encode = WeakMethod(self._encode_self) + @classmethod def encode(cls, item): # pylint: disable=E0202 bits = [] @@ -48,6 +49,11 @@ class Struct(AbstractType): data = BytesIO(data) return cls(*[field.decode(data) for field in cls.SCHEMA.fields]) + def get_item(self, name): + if name not in self.SCHEMA.names: + raise KeyError("%s is not in the schema" % name) + return self.__dict__[name] + def __repr__(self): key_vals = [] for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields): diff --git a/test/test_object_conversion.py b/test/test_object_conversion.py new file mode 100644 index 0000000..9b1ff21 --- /dev/null +++ b/test/test_object_conversion.py @@ -0,0 +1,236 @@ +from kafka.protocol.admin import Request +from kafka.protocol.admin import Response +from kafka.protocol.types import Schema +from kafka.protocol.types import Array +from kafka.protocol.types import Int16 +from kafka.protocol.types import String + +import pytest + +@pytest.mark.parametrize('superclass', (Request, Response)) +class TestObjectConversion: + def test_get_item(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myobject', Int16)) + + tc = TestClass(myobject=0) + assert tc.get_item('myobject') == 0 + with pytest.raises(KeyError): + tc.get_item('does-not-exist') + + def test_with_empty_schema(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema() + + tc = TestClass() + tc.encode() + assert tc.to_object() == {} + + def test_with_basic_schema(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myobject', Int16)) + + tc = TestClass(myobject=0) + tc.encode() + assert tc.to_object() == {'myobject': 0} + + def test_with_basic_array_schema(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myarray', Array(Int16))) + + tc = TestClass(myarray=[1,2,3]) + tc.encode() + assert tc.to_object()['myarray'] == [1, 2, 3] + + def test_with_complex_array_schema(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myarray', Array( + ('subobject', Int16), + ('othersubobject', String('utf-8'))))) + + tc = TestClass( + myarray=[[10, 'hello']] + ) + tc.encode() + obj = tc.to_object() + assert len(obj['myarray']) == 1 + assert obj['myarray'][0]['subobject'] == 10 + assert obj['myarray'][0]['othersubobject'] == 'hello' + + def test_with_array_and_other(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myarray', Array( + ('subobject', Int16), + ('othersubobject', String('utf-8')))), + ('notarray', Int16)) + + tc = TestClass( + myarray=[[10, 'hello']], + notarray=42 + ) + + obj = tc.to_object() + assert len(obj['myarray']) == 1 + assert obj['myarray'][0]['subobject'] == 10 + assert obj['myarray'][0]['othersubobject'] == 'hello' + assert obj['notarray'] == 42 + + def test_with_nested_array(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myarray', Array( + ('subarray', Array(Int16)), + ('otherobject', Int16)))) + + tc = TestClass( + myarray=[ + [[1, 2], 2], + [[2, 3], 4], + ] + ) + print(tc.encode()) + + + obj = tc.to_object() + assert len(obj['myarray']) == 2 + assert obj['myarray'][0]['subarray'] == [1, 2] + assert obj['myarray'][0]['otherobject'] == 2 + assert obj['myarray'][1]['subarray'] == [2, 3] + assert obj['myarray'][1]['otherobject'] == 4 + + def test_with_complex_nested_array(self, superclass): + class TestClass(superclass): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = None # To satisfy the Request ABC + SCHEMA = Schema( + ('myarray', Array( + ('subarray', Array( + ('innertest', String('utf-8')), + ('otherinnertest', String('utf-8')))), + ('othersubarray', Array(Int16)))), + ('notarray', String('utf-8'))) + + tc = TestClass( + myarray=[ + [[['hello', 'hello'], ['hello again', 'hello again']], [0]], + [[['hello', 'hello again']], [1]], + ], + notarray='notarray' + ) + tc.encode() + + obj = tc.to_object() + + assert obj['notarray'] == 'notarray' + myarray = obj['myarray'] + assert len(myarray) == 2 + + assert myarray[0]['othersubarray'] == [0] + assert len(myarray[0]['subarray']) == 2 + assert myarray[0]['subarray'][0]['innertest'] == 'hello' + assert myarray[0]['subarray'][0]['otherinnertest'] == 'hello' + assert myarray[0]['subarray'][1]['innertest'] == 'hello again' + assert myarray[0]['subarray'][1]['otherinnertest'] == 'hello again' + + assert myarray[1]['othersubarray'] == [1] + assert len(myarray[1]['subarray']) == 1 + assert myarray[1]['subarray'][0]['innertest'] == 'hello' + assert myarray[1]['subarray'][0]['otherinnertest'] == 'hello again' + +def test_with_metadata_response(): + from kafka.protocol.metadata import MetadataResponse_v5 + tc = MetadataResponse_v5( + throttle_time_ms=0, + brokers=[ + [0, 'testhost0', 9092, 'testrack0'], + [1, 'testhost1', 9092, 'testrack1'], + ], + cluster_id='abcd', + controller_id=0, + topics=[ + [0, 'testtopic1', False, [ + [0, 0, 0, [0, 1], [0, 1], []], + [0, 1, 1, [1, 0], [1, 0], []], + ], + ], [0, 'other-test-topic', True, [ + [0, 0, 0, [0, 1], [0, 1], []], + ] + ]] + ) + tc.encode() # Make sure this object encodes successfully + + + obj = tc.to_object() + + assert obj['throttle_time_ms'] == 0 + + assert len(obj['brokers']) == 2 + assert obj['brokers'][0]['node_id'] == 0 + assert obj['brokers'][0]['host'] == 'testhost0' + assert obj['brokers'][0]['port'] == 9092 + assert obj['brokers'][0]['rack'] == 'testrack0' + assert obj['brokers'][1]['node_id'] == 1 + assert obj['brokers'][1]['host'] == 'testhost1' + assert obj['brokers'][1]['port'] == 9092 + assert obj['brokers'][1]['rack'] == 'testrack1' + + assert obj['cluster_id'] == 'abcd' + assert obj['controller_id'] == 0 + + assert len(obj['topics']) == 2 + assert obj['topics'][0]['error_code'] == 0 + assert obj['topics'][0]['topic'] == 'testtopic1' + assert obj['topics'][0]['is_internal'] == False + assert len(obj['topics'][0]['partitions']) == 2 + assert obj['topics'][0]['partitions'][0]['error_code'] == 0 + assert obj['topics'][0]['partitions'][0]['partition'] == 0 + assert obj['topics'][0]['partitions'][0]['leader'] == 0 + assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1] + assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1] + assert obj['topics'][0]['partitions'][0]['offline_replicas'] == [] + assert obj['topics'][0]['partitions'][1]['error_code'] == 0 + assert obj['topics'][0]['partitions'][1]['partition'] == 1 + assert obj['topics'][0]['partitions'][1]['leader'] == 1 + assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0] + assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0] + assert obj['topics'][0]['partitions'][1]['offline_replicas'] == [] + + assert obj['topics'][1]['error_code'] == 0 + assert obj['topics'][1]['topic'] == 'other-test-topic' + assert obj['topics'][1]['is_internal'] == True + assert len(obj['topics'][1]['partitions']) == 1 + assert obj['topics'][1]['partitions'][0]['error_code'] == 0 + assert obj['topics'][1]['partitions'][0]['partition'] == 0 + assert obj['topics'][1]['partitions'][0]['leader'] == 0 + assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1] + assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1] + assert obj['topics'][1]['partitions'][0]['offline_replicas'] == [] + + tc.encode() |