summaryrefslogtreecommitdiff
path: root/kafka/protocol/offset.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/offset.py')
-rw-r--r--kafka/protocol/offset.py89
1 files changed, 87 insertions, 2 deletions
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
index 3c254de..1ed382b 100644
--- a/kafka/protocol/offset.py
+++ b/kafka/protocol/offset.py
@@ -53,6 +53,43 @@ class OffsetResponse_v2(Response):
)
+class OffsetResponse_v3(Response):
+ """
+ on quota violation, brokers send out responses before throttling
+ """
+ API_KEY = 2
+ API_VERSION = 3
+ SCHEMA = OffsetResponse_v2.SCHEMA
+
+
+class OffsetResponse_v4(Response):
+ """
+ Add leader_epoch to response
+ """
+ API_KEY = 2
+ API_VERSION = 4
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('timestamp', Int64),
+ ('offset', Int64),
+ ('leader_epoch', Int32)))))
+ )
+
+
+class OffsetResponse_v5(Response):
+ """
+ adds a new error code, OFFSET_NOT_AVAILABLE
+ """
+ API_KEY = 2
+ API_VERSION = 5
+ SCHEMA = OffsetResponse_v4.SCHEMA
+
+
class OffsetRequest_v0(Request):
API_KEY = 2
API_VERSION = 0
@@ -105,5 +142,53 @@ class OffsetRequest_v2(Request):
}
-OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2]
-OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2]
+class OffsetRequest_v3(Request):
+ API_KEY = 2
+ API_VERSION = 3
+ RESPONSE_TYPE = OffsetResponse_v3
+ SCHEMA = OffsetRequest_v2.SCHEMA
+ DEFAULTS = {
+ 'replica_id': -1
+ }
+
+
+class OffsetRequest_v4(Request):
+ """
+ Add current_leader_epoch to request
+ """
+ API_KEY = 2
+ API_VERSION = 4
+ RESPONSE_TYPE = OffsetResponse_v4
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('isolation_level', Int8), # <- added isolation_level
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('current_leader_epoch', Int64),
+ ('timestamp', Int64)))))
+ )
+ DEFAULTS = {
+ 'replica_id': -1
+ }
+
+
+class OffsetRequest_v5(Request):
+ API_KEY = 2
+ API_VERSION = 5
+ RESPONSE_TYPE = OffsetResponse_v5
+ SCHEMA = OffsetRequest_v4.SCHEMA
+ DEFAULTS = {
+ 'replica_id': -1
+ }
+
+
+OffsetRequest = [
+ OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2,
+ OffsetRequest_v3, OffsetRequest_v4, OffsetRequest_v5,
+]
+OffsetResponse = [
+ OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2,
+ OffsetResponse_v3, OffsetResponse_v4, OffsetResponse_v5,
+]