summaryrefslogtreecommitdiff
path: root/kafka/protocol/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/api.py')
-rw-r--r--kafka/protocol/api.py43
1 files changed, 42 insertions, 1 deletions
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py
index 64276fc..f12cb97 100644
--- a/kafka/protocol/api.py
+++ b/kafka/protocol/api.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import
import abc
from kafka.protocol.struct import Struct
-from kafka.protocol.types import Int16, Int32, String, Schema, Array
+from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields
class RequestHeader(Struct):
@@ -20,9 +20,40 @@ class RequestHeader(Struct):
)
+class RequestHeaderV2(Struct):
+ # Flexible response / request headers end in field buffer
+ SCHEMA = Schema(
+ ('api_key', Int16),
+ ('api_version', Int16),
+ ('correlation_id', Int32),
+ ('client_id', String('utf-8')),
+ ('tags', TaggedFields),
+ )
+
+ def __init__(self, request, correlation_id=0, client_id='kafka-python', tags=None):
+ super(RequestHeaderV2, self).__init__(
+ request.API_KEY, request.API_VERSION, correlation_id, client_id, tags or {}
+ )
+
+
+class ResponseHeader(Struct):
+ SCHEMA = Schema(
+ ('correlation_id', Int32),
+ )
+
+
+class ResponseHeaderV2(Struct):
+ SCHEMA = Schema(
+ ('correlation_id', Int32),
+ ('tags', TaggedFields),
+ )
+
+
class Request(Struct):
__metaclass__ = abc.ABCMeta
+ FLEXIBLE_VERSION = False
+
@abc.abstractproperty
def API_KEY(self):
"""Integer identifier for api request"""
@@ -50,6 +81,16 @@ class Request(Struct):
def to_object(self):
return _to_object(self.SCHEMA, self)
+ def build_request_header(self, correlation_id, client_id):
+ if self.FLEXIBLE_VERSION:
+ return RequestHeaderV2(self, correlation_id=correlation_id, client_id=client_id)
+ return RequestHeader(self, correlation_id=correlation_id, client_id=client_id)
+
+ def parse_response_header(self, read_buffer):
+ if self.FLEXIBLE_VERSION:
+ return ResponseHeaderV2.decode(read_buffer)
+ return ResponseHeader.decode(read_buffer)
+
class Response(Struct):
__metaclass__ = abc.ABCMeta