summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTincu Gabriel <gabi@aiven.io>2020-12-02 15:45:13 +0100
committerGitHub <noreply@github.com>2020-12-02 06:45:13 -0800
commit9feeb79140ed10e3a7f2036491fc07573740c231 (patch)
treefd65f63a46e43ae261008a87081c460d76810f4d
parentc48817e0d21d7752077e28f2ea9a657b9001a14b (diff)
downloadkafka-python-9feeb79140ed10e3a7f2036491fc07573740c231.tar.gz
Core Protocol: Add support for flexible versions (#2151)
- Add support for new request and response headers, supporting flexible versions / tagged fields
- Add List / Alter partition reassignments APIs
- Add support for varints
- Add support for compact collections (byte array, string, array)
-rw-r--r--kafka/protocol/__init__.py2
-rw-r--r--kafka/protocol/admin.py91
-rw-r--r--kafka/protocol/api.py43
-rw-r--r--kafka/protocol/parser.py21
-rw-r--r--kafka/protocol/types.py153
-rw-r--r--test/test_protocol.py54
6 files changed, 347 insertions, 17 deletions
diff --git a/kafka/protocol/__init__.py b/kafka/protocol/__init__.py
index e739b5c..025447f 100644
--- a/kafka/protocol/__init__.py
+++ b/kafka/protocol/__init__.py
@@ -43,5 +43,7 @@ API_KEYS = {
40: 'ExpireDelegationToken',
41: 'DescribeDelegationToken',
42: 'DeleteGroups',
+ 45: 'AlterPartitionReassignments',
+ 46: 'ListPartitionReassignments',
48: 'DescribeClientQuotas',
}
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index 63a3327..f9d61e5 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import
from kafka.protocol.api import Request, Response
-from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64
+from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields
class ApiVersionResponse_v0(Response):
@@ -963,3 +963,92 @@ DescribeClientQuotasRequest = [
DescribeClientQuotasResponse = [
DescribeClientQuotasResponse_v0,
]
+
+
class AlterPartitionReassignmentsResponse_v0(Response):
    """Response schema for AlterPartitionReassignments (API key 45), v0.

    This is a flexible-version schema: strings and arrays use the compact
    wire format and every struct level ends with a tagged-fields buffer.
    """
    API_KEY = 45
    API_VERSION = 0
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        ("error_code", Int16),
        ("error_message", CompactString("utf-8")),
        ("responses", CompactArray(
            ("name", CompactString("utf-8")),
            ("partitions", CompactArray(
                ("partition_index", Int32),
                ("error_code", Int16),
                ("error_message", CompactString("utf-8")),
                ("tags", TaggedFields),
            )),
            ("tags", TaggedFields),
        )),
        ("tags", TaggedFields),
    )
+
+
class AlterPartitionReassignmentsRequest_v0(Request):
    """Request schema for AlterPartitionReassignments (API key 45), v0.

    FLEXIBLE_VERSION selects the v2 request header (with tagged fields)
    when the request is serialized.
    """
    FLEXIBLE_VERSION = True
    API_KEY = 45
    API_VERSION = 0
    RESPONSE_TYPE = AlterPartitionReassignmentsResponse_v0
    SCHEMA = Schema(
        ("timeout_ms", Int32),
        ("topics", CompactArray(
            ("name", CompactString("utf-8")),
            ("partitions", CompactArray(
                ("partition_index", Int32),
                ("replicas", CompactArray(Int32)),
                ("tags", TaggedFields),
            )),
            ("tags", TaggedFields),
        )),
        ("tags", TaggedFields),
    )
+
+
# Version-indexed lists (index == API version), following the module convention.
AlterPartitionReassignmentsRequest = [AlterPartitionReassignmentsRequest_v0]

AlterPartitionReassignmentsResponse = [AlterPartitionReassignmentsResponse_v0]
+
+
class ListPartitionReassignmentsResponse_v0(Response):
    """Response schema for ListPartitionReassignments (API key 46), v0.

    Flexible-version schema: compact strings/arrays and tagged-fields
    buffers at every struct level.
    """
    API_KEY = 46
    API_VERSION = 0
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        ("error_code", Int16),
        ("error_message", CompactString("utf-8")),
        ("topics", CompactArray(
            ("name", CompactString("utf-8")),
            ("partitions", CompactArray(
                ("partition_index", Int32),
                ("replicas", CompactArray(Int32)),
                ("adding_replicas", CompactArray(Int32)),
                ("removing_replicas", CompactArray(Int32)),
                ("tags", TaggedFields),
            )),
            ("tags", TaggedFields),
        )),
        ("tags", TaggedFields),
    )
+
+
class ListPartitionReassignmentsRequest_v0(Request):
    """Request schema for ListPartitionReassignments (API key 46), v0."""
    FLEXIBLE_VERSION = True
    API_KEY = 46
    API_VERSION = 0
    RESPONSE_TYPE = ListPartitionReassignmentsResponse_v0
    SCHEMA = Schema(
        ("timeout_ms", Int32),
        ("topics", CompactArray(
            ("name", CompactString("utf-8")),
            # NOTE(review): this field carries a *list* of partition indexes;
            # the singular name is kept for kwarg compatibility, but confirm
            # it against the KIP-455 field name ("PartitionIndexes").
            ("partition_index", CompactArray(Int32)),
            ("tags", TaggedFields),
        )),
        ("tags", TaggedFields),
    )
+
+
# Version-indexed lists (index == API version), following the module convention.
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]

ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py
index 64276fc..f12cb97 100644
--- a/kafka/protocol/api.py
+++ b/kafka/protocol/api.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import
import abc
from kafka.protocol.struct import Struct
-from kafka.protocol.types import Int16, Int32, String, Schema, Array
+from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields
class RequestHeader(Struct):
@@ -20,9 +20,40 @@ class RequestHeader(Struct):
)
class RequestHeaderV2(Struct):
    """v2 request header used by flexible-version requests.

    Identical to the v1 header except that it ends with a tagged-fields
    buffer (KIP-482).
    """
    SCHEMA = Schema(
        ('api_key', Int16),
        ('api_version', Int16),
        ('correlation_id', Int32),
        ('client_id', String('utf-8')),
        ('tags', TaggedFields),
    )

    def __init__(self, request, correlation_id=0, client_id='kafka-python', tags=None):
        # Default to an empty tagged-fields dict; never share a mutable default.
        if not tags:
            tags = {}
        super(RequestHeaderV2, self).__init__(
            request.API_KEY, request.API_VERSION, correlation_id, client_id, tags
        )
+
+
class ResponseHeader(Struct):
    """v0 response header: only the correlation id."""
    SCHEMA = Schema(
        ('correlation_id', Int32),
    )
+
+
class ResponseHeaderV2(Struct):
    """v1 (flexible) response header: correlation id plus tagged fields."""
    SCHEMA = Schema(
        ('correlation_id', Int32),
        ('tags', TaggedFields),
    )
+
+
class Request(Struct):
__metaclass__ = abc.ABCMeta
+ FLEXIBLE_VERSION = False
+
@abc.abstractproperty
def API_KEY(self):
"""Integer identifier for api request"""
@@ -50,6 +81,16 @@ class Request(Struct):
def to_object(self):
return _to_object(self.SCHEMA, self)
+ def build_request_header(self, correlation_id, client_id):
+ if self.FLEXIBLE_VERSION:
+ return RequestHeaderV2(self, correlation_id=correlation_id, client_id=client_id)
+ return RequestHeader(self, correlation_id=correlation_id, client_id=client_id)
+
+ def parse_response_header(self, read_buffer):
+ if self.FLEXIBLE_VERSION:
+ return ResponseHeaderV2.decode(read_buffer)
+ return ResponseHeader.decode(read_buffer)
+
class Response(Struct):
__metaclass__ = abc.ABCMeta
diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py
index cfee046..a9e7672 100644
--- a/kafka/protocol/parser.py
+++ b/kafka/protocol/parser.py
@@ -4,10 +4,9 @@ import collections
import logging
import kafka.errors as Errors
-from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.frame import KafkaBytes
-from kafka.protocol.types import Int32
+from kafka.protocol.types import Int32, TaggedFields
from kafka.version import __version__
log = logging.getLogger(__name__)
@@ -59,9 +58,8 @@ class KafkaProtocol(object):
log.debug('Sending request %s', request)
if correlation_id is None:
correlation_id = self._next_correlation_id()
- header = RequestHeader(request,
- correlation_id=correlation_id,
- client_id=self._client_id)
+
+ header = request.build_request_header(correlation_id=correlation_id, client_id=self._client_id)
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
data = size + message
@@ -135,17 +133,12 @@ class KafkaProtocol(object):
return responses
def _process_response(self, read_buffer):
- recv_correlation_id = Int32.decode(read_buffer)
- log.debug('Received correlation id: %d', recv_correlation_id)
-
if not self.in_flight_requests:
- raise Errors.CorrelationIdError(
- 'No in-flight-request found for server response'
- ' with correlation ID %d'
- % (recv_correlation_id,))
-
+ raise Errors.CorrelationIdError('No in-flight-request found for server response')
(correlation_id, request) = self.in_flight_requests.popleft()
-
+ response_header = request.parse_response_header(read_buffer)
+ recv_correlation_id = response_header.correlation_id
+ log.debug('Received correlation id: %d', recv_correlation_id)
# 0.8.2 quirk
if (recv_correlation_id == 0 and
correlation_id != 0 and
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py
index 2fde24f..0e3685d 100644
--- a/kafka/protocol/types.py
+++ b/kafka/protocol/types.py
@@ -210,3 +210,156 @@ class Array(AbstractType):
if list_of_items is None:
return 'NULL'
return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'
+
+
class UnsignedVarInt32(AbstractType):
    """Unsigned 32-bit integer in varint encoding.

    Seven payload bits per byte, least-significant group first; the high
    bit of each byte marks continuation.
    """

    @classmethod
    def decode(cls, data):
        value = 0
        shift = 0
        while True:
            b, = struct.unpack('B', data.read(1))
            if not (b & 0x80):
                # Final byte: high bit clear.
                break
            value |= (b & 0x7f) << shift
            shift += 7
            if shift > 28:
                # More than 5 bytes cannot encode a 32-bit value.
                raise ValueError('Invalid value {}'.format(value))
        return value | (b << shift)

    @classmethod
    def encode(cls, value):
        value &= 0xffffffff
        out = b''
        while value & 0xffffff80:
            out += struct.pack('B', (value & 0x7f) | 0x80)
            value >>= 7
        return out + struct.pack('B', value)
+
+
class VarInt32(AbstractType):
    """Signed 32-bit integer: zigzag-encoded, then written as an unsigned varint."""

    @classmethod
    def decode(cls, data):
        value = UnsignedVarInt32.decode(data)
        # Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
        return (value >> 1) ^ -(value & 1)

    @classmethod
    def encode(cls, value):
        # Zigzag-encode exactly as Java's `(n << 1) ^ (n >> 31)` does for a
        # signed 32-bit int.  Python's arbitrary-precision ints use arithmetic
        # shifts, so sign extension works directly on a negative value.
        #
        # BUG FIX: the previous code masked `value &= 0xffffffff` first, which
        # turned negatives into large positives, making `value >> 31` yield 1
        # instead of the all-ones sign extension.  e.g. encode(-1) produced
        # the varint for 0xffffffff instead of 1, so decode(encode(-1))
        # returned -2147483648.
        return UnsignedVarInt32.encode((value << 1) ^ (value >> 31))
+
+
class VarInt64(AbstractType):
    """Signed 64-bit integer: zigzag-encoded, then written as a varint."""

    @classmethod
    def decode(cls, data):
        value, i = 0, 0
        while True:
            # BUG FIX: data.read(1) returns a bytes object, which cannot be
            # used in the bitwise tests below (TypeError); unpack it to an int
            # first, as UnsignedVarInt32.decode already does.
            b, = struct.unpack('B', data.read(1))
            if not (b & 0x80):
                break
            value |= (b & 0x7f) << i
            i += 7
            if i > 63:
                # More than 10 bytes cannot encode a 64-bit value.
                raise ValueError('Invalid value {}'.format(value))
        value |= b << i
        # Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
        return (value >> 1) ^ -(value & 1)

    @classmethod
    def encode(cls, value):
        # Zigzag-encode as Java's `(n << 1) ^ (n >> 63)` does for a signed
        # 64-bit long.  Python's arithmetic shift sign-extends negatives, so
        # no pre-masking is needed.
        #
        # BUG FIX: the previous code masked the value to 64 bits before the
        # zigzag step (breaking sign extension for negatives) and packed bits
        # of the original `value` instead of the shifted accumulator inside
        # the loop, emitting the same byte repeatedly.
        v = (value << 1) ^ (value >> 63)
        ret = b''
        while (v & 0xffffffffffffff80) != 0:
            ret += struct.pack('B', (v & 0x7f) | 0x80)
            v >>= 7
        ret += struct.pack('B', v)
        return ret
+
+
class CompactString(String):
    """String with a compact (varint) length prefix.

    The stored length is the byte length plus one; a stored length of
    zero denotes a null string.
    """

    def decode(self, data):
        length = UnsignedVarInt32.decode(data) - 1
        if length < 0:
            return None  # null string
        raw = data.read(length)
        if len(raw) != length:
            raise ValueError('Buffer underrun decoding string')
        return raw.decode(self.encoding)

    def encode(self, value):
        if value is None:
            return UnsignedVarInt32.encode(0)
        encoded = str(value).encode(self.encoding)
        return UnsignedVarInt32.encode(len(encoded) + 1) + encoded
+
+
class TaggedFields(AbstractType):
    """KIP-482 tagged-fields buffer.

    Wire format: a varint field count, then (tag, size, bytes) triples with
    strictly increasing tags.  Decodes to / encodes from a dict mapping
    int tag -> raw bytes value.
    """

    @classmethod
    def decode(cls, data):
        num_fields = UnsignedVarInt32.decode(data)
        ret = {}
        if not num_fields:
            return ret
        prev_tag = -1
        for i in range(num_fields):
            tag = UnsignedVarInt32.decode(data)
            # Tags must be strictly increasing; prev_tag starts at -1 so
            # tag 0 is accepted.
            if tag <= prev_tag:
                raise ValueError('Invalid or out-of-order tag {}'.format(tag))
            prev_tag = tag
            size = UnsignedVarInt32.decode(data)
            val = data.read(size)
            ret[tag] = val
        return ret

    @classmethod
    def encode(cls, value):
        ret = UnsignedVarInt32.encode(len(value))
        # Emit in ascending tag order so the output satisfies decode()'s
        # strictly-increasing-tag requirement regardless of dict order.
        for k in sorted(value):
            v = value[k]
            # Only raw byte values are supported for now.
            assert isinstance(v, bytes), 'Value {} is not a byte array'.format(v)
            # BUG FIX: tags start at 0 (KIP-482) and decode() accepts tag 0,
            # so only negative tags are invalid ('k > 0' rejected tag 0).
            assert isinstance(k, int) and k >= 0, 'Key {} is not a non-negative integer'.format(k)
            ret += UnsignedVarInt32.encode(k)
            # BUG FIX: decode() reads a varint size before each value, but the
            # encoder never wrote one, so non-empty fields failed to round-trip.
            ret += UnsignedVarInt32.encode(len(v))
            ret += v
        return ret
+
+
class CompactBytes(AbstractType):
    """Byte array with a compact (varint) length prefix.

    The stored length is the byte length plus one; a stored length of
    zero denotes a null value.
    """

    @classmethod
    def decode(cls, data):
        length = UnsignedVarInt32.decode(data) - 1
        if length < 0:
            return None  # null bytes
        raw = data.read(length)
        if len(raw) != length:
            raise ValueError('Buffer underrun decoding Bytes')
        return raw

    @classmethod
    def encode(cls, value):
        if value is None:
            return UnsignedVarInt32.encode(0)
        return UnsignedVarInt32.encode(len(value) + 1) + value
+
+
class CompactArray(Array):
    """Array with a compact (varint) length prefix.

    The stored length is the element count plus one; a stored length of
    zero denotes a null array.
    """

    def encode(self, items):
        if items is None:
            return UnsignedVarInt32.encode(0)
        parts = [UnsignedVarInt32.encode(len(items) + 1)]
        parts.extend(self.array_of.encode(item) for item in items)
        return b''.join(parts)

    def decode(self, data):
        length = UnsignedVarInt32.decode(data) - 1
        # UnsignedVarInt32 is non-negative, so length < 0 means exactly -1 (null).
        if length < 0:
            return None
        return [self.array_of.decode(data) for _ in range(length)]
diff --git a/test/test_protocol.py b/test/test_protocol.py
index e295174..6a77e19 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -9,7 +9,7 @@ from kafka.protocol.commit import GroupCoordinatorRequest
from kafka.protocol.fetch import FetchRequest, FetchResponse
from kafka.protocol.message import Message, MessageSet, PartialMessage
from kafka.protocol.metadata import MetadataRequest
-from kafka.protocol.types import Int16, Int32, Int64, String
+from kafka.protocol.types import Int16, Int32, Int64, String, UnsignedVarInt32, CompactString, CompactArray, CompactBytes
def test_create_message():
@@ -282,3 +282,55 @@ def test_struct_unrecognized_kwargs():
def test_struct_missing_kwargs():
fr = FetchRequest[0](max_wait_time=100)
assert fr.min_bytes is None
+
+
def test_unsigned_varint_serde():
    """Round-trip UnsignedVarInt32 against known Java-compatible encodings."""
    cases = [
        (0, [0]),
        (-1, [0xff, 0xff, 0xff, 0xff, 0x0f]),
        (1, [1]),
        (63, [0x3f]),
        (-64, [0xc0, 0xff, 0xff, 0xff, 0x0f]),
        (64, [0x40]),
        (8191, [0xff, 0x3f]),
        (-8192, [0x80, 0xc0, 0xff, 0xff, 0x0f]),
        (8192, [0x80, 0x40]),
        (-8193, [0xff, 0xbf, 0xff, 0xff, 0x0f]),
        (1048575, [0xff, 0xff, 0x3f]),
    ]
    for value, expected_bytes in cases:
        # Negative inputs are interpreted as their 32-bit two's-complement.
        value &= 0xffffffff
        expected = b''.join(struct.pack('>B', x) for x in expected_bytes)
        encoded = UnsignedVarInt32.encode(value)
        assert encoded == expected
        assert UnsignedVarInt32.decode(io.BytesIO(encoded)) == value
+
+
def test_compact_data_structs():
    """Exercise CompactString / CompactArray / CompactBytes round-trips,
    including the null (stored length 0) and empty (stored length 1) cases."""
    cs = CompactString()
    encoded = cs.encode(None)
    assert encoded == struct.pack('B', 0)
    decoded = cs.decode(io.BytesIO(encoded))
    assert decoded is None
    assert b'\x01' == cs.encode('')
    assert '' == cs.decode(io.BytesIO(b'\x01'))
    encoded = cs.encode("foobarbaz")
    assert cs.decode(io.BytesIO(encoded)) == "foobarbaz"

    arr = CompactArray(CompactString())
    assert arr.encode(None) == b'\x00'
    assert arr.decode(io.BytesIO(b'\x00')) is None
    enc = arr.encode([])
    assert enc == b'\x01'
    assert [] == arr.decode(io.BytesIO(enc))
    encoded = arr.encode(["foo", "bar", "baz", "quux"])
    assert arr.decode(io.BytesIO(encoded)) == ["foo", "bar", "baz", "quux"]

    enc = CompactBytes.encode(None)
    assert enc == b'\x00'
    assert CompactBytes.decode(io.BytesIO(b'\x00')) is None
    enc = CompactBytes.encode(b'')
    assert enc == b'\x01'
    # BUG FIX: the original asserted `... is b''`, an identity comparison
    # against a literal -- it only passes thanks to CPython interning and
    # raises SyntaxWarning on Python 3.8+.  Compare by value instead.
    assert CompactBytes.decode(io.BytesIO(b'\x01')) == b''
    enc = CompactBytes.encode(b'foo')
    assert CompactBytes.decode(io.BytesIO(enc)) == b'foo'