diff options
Diffstat (limited to 'kafka/protocol/api.py')
-rw-r--r-- | kafka/protocol/api.py | 43 |
1 files changed, 42 insertions, 1 deletions
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index 64276fc..f12cb97 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -3,7 +3,7 @@ from __future__ import absolute_import import abc from kafka.protocol.struct import Struct -from kafka.protocol.types import Int16, Int32, String, Schema, Array +from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields class RequestHeader(Struct): @@ -20,9 +20,40 @@ class RequestHeader(Struct): ) +class RequestHeaderV2(Struct): + # Flexible response / request headers end in field buffer + SCHEMA = Schema( + ('api_key', Int16), + ('api_version', Int16), + ('correlation_id', Int32), + ('client_id', String('utf-8')), + ('tags', TaggedFields), + ) + + def __init__(self, request, correlation_id=0, client_id='kafka-python', tags=None): + super(RequestHeaderV2, self).__init__( + request.API_KEY, request.API_VERSION, correlation_id, client_id, tags or {} + ) + + +class ResponseHeader(Struct): + SCHEMA = Schema( + ('correlation_id', Int32), + ) + + +class ResponseHeaderV2(Struct): + SCHEMA = Schema( + ('correlation_id', Int32), + ('tags', TaggedFields), + ) + + class Request(Struct): __metaclass__ = abc.ABCMeta + FLEXIBLE_VERSION = False + @abc.abstractproperty def API_KEY(self): """Integer identifier for api request""" @@ -50,6 +81,16 @@ class Request(Struct): def to_object(self): return _to_object(self.SCHEMA, self) + def build_request_header(self, correlation_id, client_id): + if self.FLEXIBLE_VERSION: + return RequestHeaderV2(self, correlation_id=correlation_id, client_id=client_id) + return RequestHeader(self, correlation_id=correlation_id, client_id=client_id) + + def parse_response_header(self, read_buffer): + if self.FLEXIBLE_VERSION: + return ResponseHeaderV2.decode(read_buffer) + return ResponseHeader.decode(read_buffer) + class Response(Struct): __metaclass__ = abc.ABCMeta |