summaryrefslogtreecommitdiff
path: root/kafka/protocol/parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/parser.py')
-rw-r--r--kafka/protocol/parser.py21
1 files changed, 7 insertions, 14 deletions
diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py
index cfee046..a9e7672 100644
--- a/kafka/protocol/parser.py
+++ b/kafka/protocol/parser.py
@@ -4,10 +4,9 @@ import collections
import logging
import kafka.errors as Errors
-from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.frame import KafkaBytes
-from kafka.protocol.types import Int32
+from kafka.protocol.types import Int32, TaggedFields
from kafka.version import __version__
log = logging.getLogger(__name__)
@@ -59,9 +58,8 @@ class KafkaProtocol(object):
log.debug('Sending request %s', request)
if correlation_id is None:
correlation_id = self._next_correlation_id()
- header = RequestHeader(request,
- correlation_id=correlation_id,
- client_id=self._client_id)
+
+ header = request.build_request_header(correlation_id=correlation_id, client_id=self._client_id)
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
data = size + message
@@ -135,17 +133,12 @@ class KafkaProtocol(object):
return responses
def _process_response(self, read_buffer):
- recv_correlation_id = Int32.decode(read_buffer)
- log.debug('Received correlation id: %d', recv_correlation_id)
-
if not self.in_flight_requests:
- raise Errors.CorrelationIdError(
- 'No in-flight-request found for server response'
- ' with correlation ID %d'
- % (recv_correlation_id,))
-
+ raise Errors.CorrelationIdError('No in-flight-request found for server response')
(correlation_id, request) = self.in_flight_requests.popleft()
-
+ response_header = request.parse_response_header(read_buffer)
+ recv_correlation_id = response_header.correlation_id
+ log.debug('Received correlation id: %d', recv_correlation_id)
# 0.8.2 quirk
if (recv_correlation_id == 0 and
correlation_id != 0 and