From 9feeb79140ed10e3a7f2036491fc07573740c231 Mon Sep 17 00:00:00 2001 From: Tincu Gabriel Date: Wed, 2 Dec 2020 15:45:13 +0100 Subject: Core Protocol: Add support for flexible versions (#2151) - Add support for new request and response headers, supporting flexible versions / tagged fields - Add List / Alter partition reassignments APIs - Add support for varints - Add support for compact collections (byte array, string, array) --- kafka/protocol/parser.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) (limited to 'kafka/protocol/parser.py') diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index cfee046..a9e7672 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -4,10 +4,9 @@ import collections import logging import kafka.errors as Errors -from kafka.protocol.api import RequestHeader from kafka.protocol.commit import GroupCoordinatorResponse from kafka.protocol.frame import KafkaBytes -from kafka.protocol.types import Int32 +from kafka.protocol.types import Int32, TaggedFields from kafka.version import __version__ log = logging.getLogger(__name__) @@ -59,9 +58,8 @@ class KafkaProtocol(object): log.debug('Sending request %s', request) if correlation_id is None: correlation_id = self._next_correlation_id() - header = RequestHeader(request, - correlation_id=correlation_id, - client_id=self._client_id) + + header = request.build_request_header(correlation_id=correlation_id, client_id=self._client_id) message = b''.join([header.encode(), request.encode()]) size = Int32.encode(len(message)) data = size + message @@ -135,17 +133,12 @@ class KafkaProtocol(object): return responses def _process_response(self, read_buffer): - recv_correlation_id = Int32.decode(read_buffer) - log.debug('Received correlation id: %d', recv_correlation_id) - if not self.in_flight_requests: - raise Errors.CorrelationIdError( - 'No in-flight-request found for server response' - ' with correlation ID %d' - % (recv_correlation_id,)) - + raise Errors.CorrelationIdError('No in-flight-request found for server response') (correlation_id, request) = self.in_flight_requests.popleft() - + response_header = request.parse_response_header(read_buffer) + recv_correlation_id = response_header.correlation_id + log.debug('Received correlation id: %d', recv_correlation_id) # 0.8.2 quirk if (recv_correlation_id == 0 and correlation_id != 0 and -- cgit v1.2.1