diff options
author | Tincu Gabriel <gabi@aiven.io> | 2020-12-02 15:45:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-02 06:45:13 -0800 |
commit | 9feeb79140ed10e3a7f2036491fc07573740c231 (patch) | |
tree | fd65f63a46e43ae261008a87081c460d76810f4d /kafka/protocol/parser.py | |
parent | c48817e0d21d7752077e28f2ea9a657b9001a14b (diff) | |
download | kafka-python-9feeb79140ed10e3a7f2036491fc07573740c231.tar.gz |
Core Protocol: Add support for flexible versions (#2151)
- Add support for new request and response headers, supporting flexible
versions / tagged fields
- Add List / Alter partition reassignments APIs
- Add support for varints
- Add support for compact collections (byte array, string, array)
Diffstat (limited to 'kafka/protocol/parser.py')
-rw-r--r-- | kafka/protocol/parser.py | 21 |
1 files changed, 7 insertions, 14 deletions
diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index cfee046..a9e7672 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -4,10 +4,9 @@ import collections import logging import kafka.errors as Errors -from kafka.protocol.api import RequestHeader from kafka.protocol.commit import GroupCoordinatorResponse from kafka.protocol.frame import KafkaBytes -from kafka.protocol.types import Int32 +from kafka.protocol.types import Int32, TaggedFields from kafka.version import __version__ log = logging.getLogger(__name__) @@ -59,9 +58,8 @@ class KafkaProtocol(object): log.debug('Sending request %s', request) if correlation_id is None: correlation_id = self._next_correlation_id() - header = RequestHeader(request, - correlation_id=correlation_id, - client_id=self._client_id) + + header = request.build_request_header(correlation_id=correlation_id, client_id=self._client_id) message = b''.join([header.encode(), request.encode()]) size = Int32.encode(len(message)) data = size + message @@ -135,17 +133,12 @@ class KafkaProtocol(object): return responses def _process_response(self, read_buffer): - recv_correlation_id = Int32.decode(read_buffer) - log.debug('Received correlation id: %d', recv_correlation_id) - if not self.in_flight_requests: - raise Errors.CorrelationIdError( - 'No in-flight-request found for server response' - ' with correlation ID %d' - % (recv_correlation_id,)) - + raise Errors.CorrelationIdError('No in-flight-request found for server response') (correlation_id, request) = self.in_flight_requests.popleft() - + response_header = request.parse_response_header(read_buffer) + recv_correlation_id = response_header.correlation_id + log.debug('Received correlation id: %d', recv_correlation_id) # 0.8.2 quirk if (recv_correlation_id == 0 and correlation_id != 0 and |