From d0c6b1f95c2e677545d1faaeae525e8768abea9e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 9 Jul 2017 08:04:39 -0700 Subject: Protocol updates for 0.11.0.0 (#1127) --- kafka/protocol/offset.py | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) (limited to 'kafka/protocol/offset.py') diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 588dfec..8353f8c 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from .api import Request, Response -from .types import Array, Int16, Int32, Int64, Schema, String +from .types import Array, Int8, Int16, Int32, Int64, Schema, String class OffsetResetStrategy(object): @@ -36,6 +36,21 @@ class OffsetResponse_v1(Response): ) +class OffsetResponse_v2(Response): + API_KEY = 2 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('timestamp', Int64), + ('offset', Int64))))) + ) + + class OffsetRequest_v0(Request): API_KEY = 2 API_VERSION = 0 @@ -70,5 +85,23 @@ class OffsetRequest_v1(Request): } -OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1] -OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1] +class OffsetRequest_v2(Request): + API_KEY = 2 + API_VERSION = 2 + RESPONSE_TYPE = OffsetResponse_v2 + SCHEMA = Schema( + ('replica_id', Int32), + ('isolation_level', Int8), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('timestamp', Int64))))) + ) + DEFAULTS = { + 'replica_id': -1 + } + + +OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2] +OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2] -- cgit v1.2.1