diff options
Diffstat (limited to 'kafka/protocol/parser.py')
-rw-r--r-- | kafka/protocol/parser.py | 21 |
1 files changed, 7 insertions, 14 deletions
diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index cfee046..a9e7672 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -4,10 +4,9 @@ import collections import logging import kafka.errors as Errors -from kafka.protocol.api import RequestHeader from kafka.protocol.commit import GroupCoordinatorResponse from kafka.protocol.frame import KafkaBytes -from kafka.protocol.types import Int32 +from kafka.protocol.types import Int32, TaggedFields from kafka.version import __version__ log = logging.getLogger(__name__) @@ -59,9 +58,8 @@ class KafkaProtocol(object): log.debug('Sending request %s', request) if correlation_id is None: correlation_id = self._next_correlation_id() - header = RequestHeader(request, - correlation_id=correlation_id, - client_id=self._client_id) + + header = request.build_request_header(correlation_id=correlation_id, client_id=self._client_id) message = b''.join([header.encode(), request.encode()]) size = Int32.encode(len(message)) data = size + message @@ -135,17 +133,12 @@ class KafkaProtocol(object): return responses def _process_response(self, read_buffer): - recv_correlation_id = Int32.decode(read_buffer) - log.debug('Received correlation id: %d', recv_correlation_id) - if not self.in_flight_requests: - raise Errors.CorrelationIdError( - 'No in-flight-request found for server response' - ' with correlation ID %d' - % (recv_correlation_id,)) - + raise Errors.CorrelationIdError('No in-flight-request found for server response') (correlation_id, request) = self.in_flight_requests.popleft() - + response_header = request.parse_response_header(read_buffer) + recv_correlation_id = response_header.correlation_id + log.debug('Received correlation id: %d', recv_correlation_id) # 0.8.2 quirk if (recv_correlation_id == 0 and correlation_id != 0 and |